1use core::{pin::Pin, task::{Context, Poll}};
2
3use crate::{mutex::Mutex, AsyncBuffer, BufferError};
4
5pub struct BufferReader<'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
7 buffer: &'a AsyncBuffer<C, T>
8}
9
10pub enum ReadSliceAsyncResult<T> {
11 Wait,
12 Ready(usize, T),
13}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferReader<'a, C, T> {
16
17 pub(super) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18 Self {
19 buffer: buffer
20 }
21 }
22
23 pub fn read_slice<F, U>(&self, f: F) -> Result<U, BufferError> where F: FnOnce(&[u8]) -> (usize, U){
27 self.buffer.inner.lock_mut(|inner|{
28 let readable = inner.readable_data();
29 let (bytes_read, result) = f(readable);
30 inner.read_commit(bytes_read)
31 .map(|_| result )
32 })
33 }
34
35 pub fn read_slice_async<F, U>(&self, f: F) -> impl Future<Output = Result<U, BufferError>>
36 where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
37 ReadSliceAsyncFuture{
38 reader: self,
39 f
40 }
41 }
42
43 fn poll_read_slice<F, U>(&self, f: &mut F, cx:&mut Context<'_> ) -> Poll<Result<U, BufferError>>
48 where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
49 self.buffer.inner.lock_mut(|inner| {
50 let readable = inner.readable_data();
51 match f(readable) {
52 ReadSliceAsyncResult::Wait => {
53 inner.add_read_waker(cx);
54 Poll::Pending
55 },
56 ReadSliceAsyncResult::Ready(bytes_read, result) => Poll::Ready(
57 inner.read_commit(bytes_read).map(|_| result)
58 ),
59 }
60 })
61 }
62
63 fn read_base(&self, buf: &mut[u8]) -> Result<usize, BufferError> {
71 if buf.is_empty() {
72 return Err(BufferError::ProvidedSliceEmpty);
73 }
74
75 self.buffer.inner.lock_mut(|inner|{
76 let src = inner.readable_data();
77
78 if src.is_empty() {
79 return Err(BufferError::NoData);
80 }
81 else if src.len() > buf.len() {
82 buf.copy_from_slice(&src[0..buf.len()]);
83 inner.read_commit(buf.len()).unwrap();
84 Ok(buf.len())
85 } else {
86 let buf = &mut buf[0..src.len()];
87 buf.copy_from_slice(src);
88 let bytes_read = src.len();
89 inner.read_commit(bytes_read).unwrap();
90 Ok(bytes_read)
91 }
92 })
93 }
94
95 fn poll_read(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>>{
97 if buf.is_empty() {
98 return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
99 }
100
101 self.buffer.inner.lock_mut(|inner|{
102 let src = inner.readable_data();
103
104 if src.is_empty() {
105 inner.add_read_waker(cx);
106 Poll::Pending
107 }
108 else if src.len() > buf.len() {
109 buf.copy_from_slice(&src[0..buf.len()]);
110 inner.read_commit(buf.len()).unwrap();
111
112 Poll::Ready(Ok(buf.len()))
113 } else {
114 let buf = &mut buf[0..src.len()];
115 buf.copy_from_slice(src);
116 let bytes_read = src.len();
117 inner.read_commit(bytes_read).unwrap();
118
119 Poll::Ready(Ok(bytes_read))
120 }
121 })
122 }
123
124 pub fn pull(&self, buf: &mut[u8]) -> impl Future<Output = Result<(), BufferError>> {
125 PullDataFuture{
126 reader: self,
127 buf: buf
128 }
129 }
130
131 fn poll_pull(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
134 self.buffer.inner.lock_mut(|inner|{
135 if inner.capacity() < buf.len() {
136 return Poll::Ready(Err(BufferError::NoCapacity));
137 }
138
139 let readable = inner.readable_data();
140 if readable.len() >= buf.len() {
141 let readable = &readable[..buf.len()];
142 buf.copy_from_slice(readable);
143 inner.read_commit(buf.len()).unwrap();
144 Poll::Ready(Ok(()))
145 } else {
146 inner.add_read_waker(cx);
148 Poll::Pending
149 }
150 })
151 }
152
153
154 pub fn wait_for_new_data<'b>(&'b self) -> NewDataFuture<'a, 'b, C, T> {
155 self.buffer.inner.lock(|inner|{
156 NewDataFuture{
157 reader: self,
158 old_len: inner.len()
159 }
160 })
161 }
162
163 pub fn reset(&self) {
164 self.buffer.inner.lock_mut(|inner| inner.reset());
165 }
166
167}
168
169pub struct ReadSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
170 reader: &'b BufferReader<'a, C, T>,
171 f: F
172}
173
174impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Unpin for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {}
175
176impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Future for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
177 type Output = Result<U, BufferError>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180 self.reader.poll_read_slice(&mut self.f, cx)
181 }
182}
183
184pub struct NewDataFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
185 reader: &'b BufferReader<'a, C, T>,
186 old_len: usize
187}
188
189impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for NewDataFuture<'a, 'b, C, T> {}
190
191impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for NewDataFuture<'a, 'b, C, T> {
192 type Output = Result<(), BufferError>;
193
194 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 self.reader.buffer.inner.lock_mut(|inner|{
196 if self.old_len < inner.len() {
197 Poll::Ready(Ok(()))
198 } else if inner.capacity() <= self.old_len {
199 Poll::Ready(Err(BufferError::NoCapacity))
200 } else {
201 inner.add_read_waker(cx);
202 Poll::Pending
203 }
204 })
205 }
206}
207
208pub struct PullDataFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
209 reader: &'b BufferReader<'a, C, T>,
210 buf: &'c mut [u8]
211}
212
213impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PullDataFuture<'a, 'b, 'c, C, T> {}
214
215impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PullDataFuture<'a, 'b, 'c, C, T> {
216 type Output = Result<(), BufferError>;
217
218 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219 self.reader.poll_pull(self.buf, cx)
220 }
221}
222
223#[cfg(feature = "embedded")]
224impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io::ErrorType for BufferReader<'a, C, T> {
225 type Error = embedded_io::ErrorKind;
226}
227
228#[cfg(feature = "embedded")]
229pub struct EmbeddedReadFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
230 reader: &'a BufferReader<'a, C, T>,
231 buf: &'b mut [u8]
232}
233
234#[cfg(feature = "embedded")]
235impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for EmbeddedReadFuture<'a, 'b, C, T> {}
236
237#[cfg(feature = "embedded")]
238impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for EmbeddedReadFuture<'a, 'b, C, T> {
239 type Output = Result<usize, embedded_io::ErrorKind>;
240
241 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242 self.reader.poll_read(&mut self.buf, cx)
243 .map(|err| match err {
244 Ok(n) => Ok(n),
245 Err(BufferError::ProvidedSliceEmpty) => Ok(0),
246 Err(err) => panic!("unexpected err returned from poll_read(): {}", err)
247 })
248 }
249}
250
251#[cfg(feature = "embedded")]
252impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io_async::Read for BufferReader<'a, C, T> {
253 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
254 let f = EmbeddedReadFuture{
255 reader: self,
256 buf: buf
257 };
258 f.await
259 }
260}
261
262#[cfg(feature = "std")]
263impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> std::io::Read for BufferReader<'a, C, T> {
264
265 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
266 use std::io::ErrorKind;
267
268 match self.read_base(buf) {
269 Ok(n) => Ok(n),
270 Err(BufferError::ProvidedSliceEmpty) => Ok(0),
271 Err(BufferError::NoData) => Err(ErrorKind::WouldBlock.into()),
272 Err(e) => {
273 panic!("unexpected error reading from buffer: {}", e);
274 }
275 }
276 }
277}