streamdata/
lib.rs

1//! Utilities for decoding stream data.
2
3mod buffer;
4pub mod decoder;
5
6pub use buffer::*;
7
8/// The managed decoding state for the stream of data.
9#[derive(Debug)]
10pub struct State<Decoder, Buffer> {
11    /// The decoder to use for processing the data.
12    pub decoder: Decoder,
13    /// The buffer containing carried data from the previously decoded chunks.
14    pub buffer: Buffer,
15}
16
17/// The decoder error.
18/// This can either be any error that indicates that more data is needed
19/// to decode the value, i.e. the data is empty or is an incomplete part of
20/// an encoded value.
21pub enum DecodeError<T> {
22    /// More data is required for successful decoding.
23    NeedMoreData,
24    /// The decoder was unable to decode the data, but it has deremined
25    /// the correct action to proceed would be to drop some bytes from
26    /// the buffer and try again.
27    SkipData(usize),
28    /// Some other error has occured.
29    Other(T),
30}
31
32/// The results of the successful decoding.
33pub struct Decoded<T> {
34    /// The decoded value.
35    pub value: T,
36    /// The amount of bytes consumed from the buffer.
37    pub consumed_bytes: usize,
38}
39
40/// [`Decoder`] represents the ability to decode a value from a given buffer
41/// of data.
42pub trait Decoder<Input>
43where
44    Input: self::Buffer,
45{
46    /// The value to decode.
47    type Value;
48
49    /// The error that can occur while decoding the value.
50    type Error;
51
52    /// Decode (up to one) value from the buffer, returning the decoded value
53    /// accompanied by the amount of bytes consumed from the `buf` on success,
54    /// or a relevant decoding error.
55    fn decode(&mut self, input: &mut Input) -> Result<Self::Value, DecodeError<Self::Error>>;
56}
57
58impl<Decoder, Buffer> State<Decoder, Buffer>
59where
60    Decoder: self::Decoder<Buffer>,
61    Buffer: self::Buffer,
62{
63    /// Take the next chunk of data and return the iterator over the values
64    /// available with this new data.
65    pub fn process_next_chunk(&mut self, chunk: &[u8]) -> AvailableIter<'_, Decoder, Buffer> {
66        self.buffer.append(chunk);
67        AvailableIter::new(self)
68    }
69
70    /// Returns `true` if there is no bufferred data.
71    ///
72    /// When the [`State`] buffer is not empty, this means that it contains
73    /// some (possibly incomplete) data for the encoded values that has been
74    /// added to this state previously but has not yet been decoded (the most
75    /// obvious reason for which is that decoding would require more data
76    /// to arrive to succeed).
77    pub fn is_empty(&self) -> bool {
78        self.buffer.view().is_empty()
79    }
80
81    /// Finish the processing.
82    ///
83    /// Returns `Ok(())` if the state is empty [see [`Self::is_empty`],
84    /// otherwise returns an `Err` with the buffer containing the unhandled
85    /// data.
86    pub fn finish(self) -> Result<(), Buffer> {
87        if self.is_empty() {
88            return Ok(());
89        }
90        Err(self.buffer)
91    }
92}
93
94/// Iterate over the data readily available in the state, decoding the values
95/// on the fly.
96///
97/// When the error is returned, the further progress is no longer possible
98/// because the iterator will (most certainly, but actually depending on
99/// the `Decoder` implementation) be stuck, trying to decode the same data over
100/// and over again. The way to continue from this situation is to give up on
101/// the iterator, and either give up on the whole [`State`]
102/// (by [`State::finish`]-ing it) or try to correct the state by altering
103/// the `buffer` somehow.
104///
105/// This can be ergonomic when used with `.collect::<Result<Vec<_>, _>`.
106pub struct AvailableIter<'state, Decoder, Buffer>
107where
108    Decoder: self::Decoder<Buffer>,
109    Buffer: self::Buffer,
110{
111    /// A reference to the state.
112    /// The fact that we are holding this reference prevents anything from
113    /// happening to the state until we are dropped.
114    state: &'state mut State<Decoder, Buffer>,
115    /// Short circut on error.
116    short_circut: bool,
117}
118
119impl<'state, Decoder, Buffer> Iterator for AvailableIter<'state, Decoder, Buffer>
120where
121    Decoder: self::Decoder<Buffer>,
122    Buffer: self::Buffer,
123{
124    type Item = Result<
125        <Decoder as self::Decoder<Buffer>>::Value,
126        <Decoder as self::Decoder<Buffer>>::Error,
127    >;
128
129    fn next(&mut self) -> Option<Self::Item> {
130        if self.short_circut {
131            return None;
132        }
133        loop {
134            return match self.state.decoder.decode(&mut self.state.buffer) {
135                Ok(value) => Some(Ok(value)),
136                Err(DecodeError::NeedMoreData) => None,
137                Err(DecodeError::SkipData(bytes_to_skip)) => {
138                    self.state.buffer.advance(bytes_to_skip);
139                    continue; // skip return
140                }
141                Err(DecodeError::Other(error)) => {
142                    self.short_circut = true;
143                    Some(Err(error))
144                }
145            };
146        }
147    }
148}
149
150impl<'state, Decoder, Buffer> AvailableIter<'state, Decoder, Buffer>
151where
152    Decoder: self::Decoder<Buffer>,
153    Buffer: self::Buffer,
154{
155    /// Create a new [`Self`] for a given state.
156    /// Private fn for internal use only.
157    fn new(state: &'state mut State<Decoder, Buffer>) -> Self {
158        Self {
159            state,
160            short_circut: false,
161        }
162    }
163
164    /// View access to the state buffer.
165    ///
166    /// This can be useful for inspecting the buffer after the chunks have beed added to it.
167    pub fn buffer(&self) -> &Buffer {
168        &self.state.buffer
169    }
170
171    /// Decode and drop all available data, or fail with the first encountered
172    /// decoding error.
173    pub fn try_drain(self) -> Result<(), <Decoder as self::Decoder<Buffer>>::Error> {
174        for result in self {
175            let _ = result?;
176        }
177        Ok(())
178    }
179
180    /// Decode and collect all available data, or fail with the first
181    /// encountered decoding error.
182    pub fn try_collect<T>(self) -> Result<T, <Decoder as self::Decoder<Buffer>>::Error>
183    where
184        T: FromIterator<<Decoder as self::Decoder<Buffer>>::Value>,
185    {
186        self.collect()
187    }
188}