streamdata/lib.rs
1//! Utilities for decoding stream data.
2
3mod buffer;
4pub mod decoder;
5
6pub use buffer::*;
7
/// Decoding state carried across the chunks of a data stream.
///
/// Bundles the decoder with the buffer that accumulates the bytes which
/// have been received but not yet decoded.
#[derive(Debug)]
pub struct State<Decoder, Buffer> {
    /// The decoder that turns buffered data into values.
    pub decoder: Decoder,
    /// The buffer holding the data carried over from previously processed
    /// chunks.
    pub buffer: Buffer,
}
16
/// The decoder error.
///
/// This is either a signal that more data is needed to decode the value
/// (i.e. the data is empty or is an incomplete part of an encoded value),
/// a request to skip some bytes and retry, or an arbitrary decoder-specific
/// error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecodeError<T> {
    /// More data is required for successful decoding.
    NeedMoreData,
    /// The decoder was unable to decode the data, but it has determined
    /// that the correct action to proceed would be to drop the given number
    /// of bytes from the buffer and try again.
    SkipData(usize),
    /// Some other error has occurred.
    Other(T),
}
31
/// The result of a successful decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Decoded<T> {
    /// The decoded value.
    pub value: T,
    /// The number of bytes consumed from the buffer.
    pub consumed_bytes: usize,
}
39
40/// [`Decoder`] represents the ability to decode a value from a given buffer
41/// of data.
42pub trait Decoder<Input>
43where
44 Input: self::Buffer,
45{
46 /// The value to decode.
47 type Value;
48
49 /// The error that can occur while decoding the value.
50 type Error;
51
52 /// Decode (up to one) value from the buffer, returning the decoded value
53 /// accompanied by the amount of bytes consumed from the `buf` on success,
54 /// or a relevant decoding error.
55 fn decode(&mut self, input: &mut Input) -> Result<Self::Value, DecodeError<Self::Error>>;
56}
57
58impl<Decoder, Buffer> State<Decoder, Buffer>
59where
60 Decoder: self::Decoder<Buffer>,
61 Buffer: self::Buffer,
62{
63 /// Take the next chunk of data and return the iterator over the values
64 /// available with this new data.
65 pub fn process_next_chunk(&mut self, chunk: &[u8]) -> AvailableIter<'_, Decoder, Buffer> {
66 self.buffer.append(chunk);
67 AvailableIter::new(self)
68 }
69
70 /// Returns `true` if there is no bufferred data.
71 ///
72 /// When the [`State`] buffer is not empty, this means that it contains
73 /// some (possibly incomplete) data for the encoded values that has been
74 /// added to this state previously but has not yet been decoded (the most
75 /// obvious reason for which is that decoding would require more data
76 /// to arrive to succeed).
77 pub fn is_empty(&self) -> bool {
78 self.buffer.view().is_empty()
79 }
80
81 /// Finish the processing.
82 ///
83 /// Returns `Ok(())` if the state is empty [see [`Self::is_empty`],
84 /// otherwise returns an `Err` with the buffer containing the unhandled
85 /// data.
86 pub fn finish(self) -> Result<(), Buffer> {
87 if self.is_empty() {
88 return Ok(());
89 }
90 Err(self.buffer)
91 }
92}
93
94/// Iterate over the data readily available in the state, decoding the values
95/// on the fly.
96///
97/// When the error is returned, the further progress is no longer possible
98/// because the iterator will (most certainly, but actually depending on
99/// the `Decoder` implementation) be stuck, trying to decode the same data over
100/// and over again. The way to continue from this situation is to give up on
101/// the iterator, and either give up on the whole [`State`]
102/// (by [`State::finish`]-ing it) or try to correct the state by altering
103/// the `buffer` somehow.
104///
105/// This can be ergonomic when used with `.collect::<Result<Vec<_>, _>`.
106pub struct AvailableIter<'state, Decoder, Buffer>
107where
108 Decoder: self::Decoder<Buffer>,
109 Buffer: self::Buffer,
110{
111 /// A reference to the state.
112 /// The fact that we are holding this reference prevents anything from
113 /// happening to the state until we are dropped.
114 state: &'state mut State<Decoder, Buffer>,
115 /// Short circut on error.
116 short_circut: bool,
117}
118
119impl<'state, Decoder, Buffer> Iterator for AvailableIter<'state, Decoder, Buffer>
120where
121 Decoder: self::Decoder<Buffer>,
122 Buffer: self::Buffer,
123{
124 type Item = Result<
125 <Decoder as self::Decoder<Buffer>>::Value,
126 <Decoder as self::Decoder<Buffer>>::Error,
127 >;
128
129 fn next(&mut self) -> Option<Self::Item> {
130 if self.short_circut {
131 return None;
132 }
133 loop {
134 return match self.state.decoder.decode(&mut self.state.buffer) {
135 Ok(value) => Some(Ok(value)),
136 Err(DecodeError::NeedMoreData) => None,
137 Err(DecodeError::SkipData(bytes_to_skip)) => {
138 self.state.buffer.advance(bytes_to_skip);
139 continue; // skip return
140 }
141 Err(DecodeError::Other(error)) => {
142 self.short_circut = true;
143 Some(Err(error))
144 }
145 };
146 }
147 }
148}
149
150impl<'state, Decoder, Buffer> AvailableIter<'state, Decoder, Buffer>
151where
152 Decoder: self::Decoder<Buffer>,
153 Buffer: self::Buffer,
154{
155 /// Create a new [`Self`] for a given state.
156 /// Private fn for internal use only.
157 fn new(state: &'state mut State<Decoder, Buffer>) -> Self {
158 Self {
159 state,
160 short_circut: false,
161 }
162 }
163
164 /// View access to the state buffer.
165 ///
166 /// This can be useful for inspecting the buffer after the chunks have beed added to it.
167 pub fn buffer(&self) -> &Buffer {
168 &self.state.buffer
169 }
170
171 /// Decode and drop all available data, or fail with the first encountered
172 /// decoding error.
173 pub fn try_drain(self) -> Result<(), <Decoder as self::Decoder<Buffer>>::Error> {
174 for result in self {
175 let _ = result?;
176 }
177 Ok(())
178 }
179
180 /// Decode and collect all available data, or fail with the first
181 /// encountered decoding error.
182 pub fn try_collect<T>(self) -> Result<T, <Decoder as self::Decoder<Buffer>>::Error>
183 where
184 T: FromIterator<<Decoder as self::Decoder<Buffer>>::Value>,
185 {
186 self.collect()
187 }
188}