embytes_buffer_async/
reader.rs

1use core::{pin::Pin, task::{Context, Poll}};
2
3use crate::{mutex::Mutex, AsyncBuffer, BufferError};
4
5/// A type to read from an [`AsyncBuffer`]
6pub struct BufferReader<'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
7    buffer: &'a AsyncBuffer<C, T>
8}
9
/// Outcome reported by the closure passed to an asynchronous slice read.
///
/// The closure inspects the currently readable bytes and decides whether it
/// can finish or needs to be called again later.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadSliceAsyncResult<T> {
    /// Not enough data yet; the read should stay pending.
    Wait,
    /// The read is finished: the number of bytes consumed and the value to
    /// hand back to the caller.
    Ready(usize, T),
}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferReader<'a, C, T> {
16
17    pub(super) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18        Self {
19            buffer: buffer
20        }
21    }
22
23    /// Ready from the currently available data in the [`BufferReader`].
24    /// The readable part of the buffer is passed to `f`. 
25    /// `f` returns the number of bytes read and a result that is passed back to the caller.
26    pub fn read_slice<F, U>(&self, f: F) -> Result<U, BufferError> where F: FnOnce(&[u8]) -> (usize, U){
27        self.buffer.inner.lock_mut(|inner|{
28            let readable = inner.readable_data();
29            let (bytes_read, result) = f(readable);
30            inner.read_commit(bytes_read)
31                .map(|_| result )
32        })
33    }
34
35    pub fn read_slice_async<F, U>(&self, f: F) -> impl Future<Output = Result<U, BufferError>> 
36    where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
37        ReadSliceAsyncFuture{
38            reader: self,
39            f
40        }
41    }
42
43    /// A poll method to implement a `ReadSliceAsync`-[`Future`]
44    /// The readable part of the buffer is passed to `f`. 
45    /// `f` returns [`ReadSliceAsyncResult::Wait`] if the function should wait for new data.
46    /// `f` returns [`ReadSliceAsyncResult::Ready`] the number of bytes read and a result that is passed back to the caller.
47    fn poll_read_slice<F, U>(&self, f: &mut F, cx:&mut Context<'_> ) -> Poll<Result<U, BufferError>> 
48    where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
49        self.buffer.inner.lock_mut(|inner| {
50            let readable = inner.readable_data();
51            match f(readable) {
52                ReadSliceAsyncResult::Wait => {
53                    inner.add_read_waker(cx);
54                    Poll::Pending
55                },
56                ReadSliceAsyncResult::Ready(bytes_read, result) => Poll::Ready(
57                    inner.read_commit(bytes_read).map(|_| result)
58                ),
59            }
60        })
61    }
62
63    /// Base function for implementing readers like [`embedded_io::Read`]
64    /// Returns the number of bytes read from the buffer to the provided slice
65    /// 
66    /// # Errors
67    /// 
68    /// [`BufferError::ProvidedSliceEmpty`] if the provided slice is empty
69    /// [`BufferError::NoData`] if there ae no bytes to read
70    fn read_base(&self, buf: &mut[u8]) -> Result<usize, BufferError> {
71        if buf.is_empty() {
72            return Err(BufferError::ProvidedSliceEmpty);
73        }
74
75        self.buffer.inner.lock_mut(|inner|{
76            let src = inner.readable_data();
77
78            if src.is_empty() {
79                return Err(BufferError::NoData);
80            }
81            else if src.len() > buf.len() {
82                buf.copy_from_slice(&src[0..buf.len()]);
83                inner.read_commit(buf.len()).unwrap();
84                Ok(buf.len())
85            } else {
86                let buf = &mut buf[0..src.len()];
87                buf.copy_from_slice(src);
88                let bytes_read = src.len();
89                inner.read_commit(bytes_read).unwrap();
90                Ok(bytes_read)
91            }
92        })
93    }
94
95    /// Base function to implement async read functionality like [`embedded_io_async::Read`]
96    fn poll_read(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>>{
97        if buf.is_empty() {
98            return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
99        }
100
101        self.buffer.inner.lock_mut(|inner|{
102            let src = inner.readable_data();
103
104            if src.is_empty() {
105                inner.add_read_waker(cx);
106                Poll::Pending
107            }
108            else if src.len() > buf.len() {
109                buf.copy_from_slice(&src[0..buf.len()]);
110                inner.read_commit(buf.len()).unwrap();
111
112                Poll::Ready(Ok(buf.len()))
113            } else {
114                let buf = &mut buf[0..src.len()];
115                buf.copy_from_slice(src);
116                let bytes_read = src.len();
117                inner.read_commit(bytes_read).unwrap();
118
119                Poll::Ready(Ok(bytes_read))
120            }
121        })
122    }
123
124    pub fn pull(&self, buf: &mut[u8]) -> impl Future<Output = Result<(), BufferError>> {
125        PullDataFuture{
126            reader: self,
127            buf: buf
128        }
129    }
130
131    /// Base poll function to implement a pull future
132    /// a pull future wait until `buf.len()` bytes are available
133    fn poll_pull(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
134        self.buffer.inner.lock_mut(|inner|{
135            if inner.capacity() < buf.len() {
136                Poll::Ready(Err(BufferError::NoCapacity))
137            } else if inner.len() >= buf.len() {
138                let readable = inner.readable_data();
139                let readable = &readable[..buf.len()];
140                buf.copy_from_slice(readable);
141                inner.read_commit(buf.len()).unwrap();
142                Poll::Ready(Ok(()))
143            } else {
144                inner.add_read_waker(cx);
145                Poll::Pending
146            }
147        })
148    }
149
150    
151    pub fn wait_for_new_data<'b>(&'b self) -> NewDataFuture<'a, 'b, C, T> {
152        self.buffer.inner.lock(|inner|{
153            NewDataFuture{
154                reader: self,
155                old_len: inner.len()
156            }
157        })
158    }
159    
160}
161
162pub struct ReadSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
163    reader: &'b BufferReader<'a, C, T>,
164    f: F
165}
166
167impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Unpin for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {}
168
169impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Future for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
170    type Output = Result<U, BufferError>;
171
172    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173        self.reader.poll_read_slice(&mut self.f, cx)
174    }
175}
176
177pub struct NewDataFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
178    reader: &'b BufferReader<'a, C, T>,
179    old_len: usize
180}
181
182impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for NewDataFuture<'a, 'b, C, T> {}
183
184impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for NewDataFuture<'a, 'b, C, T> {
185    type Output = Result<(), BufferError>;
186
187    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        self.reader.buffer.inner.lock_mut(|inner|{
189            if self.old_len < inner.len() {
190                Poll::Ready(Ok(()))
191            } else if inner.capacity() <= self.old_len {
192                Poll::Ready(Err(BufferError::NoCapacity))
193            } else {
194                inner.add_read_waker(cx);
195                Poll::Pending
196            }
197        })
198    }
199}
200
201pub struct PullDataFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
202    reader: &'b BufferReader<'a, C, T>,
203    buf: &'c mut [u8]
204}
205
206impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PullDataFuture<'a, 'b, 'c, C, T> {}
207
208impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PullDataFuture<'a, 'b, 'c, C, T> {
209    type Output = Result<(), BufferError>;
210
211    fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212        self.reader.poll_pull(self.buf, cx)
213    }
214}
215
/// Uses [`embedded_io::ErrorKind`] as the error type of the `embedded_io`
/// trait implementations for [`BufferReader`].
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io::ErrorType for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Error = embedded_io::ErrorKind;
}
220
/// Future backing the [`embedded_io_async::Read`] implementation of
/// [`BufferReader`].
#[cfg(feature = "embedded")]
pub struct EmbeddedReadFuture<'a, 'b, const C: usize, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Reader the bytes are taken from.
    reader: &'a BufferReader<'a, C, T>,
    /// Destination slice handed to the reader's `poll_read`.
    buf: &'b mut [u8],
}
226
/// Explicit `Unpin`, so `poll` can borrow the destination slice mutably
/// through `Pin<&mut Self>`.
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Unpin for EmbeddedReadFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
}
229
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Future for EmbeddedReadFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Output = Result<usize, embedded_io::ErrorKind>;

    /// Polls the reader's `poll_read` and translates its result.
    ///
    /// [`BufferError::ProvidedSliceEmpty`] is reported as a successful read
    /// of zero bytes.
    ///
    /// # Panics
    ///
    /// Panics if `poll_read` returns any other error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The type is `Unpin`, so a plain mutable reference is available.
        let this = self.get_mut();
        match this.reader.poll_read(this.buf, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
            Poll::Ready(Err(BufferError::ProvidedSliceEmpty)) => Poll::Ready(Ok(0)),
            Poll::Ready(Err(err)) => {
                panic!("unexpected err returned from poll_read(): {}", err)
            }
        }
    }
}
243
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io_async::Read for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Reads into `buf` by awaiting an [`EmbeddedReadFuture`] built from this
    /// reader and `buf`.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        EmbeddedReadFuture { reader: self, buf }.await
    }
}
254
#[cfg(feature = "std")]
impl<'a, const C: usize, T> std::io::Read for BufferReader<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Non-blocking read built on `read_base`.
    ///
    /// An empty `buf` yields `Ok(0)`; an empty buffer yields an
    /// [`std::io::ErrorKind::WouldBlock`] error.
    ///
    /// # Panics
    ///
    /// Panics if `read_base` returns any other error.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_base(buf).or_else(|err| match err {
            BufferError::ProvidedSliceEmpty => Ok(0),
            BufferError::NoData => Err(std::io::ErrorKind::WouldBlock.into()),
            other => panic!("unexpected error reading from buffer: {}", other),
        })
    }
}