channels_io/framed/
framed_read.rs

1use core::fmt;
2use core::pin::Pin;
3use core::task::{ready, Context, Poll};
4
5use alloc::vec::Vec;
6
7use pin_project::pin_project;
8
9use crate::buf::Cursor;
10use crate::framed::Decoder;
11use crate::source::{AsyncSource, Source};
12use crate::util::slice_uninit_assume_init_mut;
13use crate::util::Error;
14use crate::{AsyncRead, AsyncReadExt, Read, ReadExt};
15
16/// Convert a [`Read`] to a [`Source`] or an [`AsyncRead`] to an [`AsyncSource`].
17///
18/// This abstraction converts unstructured input streams to structured typed streams.
19/// It reads raw bytes from a reader and processes them into structured data with the
20/// help of a [`Decoder`]. The [`Decoder`] decides how the raw bytes are converted back
21/// to items. At the other end of the stream, a [`FramedWrite`] will have produced the bytes
22/// with a matching [`Encoder`].
23///
24/// The [`Decoder`] will read bytes from the reader via an intermediary buffer owned by the
25/// [`FramedRead`] instance, the "read buffer". The [`Decoder`] will remove bytes from it
26/// as each item is decoded.
27///
28/// [`FramedWrite`]: crate::framed::FramedWrite
29/// [`Encoder`]: crate::framed::Encoder
30#[pin_project]
31#[derive(Debug)]
32pub struct FramedRead<R, D> {
33	#[pin]
34	reader: R,
35	decoder: D,
36	buf: Vec<u8>,
37}
38
39impl<R, D> FramedRead<R, D> {
40	/// Create a new [`FramedRead`].
41	#[inline]
42	#[must_use]
43	pub const fn new(reader: R, decoder: D) -> Self {
44		Self { reader, decoder, buf: Vec::new() }
45	}
46
47	/// Create a new [`FramedRead`] that can hold `capacity` bytes in its read buffer
48	/// before allocating.
49	#[inline]
50	#[must_use]
51	pub fn with_capacity(
52		reader: R,
53		decoder: D,
54		capacity: usize,
55	) -> Self {
56		Self { reader, decoder, buf: Vec::with_capacity(capacity) }
57	}
58
59	/// Get a reference to the underlying reader.
60	#[inline]
61	#[must_use]
62	pub fn reader(&self) -> &R {
63		&self.reader
64	}
65
66	/// Get a mutable reference to the underlying reader.
67	#[inline]
68	#[must_use]
69	pub fn reader_mut(&mut self) -> &mut R {
70		&mut self.reader
71	}
72
73	/// Get a pinned reference to the underlying reader.
74	#[inline]
75	#[must_use]
76	pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
77		self.project().reader
78	}
79
80	/// Get a reference to the underlying decoder.
81	#[inline]
82	#[must_use]
83	pub fn decoder(&self) -> &D {
84		&self.decoder
85	}
86
87	/// Get a mutable reference to the decoder.
88	#[inline]
89	#[must_use]
90	pub fn decoder_mut(&mut self) -> &mut D {
91		&mut self.decoder
92	}
93
94	/// Get a reference to the decoder from a pinned reference of the [`FramedRead`].
95	#[inline]
96	#[must_use]
97	pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
98		self.project().decoder
99	}
100
101	/// Get a reference to the read buffer.
102	#[inline]
103	#[must_use]
104	pub fn read_buffer(&self) -> &Vec<u8> {
105		&self.buf
106	}
107
108	/// Get a mutable reference to the read buffer.
109	#[inline]
110	#[must_use]
111	pub fn read_buffer_mut(&mut self) -> &mut Vec<u8> {
112		&mut self.buf
113	}
114
115	/// Get a reference to the read buffer from a pinned reference of the [`FramedRead`].
116	#[inline]
117	#[must_use]
118	pub fn map_decoder<T, F>(self, f: F) -> FramedRead<R, T>
119	where
120		T: Decoder,
121		F: FnOnce(D) -> T,
122	{
123		FramedRead {
124			reader: self.reader,
125			decoder: f(self.decoder),
126			buf: self.buf,
127		}
128	}
129
130	/// Destruct this [`FramedRead`] and get back the reader, dropping the decoder.
131	#[inline]
132	#[must_use]
133	pub fn into_reader(self) -> R {
134		self.reader
135	}
136
137	/// Destruct this [`FramedRead`] and get back the decoder, dropping the reader.
138	#[inline]
139	#[must_use]
140	pub fn into_decoder(self) -> D {
141		self.decoder
142	}
143
144	/// Destruct this [`FramedRead`] and get back both the decoder and the reader.
145	#[inline]
146	#[must_use]
147	pub fn into_inner(self) -> (R, D) {
148		(self.reader, self.decoder)
149	}
150}
151
/// The ways in which receiving an item over a [`FramedRead`] can fail.
///
/// `T` is the error type of the decoder and `Io` is the error type of the
/// underlying reader.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FramedReadError<T, Io> {
	/// Decoding failed; holds the error returned by the decoder.
	Decode(T),
	/// Reading failed; holds the I/O error returned by the reader.
	Io(Io),
}
160
161impl<T, Io> fmt::Display for FramedReadError<T, Io>
162where
163	T: fmt::Display,
164	Io: fmt::Display,
165{
166	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167		match self {
168			Self::Decode(e) => e.fmt(f),
169			Self::Io(e) => e.fmt(f),
170		}
171	}
172}
173
174impl<T, Io> Error for FramedReadError<T, Io> where
175	Self: fmt::Debug + fmt::Display
176{
177}
178
179impl<R, D> AsyncSource for FramedRead<R, D>
180where
181	R: AsyncRead,
182	D: Decoder,
183{
184	type Item =
185		Result<D::Output, FramedReadError<D::Error, R::Error>>;
186
187	fn poll_next(
188		self: Pin<&mut Self>,
189		cx: &mut Context,
190	) -> Poll<Self::Item> {
191		let mut this = self.project();
192
193		loop {
194			match this.decoder.decode(this.buf) {
195				Ok(Some(x)) => return Poll::Ready(Ok(x)),
196				Ok(None) => {
197					if this.buf.spare_capacity_mut().is_empty() {
198						this.buf.reserve(1);
199					}
200
201					let n = {
202						let mut buf =
203							Cursor::new(get_vec_spare_cap(this.buf));
204
205						ready!(this
206							.reader
207							.as_mut()
208							.poll_read_buf(cx, &mut buf))
209						.map_err(FramedReadError::Io)?;
210
211						buf.pos()
212					};
213
214					unsafe {
215						this.buf.set_len(this.buf.len() + n);
216					}
217				},
218				Err(e) => {
219					return Poll::Ready(Err(
220						FramedReadError::Decode(e),
221					));
222				},
223			}
224		}
225	}
226}
227
228impl<R, D> Source for FramedRead<R, D>
229where
230	R: Read,
231	D: Decoder,
232{
233	type Item =
234		Result<D::Output, FramedReadError<D::Error, R::Error>>;
235
236	fn next(&mut self) -> Self::Item {
237		loop {
238			match self.decoder.decode(&mut self.buf) {
239				Ok(Some(x)) => return Ok(x),
240				Ok(None) => {
241					if self.buf.spare_capacity_mut().is_empty() {
242						self.buf.reserve(1);
243					}
244
245					let n = {
246						let mut buf = Cursor::new(get_vec_spare_cap(
247							&mut self.buf,
248						));
249
250						self.reader
251							.read_buf(&mut buf)
252							.map_err(FramedReadError::Io)?;
253
254						buf.pos()
255					};
256
257					unsafe {
258						self.buf.set_len(self.buf.len() + n);
259					}
260				},
261				Err(e) => return Err(FramedReadError::Decode(e)),
262			}
263		}
264	}
265}
266
267fn get_vec_spare_cap(vec: &mut Vec<u8>) -> &mut [u8] {
268	unsafe { slice_uninit_assume_init_mut(vec.spare_capacity_mut()) }
269}
270
#[cfg(test)]
mod tests {
	use super::*;

	use core::convert::Infallible;

	use crate::buf::{Buf, Cursor};
	use crate::source::Source;

	/// Test decoder that yields one big-endian `u32` for every 4 buffered bytes.
	struct U32Decoder;

	impl Decoder for U32Decoder {
		type Output = u32;
		type Error = Infallible;

		fn decode(
			&mut self,
			buf: &mut Vec<u8>,
		) -> Result<Option<Self::Output>, Self::Error> {
			// Fewer than 4 bytes buffered: ask the caller for more input.
			let bytes: [u8; 4] = match buf.get(..4) {
				Some(head) => head
					.try_into()
					.expect("a 4 byte slice converts to [u8; 4]"),
				None => return Ok(None),
			};

			// Consume the bytes that made up this item.
			buf.drain(..4);
			Ok(Some(u32::from_be_bytes(bytes)))
		}
	}

	/// A trailing partial frame surfaces as an I/O error once the reader is
	/// exhausted.
	#[test]
	fn test_framed_read_eof() {
		let reader = Cursor::new([0u8, 0, 0, 42, 0, 0]).reader();
		let mut framed = FramedRead::new(reader, U32Decoder);

		assert_eq!(
			Source::next(&mut framed).expect("first frame should decode"),
			42
		);
		assert!(matches!(
			Source::next(&mut framed),
			Err(FramedReadError::Io(_))
		));
	}

	/// Reading past the last complete frame surfaces as an I/O error.
	#[test]
	fn test_framed_read_eof_exact() {
		let reader =
			Cursor::new([0u8, 0, 0, 42, 0, 0, 0, 62]).reader();
		let mut framed = FramedRead::new(reader, U32Decoder);

		assert_eq!(
			Source::next(&mut framed).expect("first frame should decode"),
			42
		);
		assert_eq!(
			Source::next(&mut framed).expect("second frame should decode"),
			62
		);
		assert!(matches!(
			Source::next(&mut framed),
			Err(FramedReadError::Io(_))
		));
	}
}
327}