servant_codec/length_codec/
length.rs1use {
4 bytes::{Buf, Bytes, BytesMut},
6 futures_codec::{Decoder, Encoder},
7 std::io::Error,
8 crate::utility::Length,
9};
10
11#[derive(Default, Clone, Copy)]
14pub struct LengthCodec<T: Length>(std::marker::PhantomData<T>);
15impl<T: Length> Encoder for LengthCodec<T> {
23 type Item = Bytes;
24 type Error = Error;
25
26 fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
27 let head_len = std::mem::size_of_val(&T::from_usize(Default::default()));
28 dst.reserve(head_len + src.len());
29 T::from_usize(src.len()).put(dst);
30 dst.extend_from_slice(&src);
31 Ok(())
32 }
33}
34
35impl<T: Length> Decoder for LengthCodec<T> {
36 type Item = Bytes;
37 type Error = Error;
38
39 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
40 let head_len = std::mem::size_of_val(&T::from_usize(Default::default()));
41 if src.len() < head_len {
42 return Ok(None);
43 }
44
45 let len: usize = T::get(&src[..head_len]).to_usize();
46 if src.len() - head_len >= len {
47 src.advance(head_len);
48 Ok(Some(src.split_to(len).freeze()))
49 } else {
50 Ok(None)
51 }
52 }
53}
54
55#[cfg(test)]
58mod tests {
59 extern crate test_case;
60
61 use crate::utility::type_of;
62 use super::*;
63 use futures::{executor, io::Cursor, sink::SinkExt, TryStreamExt};
64 use futures_codec::{BytesCodec, Framed, FramedRead, FramedWrite};
65 use test_case::test_case;
66
67 #[test_case(Bytes::from("Hello World!") => vec![72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33]; "hello")]
70 #[test_case(Bytes::from("this is a new world!") => vec![116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 110, 101, 119, 32, 119, 111, 114, 108, 100, 33]; "new_world")]
71 fn bytes_codec_encode(msg: Bytes) -> Vec<u8> {
72 executor::block_on(async move {
73 let mut buf = vec![];
74 let cur = Cursor::new(&mut buf);
75 let mut framed = Framed::new(cur, BytesCodec {});
76
77 framed.send(msg.clone()).await.unwrap();
78 println!("\nbuf: {} = {:?}", type_of(&&buf), buf);
79 buf
80 })
81 }
82
83 #[test_case(vec![72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33] => Bytes::from("Hello World!"); "hello")]
84 #[test_case(vec![116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 110, 101, 119, 32, 119, 111, 114, 108, 100, 33] => Bytes::from("this is a new world!"); "new_world")]
85 fn bytes_codec_decode(mut buf: Vec<u8>) -> Bytes {
86 executor::block_on(async move {
87 let cur = Cursor::new(&mut buf);
88 let mut framed = Framed::new(cur, BytesCodec {});
89 if let Some(msg) = framed.try_next().await.unwrap() {
90 println!("\nmsg: {} = {:?}", type_of(&msg), msg);
91 msg
92 } else {
93 Bytes::new()
94 }
95 })
96 }
97
98 #[test]
99 fn t_bytes_codec() {
100 executor::block_on(async move {
101 let mut buf = vec![];
102 let cur = Cursor::new(&mut buf);
103 let mut framed = Framed::new(cur, BytesCodec {});
104
105 let msg = Bytes::from("Hello World!");
106 framed.send(msg.clone()).await.unwrap();
107 println!("buf: {:?}", buf);
108
109 let cur = Cursor::new(&mut buf);
110 let mut framed2 = Framed::new(cur, BytesCodec {});
111 while let Some(msg2) = framed2.try_next().await.unwrap() {
112 println!("msg: {:?}", msg2);
113 assert_eq!(msg, msg2);
114 }
115 });
116 }
117
118 #[test]
119 fn t_length_u8_codec() {
120 executor::block_on(async move {
121 let mut buf = vec![];
122 let cur = Cursor::new(&mut buf);
123 let mut framed = FramedWrite::new(cur, LengthCodec::<u8>::default());
124
125 let msg = Bytes::from("Hello World!");
126 framed.send(msg.clone()).await.unwrap();
127 println!("buf: {:?}", buf);
128
129 let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u8>::default());
130 let msg2 = framed2.try_next().await.unwrap().unwrap();
131 println!("msg: {:?}", msg2);
132
133 assert_eq!(msg, msg2);
134 });
135 }
136 #[test]
137 fn t_length_u16_codec() {
138 executor::block_on(async move {
139 let mut buf = vec![];
140 let cur = Cursor::new(&mut buf);
141 let mut framed = FramedWrite::new(cur, LengthCodec::<u16>::default());
142
143 let msg = Bytes::from("Hello World!");
144 framed.send(msg.clone()).await.unwrap();
145 println!("buf: {:?}", buf);
146
147 let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u16>::default());
148 let msg2 = framed2.try_next().await.unwrap().unwrap();
149 println!("msg: {:?}", msg2);
150
151 assert_eq!(msg, msg2);
152 });
153 }
154 #[test]
155 fn t_length_u32_codec() {
156 executor::block_on(async move {
157 let mut buf = vec![];
158 let cur = Cursor::new(&mut buf);
159 let mut framed = FramedWrite::new(cur, LengthCodec::<u32>::default());
160
161 let msg = Bytes::from("Hello World!");
162 framed.send(msg.clone()).await.unwrap();
163 println!("buf: {:?}", buf);
164
165 let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u32>::default());
166 let msg2 = framed2.try_next().await.unwrap().unwrap();
167 println!("msg: {:?}", msg2);
168
169 assert_eq!(msg, msg2);
170 });
171 }
172 #[test]
173 fn t_length_u64_codec() {
174 executor::block_on(async move {
175 let mut buf = vec![];
176 let cur = Cursor::new(&mut buf);
177 let mut framed = FramedWrite::new(cur, LengthCodec::<u64>::default());
178
179 let msg = Bytes::from("Hello World!");
180 framed.send(msg.clone()).await.unwrap();
181 println!("buf: {:?}", buf);
182
183 let mut framed2 = FramedRead::new(&buf[..], LengthCodec::<u64>::default());
184 let msg2 = framed2.try_next().await.unwrap().unwrap();
185 println!("msg: {:?}", msg2);
186
187 assert_eq!(msg, msg2);
188 });
189 }
190}