embytes_buffer_async/
writer.rs

1
2use core::{pin::Pin, task::{Context, Poll}};
3
4use crate::{mutex::Mutex, AsyncBuffer, BufferError};
5
6pub enum WriteSliceAsyncResult {
7    Wait,
8    Ready(usize),
9}
10
11pub struct BufferWriter <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
12    buffer: &'a AsyncBuffer<C, T>
13}
14 
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferWriter<'a, C, T> {
16
17    pub(crate) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18        Self {
19            buffer: buffer
20        }
21    }
22
23    /// Base function for implementing writers like [`embedded_io::Write`]
24    /// Returns the number of bytes writen to the buffer from the provided slice
25    /// 
26    /// # Errors
27    /// 
28    /// [`BufferError::ProvidedSliceEmpty`] if the provided slice is empty
29    /// [`BufferError::NoCapacity`] if the buffer has no capacity after calling [`Buffer::shift`]
30    fn write_base(&mut self, buf: &[u8]) -> Result<usize, BufferError> {
31        if buf.is_empty() {
32            return Err(BufferError::ProvidedSliceEmpty);
33        }
34
35        self.buffer.inner.lock_mut(|inner|{
36            let cap = inner.maybe_shift();
37
38            if cap == 0 {
39                return Err(BufferError::NoCapacity);
40            }
41
42            let tgt = inner.writeable_data();
43            
44            if cap < buf.len() {
45                tgt.copy_from_slice(&buf[0..cap]);
46                inner.write_commit(cap)?;
47                Ok(cap)
48            } else {
49                let tgt = &mut tgt[0..buf.len()];
50                tgt.copy_from_slice(buf);
51                inner.write_commit(cap)?;
52                Ok(buf.len())
53            }
54        })
55    }
56
57    fn poll_write(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> {
58        if buf.is_empty() {
59            return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
60        }
61
62        self.buffer.inner.lock_mut(|inner|{
63            let cap = match inner.poll_ensure_capacity(cx, 1) {
64                Poll::Pending => return Poll::Pending,
65                Poll::Ready(Ok(n)) => n,
66                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
67            };
68
69            let tgt = inner.writeable_data();
70            
71            if cap < buf.len() {
72                tgt.copy_from_slice(&buf[0..cap]);
73                inner.write_commit(cap).unwrap();
74                Poll::Ready(Ok(cap))
75            } else {
76                let tgt = &mut tgt[0..buf.len()];
77                tgt.copy_from_slice(buf);
78                inner.write_commit(buf.len()).unwrap();
79                Poll::Ready(Ok(buf.len()))
80            }
81        })
82    }
83
84    pub fn push(&self, data: &[u8]) -> Result<(), BufferError> {
85        self.buffer.inner.lock_mut(|inner| {
86            if inner.has_capacity(data.len()) {
87                let tgt = inner.writeable_data();
88                let tgt = &mut tgt[..data.len()];
89                tgt.copy_from_slice(data);
90                inner.write_commit(data.len())
91                    .expect("must not throw because capacity is checked before");
92                Ok(())
93            } else {
94                Err(BufferError::NoCapacity)
95            }
96        })
97    }
98
99    pub fn push_async<'b, 'c>(&'b self, data: &'c [u8]) -> PushFuture<'a, 'b, 'c, C, T> {
100        PushFuture {
101            writer: self,
102            data: data
103        }
104    } 
105
106    fn poll_push(&self, data: &[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
107        self.buffer.inner.lock_mut(|inner| {
108            if data.len() > inner.capacity() {
109                return Poll::Ready(Err(BufferError::NoCapacity));
110            }
111            
112            inner.shift();
113
114            let writeable = inner.writeable_data();
115            if writeable.len() < data.len() {
116                // println!("poll_push: waiting: {} bytes writeable but {} bytes of data", writeable.len(), data.len());
117                inner.add_write_waker(cx);
118                Poll::Pending
119            } else {
120                writeable[..data.len()].copy_from_slice(data);
121                inner.write_commit(data.len()).unwrap();
122                Poll::Ready(Ok(()))
123            }
124        })
125    }
126
127    pub fn write_slice<F>(&self, f: F) -> Result<usize, BufferError> where F: FnOnce(&mut [u8]) -> usize {
128        self.buffer.inner.lock_mut(|inner| {
129            inner.shift();
130            let writeable = inner.writeable_data();
131            let bytes_written = f(writeable);
132            if bytes_written > writeable.len() {
133                Err(BufferError::NoCapacity)
134            } else {
135                inner.write_commit(bytes_written).unwrap();
136                Ok(bytes_written)
137            }
138        })
139    }
140
141    pub fn write_slice_async<F>(&self, f: F) -> impl Future<Output = Result<usize, BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult{
142        WriteSliceAsyncFuture{
143            writer: self,
144            f: f
145        }
146    }
147
148    fn poll_write_slice<F>(&self, f: &mut F, cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
149        self.buffer.inner.lock_mut(|inner| {
150            inner.shift();
151            let writeable = inner.writeable_data();
152            match f(writeable) {
153                WriteSliceAsyncResult::Wait => if writeable.len() < inner.capacity() {
154                        inner.add_write_waker(cx);
155                        Poll::Pending
156                    } else {
157                        Poll::Ready(Err(BufferError::NoCapacity))
158                    },
159                WriteSliceAsyncResult::Ready(bytes_written) if bytes_written > writeable.len() => {
160                    Poll::Ready(Err(BufferError::NoCapacity))
161                },
162                WriteSliceAsyncResult::Ready(bytes_written) => {
163                    inner.write_commit(bytes_written).unwrap();
164                    Poll::Ready(Ok(bytes_written))
165                }
166            }
167        })
168    }
169
170    pub fn await_capacity<'b>(&'b self, expected_capacity: usize) -> CapacityFuture<'a, 'b, C, T> {
171        CapacityFuture {
172            writer: self,
173            expected_capacity: expected_capacity
174        }
175    }
176
177    pub fn reset(&self) {
178        self.buffer.inner.lock_mut(|inner| inner.reset());
179    }
180}
181
182pub struct WriteSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
183    writer: &'b BufferWriter<'a, C, T>,
184    f: F
185}
186
187impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Unpin for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {}
188
189impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Future for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
190    type Output = Result<usize, BufferError>;
191
192    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        self.writer.poll_write_slice(&mut self.f, cx)
194    }
195}
196
197#[cfg(feature = "embedded")]
198impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io::ErrorType for BufferWriter<'a, C, T> {
199    type Error = embedded_io::ErrorKind;
200}
201
202#[cfg(feature = "embedded")]
203struct EmbeddedWriteFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
204    writer: &'a BufferWriter<'a, C, T>,
205    buf: &'b [u8]
206}
207
208#[cfg(feature = "embedded")]
209impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for EmbeddedWriteFuture<'a, 'b, C, T> {}
210
211#[cfg(feature = "embedded")]
212impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for EmbeddedWriteFuture<'a, 'b, C, T> {
213    type Output = Result<usize, embedded_io::ErrorKind>;
214    
215    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216        self.writer.poll_write(self.buf, cx)
217            .map(|result| match result {
218                Ok(n) => Ok(n),
219                Err(BufferError::ProvidedSliceEmpty) => Ok(0),
220                Err(err) => panic!("unexpected err returned from poll_write(): {}", err)
221            })
222    }
223}
224
225#[cfg(feature = "embedded")]
226impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io_async::Write for BufferWriter<'a, C, T> {
227    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
228        let f = EmbeddedWriteFuture{
229            writer: self,
230            buf: buf
231        };
232
233        f.await
234    }
235}
236
237#[cfg(feature = "std")]
238impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> std::io::Write for BufferWriter<'a, C, T> {
239
240    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
241        use std::io::ErrorKind;
242        match self.write_base(buf) {
243            Ok(n) => Ok(n),
244            Err(BufferError::ProvidedSliceEmpty) => Ok(0),
245            Err(BufferError::NoCapacity) => Err(ErrorKind::WouldBlock.into()),
246            Err(e) => {
247                panic!("unexpected error writing to buffer: {}", e);
248            }
249        }
250    }
251
252    fn flush(&mut self) -> std::io::Result<()> {
253        Ok(())
254    }
255}
256
257pub struct CapacityFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
258    writer: &'b BufferWriter<'a, C, T>,
259    expected_capacity: usize
260}
261
262impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for CapacityFuture<'a, 'b, C, T> {}
263
264impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for CapacityFuture<'a, 'b, C, T> {
265    type Output = Result<(), BufferError>;
266
267    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268        self.writer.buffer.inner.lock_mut(|inner|{
269            inner.poll_ensure_capacity(cx, self.expected_capacity)
270                .map_ok(|_| ())
271        })
272    }
273}
274
275pub struct PushFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
276    writer: &'b BufferWriter<'a, C, T>,
277    data: &'c [u8]
278}
279
280impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PushFuture<'a, 'b, 'c, C, T> {}
281
282impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PushFuture<'a, 'b, 'c, C, T> {
283    type Output = Result<(), BufferError>;
284
285    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
286        self.writer.poll_push(self.data, cx)
287    }
288}