1use core::{pin::Pin, task::{Context, Poll}};
2
3use crate::{mutex::Mutex, AsyncBuffer, BufferError};
4
5pub struct BufferReader<'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
7 buffer: &'a AsyncBuffer<C, T>
8}
9
10pub enum ReadSliceAsyncResult<T> {
11 Wait,
12 Ready(usize, T),
13}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferReader<'a, C, T> {
16
17 pub(super) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18 Self {
19 buffer: buffer
20 }
21 }
22
23 pub fn read_slice<F, U>(&self, f: F) -> Result<U, BufferError> where F: FnOnce(&[u8]) -> (usize, U){
27 self.buffer.inner.lock_mut(|inner|{
28 let readable = inner.readable_data();
29 let (bytes_read, result) = f(readable);
30 inner.read_commit(bytes_read)
31 .map(|_| result )
32 })
33 }
34
35 pub fn read_slice_async<F, U>(&self, f: F) -> impl Future<Output = Result<U, BufferError>>
36 where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
37 ReadSliceAsyncFuture{
38 reader: self,
39 f
40 }
41 }
42
43 fn poll_read_slice<F, U>(&self, f: &mut F, cx:&mut Context<'_> ) -> Poll<Result<U, BufferError>>
48 where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
49 self.buffer.inner.lock_mut(|inner| {
50 let readable = inner.readable_data();
51 match f(readable) {
52 ReadSliceAsyncResult::Wait => {
53 inner.add_read_waker(cx);
54 Poll::Pending
55 },
56 ReadSliceAsyncResult::Ready(bytes_read, result) => Poll::Ready(
57 inner.read_commit(bytes_read).map(|_| result)
58 ),
59 }
60 })
61 }
62
63 fn read_base(&self, buf: &mut[u8]) -> Result<usize, BufferError> {
71 if buf.is_empty() {
72 return Err(BufferError::ProvidedSliceEmpty);
73 }
74
75 self.buffer.inner.lock_mut(|inner|{
76 let src = inner.readable_data();
77
78 if src.is_empty() {
79 return Err(BufferError::NoData);
80 }
81 else if src.len() > buf.len() {
82 buf.copy_from_slice(&src[0..buf.len()]);
83 inner.read_commit(buf.len()).unwrap();
84 Ok(buf.len())
85 } else {
86 let buf = &mut buf[0..src.len()];
87 buf.copy_from_slice(src);
88 let bytes_read = src.len();
89 inner.read_commit(bytes_read).unwrap();
90 Ok(bytes_read)
91 }
92 })
93 }
94
95 fn poll_read(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>>{
97 if buf.is_empty() {
98 return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
99 }
100
101 self.buffer.inner.lock_mut(|inner|{
102 let src = inner.readable_data();
103
104 if src.is_empty() {
105 inner.add_read_waker(cx);
106 Poll::Pending
107 }
108 else if src.len() > buf.len() {
109 buf.copy_from_slice(&src[0..buf.len()]);
110 inner.read_commit(buf.len()).unwrap();
111
112 Poll::Ready(Ok(buf.len()))
113 } else {
114 let buf = &mut buf[0..src.len()];
115 buf.copy_from_slice(src);
116 let bytes_read = src.len();
117 inner.read_commit(bytes_read).unwrap();
118
119 Poll::Ready(Ok(bytes_read))
120 }
121 })
122 }
123
124 pub fn pull(&self, buf: &mut[u8]) -> impl Future<Output = Result<(), BufferError>> {
125 PullDataFuture{
126 reader: self,
127 buf: buf
128 }
129 }
130
131 fn poll_pull(&self, buf: &mut[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
134 self.buffer.inner.lock_mut(|inner|{
135 if inner.capacity() < buf.len() {
136 Poll::Ready(Err(BufferError::NoCapacity))
137 } else if inner.len() >= buf.len() {
138 let readable = inner.readable_data();
139 let readable = &readable[..buf.len()];
140 buf.copy_from_slice(readable);
141 inner.read_commit(buf.len()).unwrap();
142 Poll::Ready(Ok(()))
143 } else {
144 inner.add_read_waker(cx);
145 Poll::Pending
146 }
147 })
148 }
149
150
151 pub fn wait_for_new_data<'b>(&'b self) -> NewDataFuture<'a, 'b, C, T> {
152 self.buffer.inner.lock(|inner|{
153 NewDataFuture{
154 reader: self,
155 old_len: inner.len()
156 }
157 })
158 }
159
160}
161
162pub struct ReadSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
163 reader: &'b BufferReader<'a, C, T>,
164 f: F
165}
166
167impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Unpin for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {}
168
169impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F, U> Future for ReadSliceAsyncFuture<'a, 'b, C, T, F, U> where F: FnMut(&[u8]) -> ReadSliceAsyncResult<U> {
170 type Output = Result<U, BufferError>;
171
172 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173 self.reader.poll_read_slice(&mut self.f, cx)
174 }
175}
176
177pub struct NewDataFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
178 reader: &'b BufferReader<'a, C, T>,
179 old_len: usize
180}
181
182impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for NewDataFuture<'a, 'b, C, T> {}
183
184impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for NewDataFuture<'a, 'b, C, T> {
185 type Output = Result<(), BufferError>;
186
187 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 self.reader.buffer.inner.lock_mut(|inner|{
189 if self.old_len < inner.len() {
190 Poll::Ready(Ok(()))
191 } else if inner.capacity() <= self.old_len {
192 Poll::Ready(Err(BufferError::NoCapacity))
193 } else {
194 inner.add_read_waker(cx);
195 Poll::Pending
196 }
197 })
198 }
199}
200
201pub struct PullDataFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
202 reader: &'b BufferReader<'a, C, T>,
203 buf: &'c mut [u8]
204}
205
206impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PullDataFuture<'a, 'b, 'c, C, T> {}
207
208impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PullDataFuture<'a, 'b, 'c, C, T> {
209 type Output = Result<(), BufferError>;
210
211 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212 self.reader.poll_pull(self.buf, cx)
213 }
214}
215
216#[cfg(feature = "embedded")]
217impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io::ErrorType for BufferReader<'a, C, T> {
218 type Error = embedded_io::ErrorKind;
219}
220
221#[cfg(feature = "embedded")]
222pub struct EmbeddedReadFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
223 reader: &'a BufferReader<'a, C, T>,
224 buf: &'b mut [u8]
225}
226
227#[cfg(feature = "embedded")]
228impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for EmbeddedReadFuture<'a, 'b, C, T> {}
229
230#[cfg(feature = "embedded")]
231impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for EmbeddedReadFuture<'a, 'b, C, T> {
232 type Output = Result<usize, embedded_io::ErrorKind>;
233
234 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235 self.reader.poll_read(&mut self.buf, cx)
236 .map(|err| match err {
237 Ok(n) => Ok(n),
238 Err(BufferError::ProvidedSliceEmpty) => Ok(0),
239 Err(err) => panic!("unexpected err returned from poll_read(): {}", err)
240 })
241 }
242}
243
244#[cfg(feature = "embedded")]
245impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io_async::Read for BufferReader<'a, C, T> {
246 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
247 let f = EmbeddedReadFuture{
248 reader: self,
249 buf: buf
250 };
251 f.await
252 }
253}
254
255#[cfg(feature = "std")]
256impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> std::io::Read for BufferReader<'a, C, T> {
257
258 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
259 use std::io::ErrorKind;
260
261 match self.read_base(buf) {
262 Ok(n) => Ok(n),
263 Err(BufferError::ProvidedSliceEmpty) => Ok(0),
264 Err(BufferError::NoData) => Err(ErrorKind::WouldBlock.into()),
265 Err(e) => {
266 panic!("unexpected error reading from buffer: {}", e);
267 }
268 }
269 }
270}