recode_util/framed/
length_delimited.rs

1use std::{error::Error as StdError, fmt, io, marker::PhantomData};
2
3use bytes::{Buf, BytesMut};
4use recode::{util::EncoderExt, Decoder, Encoder};
5use tokio_util::codec::{Decoder as TokioDecoder, Encoder as TokioEncoder};
6
7/// Trait for frames that can be decoded for encoded as a length-delimited
8/// sequence of bytes.
9pub trait LengthDelimitedFrame: Decoder + Encoder + Sized {
10    /// Maximum frame length.
11    const MAX_FRAME_LEN: usize = 8 * 1024 * 1024;
12
13    /// Type representing the length of a frame.
14    type Length: Decoder<usize> + Encoder<usize>;
15
16    /// Error type that can be returned when decoding/encoding a frame.
17    type Error: From<std::io::Error>
18        + From<<Self as Decoder>::Error>
19        + From<<Self as Encoder>::Error>
20        + From<<Self::Length as Decoder<usize>>::Error>
21        + From<<Self::Length as Encoder<usize>>::Error>;
22}
23
24/// A codec for decoding and decoding length-delimited frames that implement
25/// [`LengthDelimitedFrame`].
26#[derive(Debug, Clone)]
27pub struct LengthDelimitedCodec<F> {
28    state: DecodeState,
29    _marker: PhantomData<F>,
30}
31
32/// Error returned when decoding a frame.
33pub struct LengthDelimitedCodecError(&'static str);
34
35/// Current decode state.
36#[derive(Debug, Clone, Copy)]
37enum DecodeState {
38    Head,
39    Data(usize),
40}
41
42impl<F> LengthDelimitedCodec<F> {
43    /// Create a new [`LengthDelimitedCodec`] instance for [`F`].
44    #[inline]
45    pub const fn new() -> Self {
46        Self {
47            state: DecodeState::Head,
48            _marker: PhantomData,
49        }
50    }
51}
52
53impl<F> TokioDecoder for LengthDelimitedCodec<F>
54where
55    F: LengthDelimitedFrame,
56{
57    type Error = <F as LengthDelimitedFrame>::Error;
58    type Item = F;
59
60    fn decode(
61        &mut self,
62        src: &mut BytesMut,
63    ) -> Result<Option<Self::Item>, Self::Error> {
64        match self.state {
65            | DecodeState::Head => {
66                if <F::Length>::has_enough_bytes(src) {
67                    let len = <F::Length>::decode(src)?;
68                    src.reserve(len);
69                    self.state = DecodeState::Data(len);
70                }
71
72                Ok(None)
73            }
74            | DecodeState::Data(len) => {
75                if src.remaining() < len {
76                    return Ok(None);
77                }
78
79                let mut src = src.split_to(len);
80                let frame = F::decode(&mut src)?;
81
82                if !src.is_empty() {
83                    return Err(io::Error::new(
84                        io::ErrorKind::InvalidData,
85                        LengthDelimitedCodecError(
86                            "bytes remaining after frame",
87                        ),
88                    ))?;
89                }
90
91                self.state = DecodeState::Head;
92
93                Ok(Some(frame))
94            }
95        }
96    }
97}
98
99impl<F> TokioEncoder<F> for LengthDelimitedCodec<F>
100where
101    F: LengthDelimitedFrame,
102{
103    type Error = <F as LengthDelimitedFrame>::Error;
104
105    fn encode(
106        &mut self,
107        item: F,
108        dst: &mut BytesMut,
109    ) -> Result<(), Self::Error> {
110        let len = item.size();
111
112        dst.reserve(len);
113
114        <F::Length>::encode(&len, dst)?;
115        <F>::encode(&item, dst)?;
116
117        Ok(())
118    }
119}
120
121impl fmt::Debug for LengthDelimitedCodecError {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        f.debug_struct("LengthDelimitedCodecError").finish()
124    }
125}
126
127impl fmt::Display for LengthDelimitedCodecError {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        f.write_str(self.0)
130    }
131}
132
133impl StdError for LengthDelimitedCodecError {}