embytes_buffer_async/
reader.rs

1use core::{pin::Pin, task::{Context, Poll}};
2
3use crate::{mutex::Mutex, AsyncBuffer, BufferError};
4
5/// A type to read from an [`AsyncBuffer`]
6pub struct BufferReader<'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
7    buffer: &'a AsyncBuffer<C, T>
8}
9
/// Outcome reported by the closure passed to `BufferReader::read_slice_async`.
///
/// The closure is handed the currently readable bytes and decides whether they
/// are sufficient.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadSliceAsyncResult<T> {
    /// The closure needs more data; the read stays pending.
    Wait,
    /// The closure is done: the `usize` is the number of bytes it read and the
    /// `T` is the value handed back to the caller.
    Ready(usize, T),
}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferReader<'a, C, T> {
16
17    pub(super) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18        Self {
19            buffer: buffer
20        }
21    }
22
23    /// Ready from the currently available data in the [`BufferReader`].
24    /// The readable part of the buffer is passed to `f`. 
25    /// `f` returns the number of bytes read and a result that is passed back to the caller.
26    pub fn read_slice<F, U>(&self, f: F) -> Result<U, BufferError> where F: FnOnce(&[u8]) -> (usize, U){
27        self.buffer.inner.lock_mut(|inner|{
28            let readable = inner.readable_data();
29            let (bytes_read, result) = f(readable);
30            inner.read_commit(bytes_read)
31                .map(|_| result )
32        })
33    }
34
35    pub fn read_slice_async<F, U>(&self, f: F) -> impl Future<Output = Result<U, BufferError>> 
36    where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
37        ReadSliceAsyncFuture{
38            reader: self,
39            f
40        }
41    }
42
43    /// A poll method to implement a `ReadSliceAsync`-[`Future`]
44    /// The readable part of the buffer is passed to `f`. 
45    /// `f` returns [`ReadSliceAsyncResult::Wait`] if the function should wait for new data.
46    /// `f` returns [`ReadSliceAsyncResult::Ready`] the number of bytes read and a result that is passed back to the caller.
47    fn poll_read_slice<F, U>(&self, f: &mut F, cx:&mut Context<'_> ) -> Poll<Result<U, BufferError>> 
48    where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
49        self.buffer.inner.lock_mut(|inner| {
50            let readable = inner.readable_data();
51            match f(readable) {
52                ReadSliceAsyncResult::Wait => {
53                    inner.add_read_waker(cx);
54                    Poll::Pending
55                },
56                ReadSliceAsyncResult::Ready(bytes_read, result) => Poll::Ready(
57                    inner.read_commit(bytes_read).map(|_| result)
58                ),
59            }
60        })
61    }
62
63    /// Base function for implementing readers like [`embedded_io::Read`]
64    /// Returns the number of bytes read from the buffer to the provided slice
65    /// 
66    /// # Errors
67    /// 
68    /// [`BufferError::ProvidedSliceEmpty`] if the provided slice is empty
69    /// [`BufferError::NoData`] if there ae no bytes to read
70    fn read_base(&self, buf: &mut[u8]) -> Result<usize, BufferError> {
71        if buf.is_empty() {
72            return Err(BufferError::ProvidedSliceEmpty);
73        }
74
75        self.buffer.inner.lock_mut(|inner|{
76            let src = inner.readable_data();
77
78            if src.is_empty() {
79                return Err(BufferError::NoData);
80            }
81            else if src.len() > buf.len() {
82                buf.copy_from_slice(&src[0..buf.len()]);
83                inner.read_commit(buf.len()).unwrap();
84                Ok(buf.len())
85            } else {
86                let buf = &mut buf[0..src.len()];
87                buf.copy_from_slice(src);
88                let bytes_read = src.len();
89                inner.read_commit(bytes_read).unwrap();
90                Ok(bytes_read)
91            }
92        })
93    }
94
95    /// Base function to implement async read functionality like [`embedded_io_async::Read`]
96    fn poll_read(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>>{
97        if buf.is_empty() {
98            return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
99        }
100
101        self.buffer.inner.lock_mut(|inner|{
102            let src = inner.readable_data();
103
104            if src.is_empty() {
105                inner.add_read_waker(cx);
106                Poll::Pending
107            }
108            else if src.len() > buf.len() {
109                buf.copy_from_slice(&src[0..buf.len()]);
110                inner.read_commit(buf.len()).unwrap();
111
112                Poll::Ready(Ok(buf.len()))
113            } else {
114                let buf = &mut buf[0..src.len()];
115                buf.copy_from_slice(src);
116                let bytes_read = src.len();
117                inner.read_commit(bytes_read).unwrap();
118
119                Poll::Ready(Ok(bytes_read))
120            }
121        })
122    }
123
124    pub fn pull(&self, buf: &mut[u8]) -> impl Future<Output = Result<(), BufferError>> {
125        PullDataFuture{
126            reader: self,
127            buf: buf
128        }
129    }
130
131    /// Base poll function to implement a pull future
132    /// a pull future wait until `buf.len()` bytes are available
133    fn poll_pull(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
134        self.buffer.inner.lock_mut(|inner|{
135            if inner.capacity() < buf.len() {
136                return Poll::Ready(Err(BufferError::NoCapacity));
137            }
138            
139            let readable = inner.readable_data();
140            if readable.len() >= buf.len() {
141                let readable = &readable[..buf.len()];
142                buf.copy_from_slice(readable);
143                inner.read_commit(buf.len()).unwrap();
144                Poll::Ready(Ok(()))
145            } else {
146                // println!("poll_pull: waiting: {} bytes readable but {} bytes required", readable.len(), buf.len());
147                inner.add_read_waker(cx);
148                Poll::Pending
149            }
150        })
151    }
152
153    
154    pub fn wait_for_new_data<'b>(&'b self) -> NewDataFuture<'a, 'b, C, T> {
155        self.buffer.inner.lock(|inner|{
156            NewDataFuture{
157                reader: self,
158                old_len: inner.len()
159            }
160        })
161    }
162
163    pub fn reset(&self) {
164        self.buffer.inner.lock_mut(|inner| inner.reset());
165    }
166    
167}
168
169pub struct ReadSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
170    reader: &'b BufferReader<'a, C, T>,
171    f: F
172}
173
174impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Unpin for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {}
175
176impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Future for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
177    type Output = Result<U, BufferError>;
178
179    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180        self.reader.poll_read_slice(&mut self.f, cx)
181    }
182}
183
184pub struct NewDataFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
185    reader: &'b BufferReader<'a, C, T>,
186    old_len: usize
187}
188
189impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for NewDataFuture<'a, 'b, C, T> {}
190
191impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for NewDataFuture<'a, 'b, C, T> {
192    type Output = Result<(), BufferError>;
193
194    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        self.reader.buffer.inner.lock_mut(|inner|{
196            if self.old_len < inner.len() {
197                Poll::Ready(Ok(()))
198            } else if inner.capacity() <= self.old_len {
199                Poll::Ready(Err(BufferError::NoCapacity))
200            } else {
201                inner.add_read_waker(cx);
202                Poll::Pending
203            }
204        })
205    }
206}
207
208pub struct PullDataFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
209    reader: &'b BufferReader<'a, C, T>,
210    buf: &'c mut [u8]
211}
212
213impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PullDataFuture<'a, 'b, 'c, C, T> {}
214
215impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PullDataFuture<'a, 'b, 'c, C, T> {
216    type Output = Result<(), BufferError>;
217
218    fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219        self.reader.poll_pull(self.buf, cx)
220    }
221}
222
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io::ErrorType for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Errors of the `embedded_io` trait implementations are reported as a
    /// plain [`embedded_io::ErrorKind`].
    type Error = embedded_io::ErrorKind;
}
227
/// Future used by the [`embedded_io_async::Read`] implementation of [`BufferReader`].
#[cfg(feature = "embedded")]
pub struct EmbeddedReadFuture<'a, 'b, const C: usize, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Reader the bytes are taken from.
    reader: &'a BufferReader<'a, C, T>,
    /// Destination slice for the bytes that were read.
    buf: &'b mut [u8],
}
233
// The future only stores references; it is explicitly marked `Unpin`.
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Unpin for EmbeddedReadFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
}
236
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Future for EmbeddedReadFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Output = Result<usize, embedded_io::ErrorKind>;

    /// Polls `BufferReader::poll_read` and translates its result.
    ///
    /// An empty destination slice is reported as `Ok(0)`.
    ///
    /// # Panics
    ///
    /// Panics if `poll_read` returns any error other than
    /// [`BufferError::ProvidedSliceEmpty`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match this.reader.poll_read(this.buf, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
            Poll::Ready(Err(BufferError::ProvidedSliceEmpty)) => Poll::Ready(Ok(0)),
            Poll::Ready(Err(err)) => {
                panic!("unexpected err returned from poll_read(): {}", err)
            }
        }
    }
}
250
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io_async::Read for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Reads into `buf` by awaiting an [`EmbeddedReadFuture`].
    ///
    /// Returns the number of bytes read; an empty `buf` yields `Ok(0)`.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        EmbeddedReadFuture { reader: self, buf }.await
    }
}
261
#[cfg(feature = "std")]
impl<'a, const C: usize, T> std::io::Read for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Non-blocking read into `buf` based on `BufferReader::read_base`.
    ///
    /// An empty `buf` yields `Ok(0)`.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::WouldBlock`] if there is no data to read.
    ///
    /// # Panics
    ///
    /// Panics if `read_base` returns any other error.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_base(buf).or_else(|err| match err {
            BufferError::ProvidedSliceEmpty => Ok(0),
            BufferError::NoData => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)),
            other => panic!("unexpected error reading from buffer: {}", other),
        })
    }
}