fusio_log/serdes/
bytes.rs

1use bytes::Bytes;
2use fusio::{Error, IoBuf, SeqRead, Write};
3
4use crate::serdes::{Decode, Encode};
5
6impl Encode for &[u8] {
7    async fn encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
8        (self.len() as u32).encode(writer).await?;
9        #[cfg(feature = "monoio")]
10        let (result, _) = writer.write_all(self.to_vec()).await;
11        #[cfg(not(feature = "monoio"))]
12        let (result, _) = writer.write_all(*self).await;
13        result?;
14
15        Ok(())
16    }
17
18    fn size(&self) -> usize {
19        self.len() + 4
20    }
21}
22
23impl Encode for Bytes {
24    async fn encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
25        (self.len() as u32).encode(writer).await?;
26        #[cfg(feature = "monoio")]
27        let (result, _) = writer.write_all(self.as_bytes()).await;
28        #[cfg(not(feature = "monoio"))]
29        let (result, _) = writer.write_all(self.as_slice()).await;
30        result?;
31
32        Ok(())
33    }
34
35    fn size(&self) -> usize {
36        self.len() + 4
37    }
38}
39
40impl Decode for Bytes {
41    async fn decode<R: SeqRead>(reader: &mut R) -> Result<Self, Error> {
42        let len = u32::decode(reader).await?;
43        let (result, buf) = reader.read_exact(vec![0u8; len as usize]).await;
44        result?;
45
46        Ok(buf.as_bytes())
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use std::io::Cursor;
53
54    use bytes::Bytes;
55    use tokio::io::AsyncSeekExt;
56
57    use crate::serdes::{Decode, Encode};
58
59    async fn encode_decode_bytes() {
60        let source = Bytes::from_static(b"hello! Tonbo");
61
62        let mut bytes = Vec::new();
63        let mut cursor = Cursor::new(&mut bytes);
64
65        let before = cursor.position();
66        source.encode(&mut cursor).await.unwrap();
67        let after = cursor.position();
68
69        assert_eq!(source.size(), (after - before) as usize);
70
71        cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap();
72        let decoded = Bytes::decode(&mut cursor).await.unwrap();
73
74        assert_eq!(source, decoded);
75    }
76
77    async fn encode_u8_slice_decode_bytes() {
78        let source = b"hello! Tonbo".as_slice();
79
80        let mut bytes = Vec::new();
81        let mut cursor = Cursor::new(&mut bytes);
82
83        let before = cursor.position();
84        source.encode(&mut cursor).await.unwrap();
85        let after = cursor.position();
86
87        assert_eq!(source.size(), (after - before) as usize);
88
89        cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap();
90        let decoded = Bytes::decode(&mut cursor).await.unwrap();
91
92        assert_eq!(source, decoded);
93    }
94
95    #[cfg(feature = "tokio")]
96    #[tokio::test]
97    async fn test_tokio_encode_decode_bytes() {
98        encode_decode_bytes().await;
99    }
100
101    #[cfg(feature = "monoio")]
102    #[monoio::test]
103    async fn test_monoio_encode_decode() {
104        encode_decode_bytes().await;
105    }
106
107    #[cfg(feature = "tokio")]
108    #[tokio::test]
109    async fn test_tokio_encode_decode_u8_slice() {
110        encode_u8_slice_decode_bytes().await;
111    }
112
113    #[cfg(feature = "monoio")]
114    #[monoio::test]
115    async fn test_monoio_encode_decode_u8_slice() {
116        encode_u8_slice_decode_bytes().await;
117    }
118}