fluvio_protocol_codec/
codec.rs

1use std::io::Cursor;
2use std::io::Error as IoError;
3
4use log::trace;
5use tokio_util::codec::Decoder;
6use tokio_util::codec::Encoder;
7
8use crate::core::Decoder as FluvioDecoder;
9use crate::core::Encoder as FluvioEncoder;
10use crate::core::bytes::{Bytes, BytesMut, BufMut};
11use crate::core::Version;
12
13/// Implement Fluvio Encoding
14/// First 4 bytes are size of the message.  Then total buffer = 4 + message content
15///
16#[derive(Debug, Default)]
17pub struct FluvioCodec {}
18
19/// Type used as input by the [`FluvioCodec`] encoder implementation.
20/// Contains the data of the message and the [`crate::core:Version`].
21pub type FluvioCodecData<T> = (T, Version);
22
23impl FluvioCodec {
24    pub fn new() -> Self {
25        Self {}
26    }
27}
28
29impl Decoder for FluvioCodec {
30    type Item = BytesMut;
31    type Error = IoError;
32
33    fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<BytesMut>, Self::Error> {
34        let len = bytes.len();
35        if len == 0 {
36            return Ok(None);
37        }
38        if len >= 4 {
39            let mut src = Cursor::new(&*bytes);
40            let mut packet_len: i32 = 0;
41            packet_len.decode(&mut src, 0)?;
42            trace!(
43                "Decoder: received buffer: {}, message size: {}",
44                len,
45                packet_len
46            );
47            if (packet_len + 4) as usize <= bytes.len() {
48                trace!(
49                    "Decoder: all packets are in buffer len: {}, excess {}",
50                    packet_len + 4,
51                    bytes.len() - (packet_len + 4) as usize
52                );
53                let mut buf = bytes.split_to((packet_len + 4) as usize);
54                let message = buf.split_off(4); // truncate length
55                Ok(Some(message))
56            } else {
57                trace!(
58                    "Decoder buffer len: {} is less than packet+4: {}, waiting",
59                    len,
60                    packet_len + 4
61                );
62                Ok(None)
63            }
64        } else {
65            trace!(
66                "Decoder received raw bytes len: {} less than 4 not enough for size",
67                len
68            );
69            Ok(None)
70        }
71    }
72}
73
74/// Implement encoder for Fluvio Codec
75/// This is straight pass thru, actual encoding is done file slice
76impl Encoder<Bytes> for FluvioCodec {
77    type Error = IoError;
78
79    fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> {
80        trace!("Encoder: Encoding raw data with {} bytes", data.len());
81        buf.put(data);
82        Ok(())
83    }
84}
85
86/// Implement encoder for Fluvio Codec
87impl<T: FluvioEncoder> Encoder<FluvioCodecData<T>> for FluvioCodec {
88    type Error = IoError;
89
90    fn encode(&mut self, src: FluvioCodecData<T>, buf: &mut BytesMut) -> Result<(), IoError> {
91        let (src, version) = src;
92
93        let size = src.write_size(version) as i32;
94        trace!("encoding data with {} bytes.", size);
95        buf.reserve(4 + size as usize);
96
97        // First 4 bytes are the size of the message.
98        // Then the message payload.
99        let mut len_slice = Vec::new();
100        size.encode(&mut len_slice, version)?;
101        buf.extend_from_slice(&len_slice);
102        buf.extend_from_slice(&src.as_bytes(version)?);
103
104        Ok(())
105    }
106}
107
108#[cfg(test)]
109mod test {
110
111    use std::io::Error;
112    use std::net::SocketAddr;
113    use std::time;
114
115    use futures::future::join;
116    use futures::sink::SinkExt;
117    use futures::stream::StreamExt;
118    use tokio_util::codec::Framed;
119    use tokio_util::compat::FuturesAsyncReadCompatExt;
120
121    use fluvio_future::net::TcpListener;
122    use fluvio_future::net::TcpStream;
123    use fluvio_future::timer::sleep;
124    use futures::AsyncWriteExt;
125    use fluvio_protocol::Decoder as FluvioDecoder;
126    use fluvio_protocol::Encoder as FluvioEncoder;
127    use log::debug;
128
129    use super::FluvioCodec;
130
131    async fn run_server_raw_data<T: FluvioEncoder>(
132        data: T,
133        addr: &SocketAddr,
134    ) -> Result<(), Error> {
135        debug!("server: binding");
136        let listener = TcpListener::bind(&addr).await.expect("bind");
137        debug!("server: successfully binding. waiting for incoming");
138        let mut incoming = listener.incoming();
139        if let Some(stream) = incoming.next().await {
140            debug!("server: got connection from client");
141            let mut tcp_stream = stream.expect("stream");
142
143            // write message_size since we are not using the encoder
144            let mut len_buf = vec![];
145            let message_size = data.write_size(0) as i32;
146            message_size.encode(&mut len_buf, 0).expect("encoding len");
147            tcp_stream.write(&len_buf).await?;
148
149            let encoded_data = data.as_bytes(0).expect("encoding data");
150            tcp_stream.write(&encoded_data).await?;
151
152            // Now trying partial send:
153            // write message_size since we are not using the encoder
154            let mut len_buf = vec![];
155            let message_size = data.write_size(0) as i32;
156            message_size.encode(&mut len_buf, 0).expect("encoding len");
157            tcp_stream.write(&len_buf).await?;
158
159            let mut encoded_data = data.as_bytes(0).expect("encoding data");
160            let buf2 = encoded_data.split_off(3);
161            tcp_stream.write(&encoded_data).await?;
162            fluvio_future::timer::sleep(time::Duration::from_millis(10)).await;
163            tcp_stream.write(&buf2).await?;
164        }
165        fluvio_future::timer::sleep(time::Duration::from_millis(50)).await;
166        debug!("finishing. terminating server");
167        Ok(())
168    }
169
170    async fn run_server_object<T: FluvioEncoder + Clone>(
171        data: T,
172        addr: &SocketAddr,
173    ) -> Result<(), Error> {
174        debug!("server: binding");
175        let listener = TcpListener::bind(&addr).await.expect("bind");
176        debug!("server: successfully binding. waiting for incoming");
177        let mut incoming = listener.incoming();
178        if let Some(stream) = incoming.next().await {
179            debug!("server: got connection from client");
180            let tcp_stream = stream.expect("stream");
181
182            let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
183            let (mut sink, _) = framed.split();
184
185            // send 2 times in order
186            for _ in 0..2_u8 {
187                sink.send((data.clone(), 0)).await.expect("sending");
188            }
189        }
190        fluvio_future::timer::sleep(time::Duration::from_millis(50)).await;
191        debug!("finishing. terminating server");
192        Ok(())
193    }
194
195    async fn run_client<
196        T: PartialEq + std::fmt::Debug + Default + FluvioDecoder + FluvioEncoder,
197    >(
198        data: T,
199        addr: &SocketAddr,
200    ) -> Result<(), Error> {
201        debug!("client: sleep to give server chance to come up");
202        sleep(time::Duration::from_millis(100)).await;
203        debug!("client: trying to connect");
204        let tcp_stream = TcpStream::connect(&addr).await.expect("connect");
205        debug!("client: got connection. waiting");
206        let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
207        let (_, mut stream) = framed.split::<(T, _)>();
208        for _ in 0..2u16 {
209            if let Some(value) = stream.next().await {
210                debug!("client :received first value from server");
211                let mut bytes = value.expect("bytes");
212                let bytes_len = bytes.len();
213                debug!("client: received bytes len: {}", bytes_len);
214                let mut decoded_value = T::default();
215                decoded_value
216                    .decode(&mut bytes, 0)
217                    .expect("decoding failed");
218                assert_eq!(bytes_len, decoded_value.write_size(0));
219                assert_eq!(decoded_value, data);
220                debug!("all test pass");
221            } else {
222                panic!("no first value received");
223            }
224        }
225
226        debug!("finished client");
227        Ok(())
228    }
229
230    #[fluvio_future::test]
231    async fn test_async_tcp_vec() {
232        debug!("start running test");
233
234        let addr = "127.0.0.1:11223".parse::<SocketAddr>().expect("parse");
235        let data: Vec<u8> = vec![0x1, 0x02, 0x03, 0x04, 0x5];
236
237        let server_ft = run_server_object(data.clone(), &addr);
238        let client_ft = run_client(data, &addr);
239
240        let _rt = join(client_ft, server_ft).await;
241    }
242
243    #[fluvio_future::test]
244    async fn test_async_tcp_string() {
245        debug!("start running test");
246
247        let addr = "127.0.0.1:11224".parse::<SocketAddr>().expect("parse");
248        let data: String = String::from("hello");
249
250        let server_ft = run_server_object(data.clone(), &addr);
251        let client_ft = run_client(data, &addr);
252
253        let _rt = join(client_ft, server_ft).await;
254    }
255
256    #[allow(clippy::clone_on_copy)]
257    #[fluvio_future::test]
258    async fn test_async_tcp_i32() {
259        debug!("start running test");
260
261        let addr = "127.0.0.1:11225".parse::<SocketAddr>().expect("parse");
262        let data: i32 = 1000;
263
264        let server_ft = run_server_object(data.clone(), &addr);
265        let client_ft = run_client(data, &addr);
266
267        let _rt = join(client_ft, server_ft).await;
268    }
269
270    #[fluvio_future::test]
271    async fn test_async_tcp_raw_data() {
272        debug!("start running test");
273
274        let addr = "127.0.0.1:11226".parse::<SocketAddr>().expect("parse");
275        let data: String = String::from("Raw text");
276
277        let server_ft = run_server_raw_data(data.clone(), &addr);
278        let client_ft = run_client(data, &addr);
279
280        let _rt = join(client_ft, server_ft).await;
281    }
282}