servant_codec/length_codec/
length.rs

1// -- length.rs --
2
3use {
4    // byteorder::{BigEndian, ByteOrder},
5    bytes::{Buf, Bytes, BytesMut},
6    futures_codec::{Decoder, Encoder},
7    std::io::Error,
8    crate::utility::Length,
9};
10
11// --
12
13#[derive(Default, Clone, Copy)]
14pub struct LengthCodec<T: Length>(std::marker::PhantomData<T>);
15/*
16unsafe impl<T: Length> Send for LengthCodec<T> {}
17unsafe impl<T: Length> Sync for LengthCodec<T> {}
18impl<T: Length> Unpin for LengthCodec<T> {}
19impl<T: Length> std::panic::UnwindSafe for LengthCodec<T> {}
20impl<T: Length> std::panic::RefUnwindSafe for LengthCodec<T> {}
21*/
22impl<T: Length> Encoder for LengthCodec<T> {
23    type Item = Bytes;
24    type Error = Error;
25
26    fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
27        let head_len = std::mem::size_of_val(&T::from_usize(Default::default()));
28        dst.reserve(head_len + src.len());
29        T::from_usize(src.len()).put(dst);
30        dst.extend_from_slice(&src);
31        Ok(())
32    }
33}
34
35impl<T: Length> Decoder for LengthCodec<T> {
36    type Item = Bytes;
37    type Error = Error;
38
39    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
40        let head_len = std::mem::size_of_val(&T::from_usize(Default::default()));
41        if src.len() < head_len {
42            return Ok(None);
43        }
44
45        let len: usize = T::get(&src[..head_len]).to_usize();
46        if src.len() - head_len >= len {
47            src.advance(head_len);
48            Ok(Some(src.split_to(len).freeze()))
49        } else {
50            Ok(None)
51        }
52    }
53}
54
55// --
56
57#[cfg(test)]
58mod tests {
59    extern crate test_case;
60
61    use crate::utility::type_of;
62    use super::*;
63    use futures::{executor, io::Cursor, sink::SinkExt, TryStreamExt};
64    use futures_codec::{BytesCodec, Framed, FramedRead, FramedWrite};
65    use test_case::test_case;
66
67    // --
68
69    #[test_case(Bytes::from("Hello World!") => vec![72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33]; "hello")]
70    #[test_case(Bytes::from("this is a new world!") => vec![116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 110, 101, 119, 32, 119, 111, 114, 108, 100, 33]; "new_world")]
71    fn bytes_codec_encode(msg: Bytes) -> Vec<u8> {
72        executor::block_on(async move {
73            let mut buf = vec![];
74            let cur = Cursor::new(&mut buf);
75            let mut framed = Framed::new(cur, BytesCodec {});
76
77            framed.send(msg.clone()).await.unwrap();
78            println!("\nbuf: {} = {:?}", type_of(&&buf), buf);
79            buf
80        })
81    }
82
83    #[test_case(vec![72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33] => Bytes::from("Hello World!"); "hello")]
84    #[test_case(vec![116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 110, 101, 119, 32, 119, 111, 114, 108, 100, 33] => Bytes::from("this is a new world!"); "new_world")]
85    fn bytes_codec_decode(mut buf: Vec<u8>) -> Bytes {
86        executor::block_on(async move {
87            let cur = Cursor::new(&mut buf);
88            let mut framed = Framed::new(cur, BytesCodec {});
89            if let Some(msg) = framed.try_next().await.unwrap() {
90                println!("\nmsg: {} = {:?}", type_of(&msg), msg);
91                msg
92            } else {
93                Bytes::new()
94            }
95        })
96    }
97
98    #[test]
99    fn t_bytes_codec() {
100        executor::block_on(async move {
101            let mut buf = vec![];
102            let cur = Cursor::new(&mut buf);
103            let mut framed = Framed::new(cur, BytesCodec {});
104
105            let msg = Bytes::from("Hello World!");
106            framed.send(msg.clone()).await.unwrap();
107            println!("buf: {:?}", buf);
108
109            let cur = Cursor::new(&mut buf);
110            let mut framed2 = Framed::new(cur, BytesCodec {});
111            while let Some(msg2) = framed2.try_next().await.unwrap() {
112                println!("msg: {:?}", msg2);
113                assert_eq!(msg, msg2);
114            }
115        });
116    }
117
118    #[test]
119    fn t_length_u8_codec() {
120        executor::block_on(async move {
121            let mut buf = vec![];
122            let cur = Cursor::new(&mut buf);
123            let mut framed = FramedWrite::new(cur, LengthCodec::<u8>::default());
124
125            let msg = Bytes::from("Hello World!");
126            framed.send(msg.clone()).await.unwrap();
127            println!("buf: {:?}", buf);
128
129            let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u8>::default());
130            let msg2 = framed2.try_next().await.unwrap().unwrap();
131            println!("msg: {:?}", msg2);
132
133            assert_eq!(msg, msg2);
134        });
135    }
136    #[test]
137    fn t_length_u16_codec() {
138        executor::block_on(async move {
139            let mut buf = vec![];
140            let cur = Cursor::new(&mut buf);
141            let mut framed = FramedWrite::new(cur, LengthCodec::<u16>::default());
142
143            let msg = Bytes::from("Hello World!");
144            framed.send(msg.clone()).await.unwrap();
145            println!("buf: {:?}", buf);
146
147            let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u16>::default());
148            let msg2 = framed2.try_next().await.unwrap().unwrap();
149            println!("msg: {:?}", msg2);
150
151            assert_eq!(msg, msg2);
152        });
153    }
154    #[test]
155    fn t_length_u32_codec() {
156        executor::block_on(async move {
157            let mut buf = vec![];
158            let cur = Cursor::new(&mut buf);
159            let mut framed = FramedWrite::new(cur, LengthCodec::<u32>::default());
160
161            let msg = Bytes::from("Hello World!");
162            framed.send(msg.clone()).await.unwrap();
163            println!("buf: {:?}", buf);
164
165            let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u32>::default());
166            let msg2 = framed2.try_next().await.unwrap().unwrap();
167            println!("msg: {:?}", msg2);
168
169            assert_eq!(msg, msg2);
170        });
171    }
172    #[test]
173    fn t_length_u64_codec() {
174        executor::block_on(async move {
175            let mut buf = vec![];
176            let cur = Cursor::new(&mut buf);
177            let mut framed = FramedWrite::new(cur, LengthCodec::<u64>::default());
178
179            let msg = Bytes::from("Hello World!");
180            framed.send(msg.clone()).await.unwrap();
181            println!("buf: {:?}", buf);
182
183            let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u64>::default());
184            let msg2 = framed2.try_next().await.unwrap().unwrap();
185            println!("msg: {:?}", msg2);
186
187            assert_eq!(msg, msg2);
188        });
189    }
190}