recode_util/framed/
length_delimited.rs1use std::{error::Error as StdError, fmt, io, marker::PhantomData};
2
3use bytes::{Buf, BytesMut};
4use recode::{util::EncoderExt, Decoder, Encoder};
5use tokio_util::codec::{Decoder as TokioDecoder, Encoder as TokioEncoder};
6
7pub trait LengthDelimitedFrame: Decoder + Encoder + Sized {
10 const MAX_FRAME_LEN: usize = 8 * 1024 * 1024;
12
13 type Length: Decoder<usize> + Encoder<usize>;
15
16 type Error: From<std::io::Error>
18 + From<<Self as Decoder>::Error>
19 + From<<Self as Encoder>::Error>
20 + From<<Self::Length as Decoder<usize>>::Error>
21 + From<<Self::Length as Encoder<usize>>::Error>;
22}
23
24#[derive(Debug, Clone)]
27pub struct LengthDelimitedCodec<F> {
28 state: DecodeState,
29 _marker: PhantomData<F>,
30}
31
/// Error raised by the codec itself (as opposed to the frame or length
/// encoders/decoders).
///
/// Wraps a static, human-readable message.
pub struct LengthDelimitedCodecError(
    /// Description of what went wrong.
    &'static str,
);
34
/// Position of the decoder inside the current frame.
#[derive(Debug, Clone, Copy)]
enum DecodeState {
    /// The length prefix has not been read yet.
    Head,
    /// The length prefix has been read; the payload is this many bytes long.
    Data(usize),
}
41
42impl<F> LengthDelimitedCodec<F> {
43 #[inline]
45 pub const fn new() -> Self {
46 Self {
47 state: DecodeState::Head,
48 _marker: PhantomData,
49 }
50 }
51}
52
53impl<F> TokioDecoder for LengthDelimitedCodec<F>
54where
55 F: LengthDelimitedFrame,
56{
57 type Error = <F as LengthDelimitedFrame>::Error;
58 type Item = F;
59
60 fn decode(
61 &mut self,
62 src: &mut BytesMut,
63 ) -> Result<Option<Self::Item>, Self::Error> {
64 match self.state {
65 | DecodeState::Head => {
66 if <F::Length>::has_enough_bytes(src) {
67 let len = <F::Length>::decode(src)?;
68 src.reserve(len);
69 self.state = DecodeState::Data(len);
70 }
71
72 Ok(None)
73 }
74 | DecodeState::Data(len) => {
75 if src.remaining() < len {
76 return Ok(None);
77 }
78
79 let mut src = src.split_to(len);
80 let frame = F::decode(&mut src)?;
81
82 if !src.is_empty() {
83 return Err(io::Error::new(
84 io::ErrorKind::InvalidData,
85 LengthDelimitedCodecError(
86 "bytes remaining after frame",
87 ),
88 ))?;
89 }
90
91 self.state = DecodeState::Head;
92
93 Ok(Some(frame))
94 }
95 }
96 }
97}
98
99impl<F> TokioEncoder<F> for LengthDelimitedCodec<F>
100where
101 F: LengthDelimitedFrame,
102{
103 type Error = <F as LengthDelimitedFrame>::Error;
104
105 fn encode(
106 &mut self,
107 item: F,
108 dst: &mut BytesMut,
109 ) -> Result<(), Self::Error> {
110 let len = item.size();
111
112 dst.reserve(len);
113
114 <F::Length>::encode(&len, dst)?;
115 <F>::encode(&item, dst)?;
116
117 Ok(())
118 }
119}
120
121impl fmt::Debug for LengthDelimitedCodecError {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 f.debug_struct("LengthDelimitedCodecError").finish()
124 }
125}
126
127impl fmt::Display for LengthDelimitedCodecError {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.write_str(self.0)
130 }
131}
132
133impl StdError for LengthDelimitedCodecError {}