channels_io/read/
mod.rs

1use core::pin::Pin;
2use core::task::{ready, Context, Poll};
3
4use crate::buf::BufMut;
5use crate::error::{IoError, ReadError};
6
7mod read_buf;
8mod read_buf_all;
9
10use self::read_buf::ReadBuf;
11use self::read_buf_all::ReadBufAll;
12
13/// This trait allows reading bytes from a source.
14///
15/// Types implementing this trait are called "readers".
16pub trait Read {
17	/// Error type for IO operations involving the reader.
18	type Error: ReadError;
19
20	/// Read some bytes into the slice `buf`.
21	///
22	/// This function is the lower level building block of the other `read_*` methods.
23	/// It reads bytes into `buf` and reports back to the caller how many bytes it read.
24	fn read_slice(
25		&mut self,
26		buf: &mut [u8],
27	) -> Result<usize, Self::Error>;
28}
29
30/// This trait is the asynchronous version of [`Read`].
31///
32/// [`Read`]: crate::Read
33pub trait AsyncRead {
34	/// Error type for IO operations involving the reader.
35	type Error: ReadError;
36
37	/// Poll the reader once and read some bytes into the slice `buf`.
38	///
39	/// This method reads bytes directly into `buf` and reports how many bytes it
40	/// read.
41	fn poll_read_slice(
42		self: Pin<&mut Self>,
43		cx: &mut Context,
44		buf: &mut [u8],
45	) -> Poll<Result<usize, Self::Error>>;
46}
47
48/// Read bytes from a reader.
49///
50/// Extension trait for all [`Read`] types.
51pub trait ReadExt: Read {
52	/// Read some bytes into `buf` advancing it appropriately.
53	fn read_buf<B>(&mut self, buf: B) -> Result<(), Self::Error>
54	where
55		B: BufMut,
56	{
57		read_buf(self, buf)
58	}
59
60	/// Read bytes into `buf` advancing it until it is full.
61	///
62	/// This method will try to read bytes into `buf` repeatedly until either a)
63	/// `buf` has been filled, b) an error occurs or c) the reader reaches EOF.
64	fn read_buf_all<B>(&mut self, buf: B) -> Result<(), Self::Error>
65	where
66		B: BufMut,
67	{
68		read_buf_all(self, buf)
69	}
70
71	/// Create a "by reference" adapter that takes the current instance of [`Read`]
72	/// by mutable reference.
73	fn by_ref(&mut self) -> &mut Self
74	where
75		Self: Sized,
76	{
77		self
78	}
79}
80
81/// This trait is the asynchronous version of [`ReadExt`].
82///
83/// Extension trait for all [`AsyncRead`] types.
84///
85/// [`ReadExt`]: crate::ReadExt
86pub trait AsyncReadExt: AsyncRead {
87	/// Poll the reader and try to read some bytes into `buf`.
88	fn poll_read_buf<B>(
89		self: Pin<&mut Self>,
90		cx: &mut Context,
91		buf: &mut B,
92	) -> Poll<Result<(), Self::Error>>
93	where
94		B: BufMut + ?Sized,
95	{
96		poll_read_buf(self, cx, buf)
97	}
98
99	/// Poll the reader and try to read some bytes into `buf`.
100	fn poll_read_buf_all<B>(
101		self: Pin<&mut Self>,
102		cx: &mut Context,
103		buf: &mut B,
104	) -> Poll<Result<(), Self::Error>>
105	where
106		B: BufMut + ?Sized,
107	{
108		poll_read_buf_all(self, cx, buf)
109	}
110
111	/// Asynchronously read some bytes into `buf` advancing it appropriately.
112	///
113	/// This function behaves in the same way as [`read_buf()`] except that it
114	/// returns a [`Future`] that must be `.await`ed.
115	///
116	/// [`read_buf()`]: crate::ReadExt::read_buf
117	/// [`Future`]: core::future::Future
118	fn read_buf<B>(&mut self, buf: B) -> ReadBuf<'_, Self, B>
119	where
120		B: BufMut + Unpin,
121		Self: Unpin,
122	{
123		ReadBuf::new(self, buf)
124	}
125
126	/// Asynchronously read bytes into `buf` advancing it until it is full.
127	///
128	/// This function behaves in the same way as [`read_buf_all()`] except that it
129	/// returns a [`Future`] that must be `.await`ed.
130	///
131	/// [`read_buf_all()`]: crate::ReadExt::read_buf_all
132	/// [`Future`]: core::future::Future
133	fn read_buf_all<B>(&mut self, buf: B) -> ReadBufAll<'_, Self, B>
134	where
135		B: BufMut + Unpin,
136		Self: Unpin,
137	{
138		ReadBufAll::new(self, buf)
139	}
140
141	/// Create a "by reference" adapter that takes the current instance of [`AsyncRead`]
142	/// by mutable reference.
143	fn by_ref(&mut self) -> &mut Self
144	where
145		Self: Sized,
146	{
147		self
148	}
149}
150
151impl<T: Read + ?Sized> ReadExt for T {}
152
153impl<T: AsyncRead + ?Sized> AsyncReadExt for T {}
154
155fn read_buf<T, B>(reader: &mut T, mut buf: B) -> Result<(), T::Error>
156where
157	T: ReadExt + ?Sized,
158	B: BufMut,
159{
160	if !buf.has_remaining_mut() {
161		return Ok(());
162	}
163
164	loop {
165		match reader.read_slice(buf.chunk_mut()) {
166			Ok(0) => return Err(T::Error::eof()),
167			Ok(n) => {
168				buf.advance_mut(n);
169				return Ok(());
170			},
171			Err(e) if e.should_retry() => continue,
172			Err(e) => return Err(e),
173		}
174	}
175}
176
177fn poll_read_buf<T, B>(
178	mut reader: Pin<&mut T>,
179	cx: &mut Context,
180	buf: &mut B,
181) -> Poll<Result<(), T::Error>>
182where
183	T: AsyncReadExt + ?Sized,
184	B: BufMut + ?Sized,
185{
186	use Poll::Ready;
187
188	if !buf.has_remaining_mut() {
189		return Ready(Ok(()));
190	}
191
192	loop {
193		match ready!(reader
194			.as_mut()
195			.poll_read_slice(cx, buf.chunk_mut()))
196		{
197			Ok(0) => return Ready(Err(T::Error::eof())),
198			Ok(n) => {
199				buf.advance_mut(n);
200				return Ready(Ok(()));
201			},
202			Err(e) if e.should_retry() => continue,
203			Err(e) => return Ready(Err(e)),
204		}
205	}
206}
207
208fn read_buf_all<T, B>(
209	reader: &mut T,
210	mut buf: B,
211) -> Result<(), T::Error>
212where
213	T: ReadExt + ?Sized,
214	B: BufMut,
215{
216	while buf.has_remaining_mut() {
217		match reader.read_slice(buf.chunk_mut()) {
218			Ok(0) => return Err(T::Error::eof()),
219			Ok(n) => buf.advance_mut(n),
220			Err(e) if e.should_retry() => continue,
221			Err(e) => return Err(e),
222		}
223	}
224
225	Ok(())
226}
227
228fn poll_read_buf_all<T, B>(
229	mut reader: Pin<&mut T>,
230	cx: &mut Context,
231	buf: &mut B,
232) -> Poll<Result<(), T::Error>>
233where
234	T: AsyncReadExt + ?Sized,
235	B: BufMut + ?Sized,
236{
237	use Poll::Ready;
238
239	while buf.has_remaining_mut() {
240		match ready!(reader
241			.as_mut()
242			.poll_read_slice(cx, buf.chunk_mut()))
243		{
244			Ok(0) => return Ready(Err(T::Error::eof())),
245			Ok(n) => buf.advance_mut(n),
246			Err(e) if e.should_retry() => continue,
247			Err(e) => return Ready(Err(e)),
248		}
249	}
250
251	Ready(Ok(()))
252}
253
254macro_rules! forward_impl_read {
255	($to:ty) => {
256		type Error = <$to>::Error;
257
258		fn read_slice(
259			&mut self,
260			buf: &mut [u8],
261		) -> Result<usize, Self::Error> {
262			<$to>::read_slice(self, buf)
263		}
264	};
265}
266
267macro_rules! forward_impl_async_read {
268	($to:ty) => {
269		type Error = <$to>::Error;
270
271		fn poll_read_slice(
272			mut self: Pin<&mut Self>,
273			cx: &mut Context,
274			buf: &mut [u8],
275		) -> Poll<Result<usize, Self::Error>> {
276			let this = Pin::new(&mut **self);
277			<$to>::poll_read_slice(this, cx, buf)
278		}
279	};
280}
281
282impl<T: Read + ?Sized> Read for &mut T {
283	forward_impl_read!(T);
284}
285
286impl<T: AsyncRead + Unpin + ?Sized> AsyncRead for &mut T {
287	forward_impl_async_read!(T);
288}
289
290#[cfg(feature = "alloc")]
291impl<T: AsyncRead + Unpin + ?Sized> AsyncRead
292	for alloc::boxed::Box<T>
293{
294	forward_impl_async_read!(T);
295}
296
297#[cfg(feature = "alloc")]
298impl<T: Read + ?Sized> Read for alloc::boxed::Box<T> {
299	forward_impl_read!(T);
300}