embytes_buffer_async/
writer.rs

1
2use core::{pin::Pin, task::{Context, Poll}};
3
4use crate::{mutex::Mutex, AsyncBuffer, BufferError};
5
6pub enum WriteSliceAsyncResult {
7    Wait,
8    Ready(usize),
9}
10
11pub struct BufferWriter <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
12    buffer: &'a AsyncBuffer<C, T>
13}
14 
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferWriter<'a, C, T> {
16
17    pub(crate) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18        Self {
19            buffer: buffer
20        }
21    }
22
23    /// Base function for implementing writers like [`embedded_io::Write`]
24    /// Returns the number of bytes writen to the buffer from the provided slice
25    /// 
26    /// # Errors
27    /// 
28    /// [`BufferError::ProvidedSliceEmpty`] if the provided slice is empty
29    /// [`BufferError::NoCapacity`] if the buffer has no capacity after calling [`Buffer::shift`]
30    fn write_base(&mut self, buf: &[u8]) -> Result<usize, BufferError> {
31        if buf.is_empty() {
32            return Err(BufferError::ProvidedSliceEmpty);
33        }
34
35        self.buffer.inner.lock_mut(|inner|{
36            let cap = inner.maybe_shift();
37
38            if cap == 0 {
39                return Err(BufferError::NoCapacity);
40            }
41
42            let tgt = inner.writeable_data();
43            
44            if cap < buf.len() {
45                tgt.copy_from_slice(&buf[0..cap]);
46                inner.write_commit(cap)?;
47                Ok(cap)
48            } else {
49                let tgt = &mut tgt[0..buf.len()];
50                tgt.copy_from_slice(buf);
51                inner.write_commit(cap)?;
52                Ok(buf.len())
53            }
54        })
55    }
56
57    fn poll_write(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> {
58        if buf.is_empty() {
59            return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
60        }
61
62        self.buffer.inner.lock_mut(|inner|{
63            let cap = match inner.poll_ensure_capacity(cx, 1) {
64                Poll::Pending => return Poll::Pending,
65                Poll::Ready(Ok(n)) => n,
66                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
67            };
68
69            let tgt = inner.writeable_data();
70            
71            if cap < buf.len() {
72                tgt.copy_from_slice(&buf[0..cap]);
73                inner.write_commit(cap).unwrap();
74                Poll::Ready(Ok(cap))
75            } else {
76                let tgt = &mut tgt[0..buf.len()];
77                tgt.copy_from_slice(buf);
78                inner.write_commit(buf.len()).unwrap();
79                Poll::Ready(Ok(buf.len()))
80            }
81        })
82    }
83
84    pub fn push(&self, data: &[u8]) -> Result<(), BufferError> {
85        self.buffer.inner.lock_mut(|inner| {
86            if inner.has_capacity(data.len()) {
87                let tgt = inner.writeable_data();
88                let tgt = &mut tgt[..data.len()];
89                tgt.copy_from_slice(data);
90                inner.write_commit(data.len())
91                    .expect("must not throw because capacity is checked before");
92                Ok(())
93            } else {
94                Err(BufferError::NoCapacity)
95            }
96        })
97    }
98
99    pub fn write_slice<F>(&self, f: F) -> Result<(), BufferError> where F: FnOnce(&mut [u8]) -> usize {
100        self.buffer.inner.lock_mut(|inner| {
101            inner.shift();
102            let writeable = inner.writeable_data();
103            let bytes_written = f(writeable);
104            if bytes_written > writeable.len() {
105                Err(BufferError::NoCapacity)
106            } else {
107                inner.write_commit(bytes_written).unwrap();
108                Ok(())
109            }
110        })
111    }
112
113    pub fn write_slice_async<F>(&self, f: F) -> impl Future<Output = Result<(), BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult{
114        WriteSliceAsyncFuture{
115            writer: self,
116            f: f
117        }
118    }
119
120    fn poll_write_slice<F>(&self, f: &mut F, cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
121        self.buffer.inner.lock_mut(|inner| {
122            inner.shift();
123            let writeable = inner.writeable_data();
124            match f(writeable) {
125                WriteSliceAsyncResult::Wait => if writeable.len() < inner.capacity() {
126                        inner.add_write_waker(cx);
127                        Poll::Pending
128                    } else {
129                        Poll::Ready(Err(BufferError::NoCapacity))
130                    },
131                WriteSliceAsyncResult::Ready(bytes_written) if bytes_written > writeable.len() => {
132                    Poll::Ready(Err(BufferError::NoCapacity))
133                },
134                WriteSliceAsyncResult::Ready(bytes_written) => {
135                    inner.write_commit(bytes_written).unwrap();
136                    Poll::Ready(Ok(()))
137                }
138            }
139        })
140    }
141
142    pub fn await_capacity<'b>(&'b self, expected_capacity: usize) -> CapacityFuture<'a, 'b, C, T> {
143        CapacityFuture {
144            writer: self,
145            expected_capacity: expected_capacity
146        }
147    }
148}
149
150pub struct WriteSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
151    writer: &'b BufferWriter<'a, C, T>,
152    f: F
153}
154
155impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Unpin for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {}
156
157impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Future for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
158    type Output = Result<(), BufferError>;
159
160    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        self.writer.poll_write_slice(&mut self.f, cx)
162    }
163}
164
165#[cfg(feature = "embedded")]
166impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io::ErrorType for BufferWriter<'a, C, T> {
167    type Error = embedded_io::ErrorKind;
168}
169
170#[cfg(feature = "embedded")]
171struct EmbeddedWriteFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
172    writer: &'a BufferWriter<'a, C, T>,
173    buf: &'b [u8]
174}
175
176#[cfg(feature = "embedded")]
177impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for EmbeddedWriteFuture<'a, 'b, C, T> {}
178
179#[cfg(feature = "embedded")]
180impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for EmbeddedWriteFuture<'a, 'b, C, T> {
181    type Output = Result<usize, embedded_io::ErrorKind>;
182    
183    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184        self.writer.poll_write(self.buf, cx)
185            .map(|result| match result {
186                Ok(n) => Ok(n),
187                Err(BufferError::ProvidedSliceEmpty) => Ok(0),
188                Err(err) => panic!("unexpected err returned from poll_write(): {}", err)
189            })
190    }
191}
192
193#[cfg(feature = "embedded")]
194impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> embedded_io_async::Write for BufferWriter<'a, C, T> {
195    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
196        let f = EmbeddedWriteFuture{
197            writer: self,
198            buf: buf
199        };
200
201        f.await
202    }
203}
204
205#[cfg(feature = "std")]
206impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> std::io::Write for BufferWriter<'a, C, T> {
207
208    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
209        use std::io::ErrorKind;
210        match self.write_base(buf) {
211            Ok(n) => Ok(n),
212            Err(BufferError::ProvidedSliceEmpty) => Ok(0),
213            Err(BufferError::NoCapacity) => Err(ErrorKind::WouldBlock.into()),
214            Err(e) => {
215                panic!("unexpected error writing to buffer: {}", e);
216            }
217        }
218    }
219
220    fn flush(&mut self) -> std::io::Result<()> {
221        Ok(())
222    }
223}
224
225pub struct CapacityFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
226    writer: &'b BufferWriter<'a, C, T>,
227    expected_capacity: usize
228}
229
230impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for CapacityFuture<'a, 'b, C, T> {}
231
232impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for CapacityFuture<'a, 'b, C, T> {
233    type Output = Result<(), BufferError>;
234
235    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236        self.writer.buffer.inner.lock_mut(|inner|{
237            inner.poll_ensure_capacity(cx, self.expected_capacity)
238                .map_ok(|_| ())
239        })
240    }
241}