monoio_codec/
sync_codec.rs

1use std::io;
2
3use bytes::BytesMut;
4use monoio::io::{AsyncReadRent, AsyncWriteRent};
5pub use tokio_util::codec::Encoder;
6
7use crate::Framed;
8
9/// Decoder may return Decoded to represent a decoded item,
10/// or the insufficient length hint, or just the insufficient.
11#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
12pub enum Decoded<T> {
13    Some(T),
14    // The length needed is unknown.
15    // Same as None in tokio_codec::Decoder
16    Insufficient,
17    // The total length needed.
18    InsufficientAtLeast(usize),
19}
20
21impl<T> Decoded<T> {
22    #[inline]
23    pub fn unwrap(self) -> T {
24        match self {
25            Decoded::Some(inner) => inner,
26            Decoded::Insufficient => panic!("unwrap Decoded::Insufficient"),
27            Decoded::InsufficientAtLeast(_) => panic!("unwrap Decoded::InsufficientAtLeast"),
28        }
29    }
30}
31
32pub trait Decoder {
33    /// The type of decoded frames.
34    type Item;
35
36    /// The type of unrecoverable frame decoding errors.
37    ///
38    /// If an individual message is ill-formed but can be ignored without
39    /// interfering with the processing of future messages, it may be more
40    /// useful to report the failure as an `Item`.
41    ///
42    /// `From<io::Error>` is required in the interest of making `Error` suitable
43    /// for returning directly from a [`FramedRead`], and to enable the default
44    /// implementation of `decode_eof` to yield an `io::Error` when the decoder
45    /// fails to consume all available data.
46    ///
47    /// Note that implementors of this trait can simply indicate `type Error =
48    /// io::Error` to use I/O errors as this type.
49    ///
50    /// [`FramedRead`]: crate::codec::FramedRead
51    type Error: From<io::Error>;
52
53    /// Attempts to decode a frame from the provided buffer of bytes.
54    ///
55    /// This method is called by [`FramedRead`] whenever bytes are ready to be
56    /// parsed. The provided buffer of bytes is what's been read so far, and
57    /// this instance of `Decode` can determine whether an entire frame is in
58    /// the buffer and is ready to be returned.
59    ///
60    /// If an entire frame is available, then this instance will remove those
61    /// bytes from the buffer provided and return them as a decoded
62    /// frame. Note that removing bytes from the provided buffer doesn't always
63    /// necessarily copy the bytes, so this should be an efficient operation in
64    /// most circumstances.
65    ///
66    /// If the bytes look valid, but a frame isn't fully available yet, then
67    /// `Ok(InsufficientUnknown)` is returned. This indicates to the [`Framed`] instance that
68    /// it needs to read some more bytes before calling this method again.
69    ///
70    /// Note that the bytes provided may be empty. If a previous call to
71    /// `decode` consumed all the bytes in the buffer then `decode` will be
72    /// called again until it returns `Ok(InsufficientUnknown)`, indicating that more bytes need to
73    /// be read.
74    ///
75    /// Finally, if the bytes in the buffer are malformed then an error is
76    /// returned indicating why. This informs [`Framed`] that the stream is now
77    /// corrupt and should be terminated.
78    ///
79    /// [`Framed`]: crate::codec::Framed
80    /// [`FramedRead`]: crate::codec::FramedRead
81    fn decode(&mut self, src: &mut BytesMut) -> Result<Decoded<Self::Item>, Self::Error>;
82
83    /// A default method available to be called when there are no more bytes
84    /// available to be read from the underlying I/O.
85    ///
86    /// This method defaults to calling `decode` and returns an error if
87    /// `Ok(None)` is returned while there is unconsumed data in `buf`.
88    /// Typically this doesn't need to be implemented unless the framing
89    /// protocol differs near the end of the stream, or if you need to construct
90    /// frames _across_ eof boundaries on sources that can be resumed.
91    ///
92    /// Note that the `buf` argument may be empty. If a previous call to
93    /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
94    /// called again until it returns `None`, indicating that there are no more
95    /// frames to yield. This behavior enables returning finalization frames
96    /// that may not be based on inbound data.
97    ///
98    /// Once `None` has been returned, `decode_eof` won't be called again until
99    /// an attempt to resume the stream has been made, where the underlying stream
100    /// actually returned more data.
101    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Decoded<Self::Item>, Self::Error> {
102        match self.decode(buf)? {
103            Decoded::Some(frame) => Ok(Decoded::Some(frame)),
104            d => {
105                if buf.is_empty() {
106                    Ok(d)
107                } else {
108                    Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
109                }
110            }
111        }
112    }
113
114    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
115    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
116    ///
117    /// Raw I/O objects work with byte sequences, but higher-level code usually
118    /// wants to batch these into meaningful chunks, called "frames". This
119    /// method layers framing on top of an I/O object, by using the `Codec`
120    /// traits to handle encoding and decoding of messages frames. Note that
121    /// the incoming and outgoing frame types may be distinct.
122    ///
123    /// This function returns a *single* object that is both `Stream` and
124    /// `Sink`; grouping this into a single object is often useful for layering
125    /// things like gzip or TLS, which require both read and write access to the
126    /// underlying object.
127    ///
128    /// If you want to work more directly with the streams and sink, consider
129    /// calling `split` on the [`Framed`] returned by this method, which will
130    /// break them into separate objects, allowing them to interact more easily.
131    ///
132    /// [`Stream`]: futures_core::Stream
133    /// [`Sink`]: futures_sink::Sink
134    /// [`Framed`]: crate::Framed
135    fn framed<T: AsyncReadRent + AsyncWriteRent + Sized>(self, io: T) -> Framed<T, Self>
136    where
137        Self: Sized,
138    {
139        Framed::new(io, self)
140    }
141}