// fluvio_protocol_codec/codec.rs
use std::io::Cursor;
2use std::io::Error as IoError;
3
4use log::trace;
5use tokio_util::codec::Decoder;
6use tokio_util::codec::Encoder;
7
8use crate::core::Decoder as FluvioDecoder;
9use crate::core::Encoder as FluvioEncoder;
10use crate::core::bytes::{Bytes, BytesMut, BufMut};
11use crate::core::Version;
12
/// Codec that plugs into `tokio_util`'s framing machinery.
///
/// The type carries no state of its own; all behaviour lives in the
/// `Decoder` / `Encoder` implementations written for it.
#[derive(Default, Debug)]
pub struct FluvioCodec {}
18
19pub type FluvioCodecData<T> = (T, Version);
22
23impl FluvioCodec {
24 pub fn new() -> Self {
25 Self {}
26 }
27}
28
29impl Decoder for FluvioCodec {
30 type Item = BytesMut;
31 type Error = IoError;
32
33 fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<BytesMut>, Self::Error> {
34 let len = bytes.len();
35 if len == 0 {
36 return Ok(None);
37 }
38 if len >= 4 {
39 let mut src = Cursor::new(&*bytes);
40 let mut packet_len: i32 = 0;
41 packet_len.decode(&mut src, 0)?;
42 trace!(
43 "Decoder: received buffer: {}, message size: {}",
44 len,
45 packet_len
46 );
47 if (packet_len + 4) as usize <= bytes.len() {
48 trace!(
49 "Decoder: all packets are in buffer len: {}, excess {}",
50 packet_len + 4,
51 bytes.len() - (packet_len + 4) as usize
52 );
53 let mut buf = bytes.split_to((packet_len + 4) as usize);
54 let message = buf.split_off(4); Ok(Some(message))
56 } else {
57 trace!(
58 "Decoder buffer len: {} is less than packet+4: {}, waiting",
59 len,
60 packet_len + 4
61 );
62 Ok(None)
63 }
64 } else {
65 trace!(
66 "Decoder received raw bytes len: {} less than 4 not enough for size",
67 len
68 );
69 Ok(None)
70 }
71 }
72}
73
74impl Encoder<Bytes> for FluvioCodec {
77 type Error = IoError;
78
79 fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> {
80 trace!("Encoder: Encoding raw data with {} bytes", data.len());
81 buf.put(data);
82 Ok(())
83 }
84}
85
86impl<T: FluvioEncoder> Encoder<FluvioCodecData<T>> for FluvioCodec {
88 type Error = IoError;
89
90 fn encode(&mut self, src: FluvioCodecData<T>, buf: &mut BytesMut) -> Result<(), IoError> {
91 let (src, version) = src;
92
93 let size = src.write_size(version) as i32;
94 trace!("encoding data with {} bytes.", size);
95 buf.reserve(4 + size as usize);
96
97 let mut len_slice = Vec::new();
100 size.encode(&mut len_slice, version)?;
101 buf.extend_from_slice(&len_slice);
102 buf.extend_from_slice(&src.as_bytes(version)?);
103
104 Ok(())
105 }
106}
107
#[cfg(test)]
mod test {

    use std::io::Error;
    use std::net::SocketAddr;
    use std::time;

    use futures::future::join;
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use tokio_util::codec::Framed;
    use tokio_util::compat::FuturesAsyncReadCompatExt;

    use fluvio_future::net::TcpListener;
    use fluvio_future::net::TcpStream;
    use fluvio_future::timer::sleep;
    use futures::AsyncWriteExt;
    use fluvio_protocol::Decoder as FluvioDecoder;
    use fluvio_protocol::Encoder as FluvioEncoder;
    use log::debug;

    use super::FluvioCodec;

    /// Test server that frames `data` by hand, without using the codec.
    ///
    /// Accepts one connection and writes two frames. The first is sent
    /// contiguously. The second has its payload split after 3 bytes with a
    /// short pause in between, so the client-side decoder has to cope with a
    /// frame that arrives in pieces.
    async fn run_server_raw_data<T: FluvioEncoder>(
        data: T,
        addr: &SocketAddr,
    ) -> Result<(), Error> {
        debug!("server: binding");
        let listener = TcpListener::bind(&addr).await.expect("bind");
        debug!("server: successfully binding. waiting for incoming");
        let mut incoming = listener.incoming();
        if let Some(stream) = incoming.next().await {
            debug!("server: got connection from client");
            let mut tcp_stream = stream.expect("stream");

            // `write_all` is used throughout: a plain `write` may accept only
            // part of the buffer, which would corrupt the frame.

            // First frame: length prefix, then the whole payload.
            let mut len_buf = vec![];
            let message_size = data.write_size(0) as i32;
            message_size.encode(&mut len_buf, 0).expect("encoding len");
            tcp_stream.write_all(&len_buf).await?;

            let encoded_data = data.as_bytes(0).expect("encoding data");
            tcp_stream.write_all(&encoded_data).await?;

            // Second frame: length prefix, then the payload in two pieces.
            let mut len_buf = vec![];
            let message_size = data.write_size(0) as i32;
            message_size.encode(&mut len_buf, 0).expect("encoding len");
            tcp_stream.write_all(&len_buf).await?;

            let mut encoded_data = data.as_bytes(0).expect("encoding data");
            let buf2 = encoded_data.split_off(3);
            tcp_stream.write_all(&encoded_data).await?;
            sleep(time::Duration::from_millis(10)).await;
            tcp_stream.write_all(&buf2).await?;
        }
        // Keep the connection open briefly so the client can finish reading.
        sleep(time::Duration::from_millis(50)).await;
        debug!("finishing. terminating server");
        Ok(())
    }

    /// Test server that sends `data` twice through `FluvioCodec`'s typed
    /// encoder (version 0) over the first accepted connection.
    async fn run_server_object<T: FluvioEncoder + Clone>(
        data: T,
        addr: &SocketAddr,
    ) -> Result<(), Error> {
        debug!("server: binding");
        let listener = TcpListener::bind(&addr).await.expect("bind");
        debug!("server: successfully binding. waiting for incoming");
        let mut incoming = listener.incoming();
        if let Some(stream) = incoming.next().await {
            debug!("server: got connection from client");
            let tcp_stream = stream.expect("stream");

            let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
            let (mut sink, _) = framed.split();

            for _ in 0..2_u8 {
                sink.send((data.clone(), 0)).await.expect("sending");
            }
        }
        // Keep the connection open briefly so the client can finish reading.
        sleep(time::Duration::from_millis(50)).await;
        debug!("finishing. terminating server");
        Ok(())
    }

    /// Test client: connects to `addr`, reads two frames through
    /// `FluvioCodec`, decodes each into a `T` and checks that it equals
    /// `data` and that the frame length matches `write_size(0)`.
    async fn run_client<
        T: PartialEq + std::fmt::Debug + Default + FluvioDecoder + FluvioEncoder,
    >(
        data: T,
        addr: &SocketAddr,
    ) -> Result<(), Error> {
        debug!("client: sleep to give server chance to come up");
        sleep(time::Duration::from_millis(100)).await;
        debug!("client: trying to connect");
        let tcp_stream = TcpStream::connect(&addr).await.expect("connect");
        debug!("client: got connection. waiting");
        let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
        let (_, mut stream) = framed.split::<(T, _)>();
        for _ in 0..2u16 {
            if let Some(value) = stream.next().await {
                debug!("client :received first value from server");
                let mut bytes = value.expect("bytes");
                let bytes_len = bytes.len();
                debug!("client: received bytes len: {}", bytes_len);
                let mut decoded_value = T::default();
                decoded_value
                    .decode(&mut bytes, 0)
                    .expect("decoding failed");
                assert_eq!(bytes_len, decoded_value.write_size(0));
                assert_eq!(decoded_value, data);
                debug!("all test pass");
            } else {
                panic!("no first value received");
            }
        }

        debug!("finished client");
        Ok(())
    }

    /// Round-trips a `Vec<u8>` through the typed encoder and the decoder.
    #[fluvio_future::test]
    async fn test_async_tcp_vec() {
        debug!("start running test");

        let addr = "127.0.0.1:11223".parse::<SocketAddr>().expect("parse");
        let data: Vec<u8> = vec![0x1, 0x02, 0x03, 0x04, 0x5];

        let server_ft = run_server_object(data.clone(), &addr);
        let client_ft = run_client(data, &addr);

        // Check both outcomes so an I/O error on either side fails the test.
        let (client_result, server_result) = join(client_ft, server_ft).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }

    /// Round-trips a `String` through the typed encoder and the decoder.
    #[fluvio_future::test]
    async fn test_async_tcp_string() {
        debug!("start running test");

        let addr = "127.0.0.1:11224".parse::<SocketAddr>().expect("parse");
        let data: String = String::from("hello");

        let server_ft = run_server_object(data.clone(), &addr);
        let client_ft = run_client(data, &addr);

        // Check both outcomes so an I/O error on either side fails the test.
        let (client_result, server_result) = join(client_ft, server_ft).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }

    /// Round-trips an `i32` through the typed encoder and the decoder.
    #[fluvio_future::test]
    async fn test_async_tcp_i32() {
        debug!("start running test");

        let addr = "127.0.0.1:11225".parse::<SocketAddr>().expect("parse");
        let data: i32 = 1000;

        // `i32` is `Copy`, so the value is simply passed to both sides.
        let server_ft = run_server_object(data, &addr);
        let client_ft = run_client(data, &addr);

        // Check both outcomes so an I/O error on either side fails the test.
        let (client_result, server_result) = join(client_ft, server_ft).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }

    /// Feeds the decoder hand-framed data, including a frame whose payload
    /// arrives in two separate writes.
    #[fluvio_future::test]
    async fn test_async_tcp_raw_data() {
        debug!("start running test");

        let addr = "127.0.0.1:11226".parse::<SocketAddr>().expect("parse");
        let data: String = String::from("Raw text");

        let server_ft = run_server_raw_data(data.clone(), &addr);
        let client_ft = run_client(data, &addr);

        // Check both outcomes so an I/O error on either side fails the test.
        let (client_result, server_result) = join(client_ft, server_ft).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }
}