1
2use core::{pin::Pin, task::{Context, Poll}};
3
4use crate::{mutex::Mutex, AsyncBuffer, BufferError};
5
/// Outcome reported by the closure handed to `BufferWriter::write_slice_async`.
///
/// The closure receives the currently writeable part of the buffer and tells
/// the writer whether it produced data or needs to be called again later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteSliceAsyncResult {
    /// Nothing was written this time. The writer registers the task's waker
    /// and stays pending, unless the writeable slice already spans the whole
    /// buffer capacity, in which case it fails with `BufferError::NoCapacity`.
    Wait,
    /// The closure filled the given number of bytes at the start of the
    /// slice; the writer commits exactly that many bytes. A count larger than
    /// the slice handed to the closure is rejected with
    /// `BufferError::NoCapacity`.
    Ready(usize),
}
10
11pub struct BufferWriter <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
12 buffer: &'a AsyncBuffer<C, T>
13}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferWriter<'a, C, T> {
16
17 pub(crate) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18 Self {
19 buffer: buffer
20 }
21 }
22
23 fn write_base(&mut self, buf: &[u8]) -> Result<usize, BufferError> {
31 if buf.is_empty() {
32 return Err(BufferError::ProvidedSliceEmpty);
33 }
34
35 self.buffer.inner.lock_mut(|inner|{
36 let cap = inner.maybe_shift();
37
38 if cap == 0 {
39 return Err(BufferError::NoCapacity);
40 }
41
42 let tgt = inner.writeable_data();
43
44 if cap < buf.len() {
45 tgt.copy_from_slice(&buf[0..cap]);
46 inner.write_commit(cap)?;
47 Ok(cap)
48 } else {
49 let tgt = &mut tgt[0..buf.len()];
50 tgt.copy_from_slice(buf);
51 inner.write_commit(cap)?;
52 Ok(buf.len())
53 }
54 })
55 }
56
57 fn poll_write(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> {
58 if buf.is_empty() {
59 return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
60 }
61
62 self.buffer.inner.lock_mut(|inner|{
63 let cap = match inner.poll_ensure_capacity(cx, 1) {
64 Poll::Pending => return Poll::Pending,
65 Poll::Ready(Ok(n)) => n,
66 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
67 };
68
69 let tgt = inner.writeable_data();
70
71 if cap < buf.len() {
72 tgt.copy_from_slice(&buf[0..cap]);
73 inner.write_commit(cap).unwrap();
74 Poll::Ready(Ok(cap))
75 } else {
76 let tgt = &mut tgt[0..buf.len()];
77 tgt.copy_from_slice(buf);
78 inner.write_commit(buf.len()).unwrap();
79 Poll::Ready(Ok(buf.len()))
80 }
81 })
82 }
83
84 pub fn push(&self, data: &[u8]) -> Result<(), BufferError> {
85 self.buffer.inner.lock_mut(|inner| {
86 if inner.has_capacity(data.len()) {
87 let tgt = inner.writeable_data();
88 let tgt = &mut tgt[..data.len()];
89 tgt.copy_from_slice(data);
90 inner.write_commit(data.len())
91 .expect("must not throw because capacity is checked before");
92 Ok(())
93 } else {
94 Err(BufferError::NoCapacity)
95 }
96 })
97 }
98
99 pub fn push_async<'b, 'c>(&'b self, data: &'c [u8]) -> PushFuture<'a, 'b, 'c, C, T> {
100 PushFuture {
101 writer: self,
102 data: data
103 }
104 }
105
106 fn poll_push(&self, data: &[u8], cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> {
107 self.buffer.inner.lock_mut(|inner| {
108 if data.len() > inner.capacity() {
109 return Poll::Ready(Err(BufferError::NoCapacity));
110 }
111
112 inner.shift();
113
114 let writeable = inner.writeable_data();
115 if writeable.len() < data.len() {
116 inner.add_write_waker(cx);
118 Poll::Pending
119 } else {
120 writeable[..data.len()].copy_from_slice(data);
121 inner.write_commit(data.len()).unwrap();
122 Poll::Ready(Ok(()))
123 }
124 })
125 }
126
127 pub fn write_slice<F>(&self, f: F) -> Result<usize, BufferError> where F: FnOnce(&mut [u8]) -> usize {
128 self.buffer.inner.lock_mut(|inner| {
129 inner.shift();
130 let writeable = inner.writeable_data();
131 let bytes_written = f(writeable);
132 if bytes_written > writeable.len() {
133 Err(BufferError::NoCapacity)
134 } else {
135 inner.write_commit(bytes_written).unwrap();
136 Ok(bytes_written)
137 }
138 })
139 }
140
141 pub fn write_slice_async<F>(&self, f: F) -> impl Future<Output = Result<usize, BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult{
142 WriteSliceAsyncFuture{
143 writer: self,
144 f: f
145 }
146 }
147
148 fn poll_write_slice<F>(&self, f: &mut F, cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
149 self.buffer.inner.lock_mut(|inner| {
150 inner.shift();
151 let writeable = inner.writeable_data();
152 match f(writeable) {
153 WriteSliceAsyncResult::Wait => if writeable.len() < inner.capacity() {
154 inner.add_write_waker(cx);
155 Poll::Pending
156 } else {
157 Poll::Ready(Err(BufferError::NoCapacity))
158 },
159 WriteSliceAsyncResult::Ready(bytes_written) if bytes_written > writeable.len() => {
160 Poll::Ready(Err(BufferError::NoCapacity))
161 },
162 WriteSliceAsyncResult::Ready(bytes_written) => {
163 inner.write_commit(bytes_written).unwrap();
164 Poll::Ready(Ok(bytes_written))
165 }
166 }
167 })
168 }
169
170 pub fn await_capacity<'b>(&'b self, expected_capacity: usize) -> CapacityFuture<'a, 'b, C, T> {
171 CapacityFuture {
172 writer: self,
173 expected_capacity: expected_capacity
174 }
175 }
176
177 pub fn reset(&self) {
178 self.buffer.inner.lock_mut(|inner| inner.reset());
179 }
180}
181
182pub struct WriteSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
183 writer: &'b BufferWriter<'a, C, T>,
184 f: F
185}
186
187impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Unpin for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {}
188
189impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Future for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
190 type Output = Result<usize, BufferError>;
191
192 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 self.writer.poll_write_slice(&mut self.f, cx)
194 }
195}
196
/// Declares the error type used by the `embedded_io` trait implementations.
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io::ErrorType for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Error = embedded_io::ErrorKind;
}
201
/// Future backing the `embedded_io_async::Write::write` implementation:
/// one partial write of `buf` through `BufferWriter::poll_write`.
#[cfg(feature = "embedded")]
struct EmbeddedWriteFuture<'a, 'b, const C: usize, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Writer to write through.
    writer: &'a BufferWriter<'a, C, T>,
    /// Bytes offered for writing.
    buf: &'b [u8],
}
207
// The future holds only shared references, so it is declared `Unpin`.
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Unpin for EmbeddedWriteFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
}
210
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Future for EmbeddedWriteFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Output = Result<usize, embedded_io::ErrorKind>;

    /// Polls `BufferWriter::poll_write` and translates its result.
    ///
    /// An empty input slice is reported as a successful write of zero bytes.
    ///
    /// # Panics
    ///
    /// Panics on any other error returned by `poll_write`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.writer.poll_write(self.buf, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
            Poll::Ready(Err(BufferError::ProvidedSliceEmpty)) => Poll::Ready(Ok(0)),
            Poll::Ready(Err(err)) => panic!("unexpected err returned from poll_write(): {}", err),
        }
    }
}
224
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io_async::Write for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Writes as much of `buf` as fits, waiting until at least one byte can
    /// be written. Returns the number of bytes written (zero for empty `buf`).
    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        EmbeddedWriteFuture { writer: self, buf }.await
    }
}
236
#[cfg(feature = "std")]
impl<'a, const C: usize, T> std::io::Write for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Non-blocking partial write through `write_base`.
    ///
    /// An empty `buf` yields `Ok(0)`; a buffer without free space yields an
    /// error of kind `WouldBlock`.
    ///
    /// # Panics
    ///
    /// Panics on any other error returned by `write_base`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let written = match self.write_base(buf) {
            Ok(n) => n,
            Err(BufferError::ProvidedSliceEmpty) => 0,
            Err(BufferError::NoCapacity) => {
                return Err(std::io::Error::from(std::io::ErrorKind::WouldBlock));
            }
            Err(e) => panic!("unexpected error writing to buffer: {}", e),
        };
        Ok(written)
    }

    /// Nothing to flush here; always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
256
257pub struct CapacityFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
258 writer: &'b BufferWriter<'a, C, T>,
259 expected_capacity: usize
260}
261
262impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for CapacityFuture<'a, 'b, C, T> {}
263
264impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for CapacityFuture<'a, 'b, C, T> {
265 type Output = Result<(), BufferError>;
266
267 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268 self.writer.buffer.inner.lock_mut(|inner|{
269 inner.poll_ensure_capacity(cx, self.expected_capacity)
270 .map_ok(|_| ())
271 })
272 }
273}
274
275pub struct PushFuture <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
276 writer: &'b BufferWriter<'a, C, T>,
277 data: &'c [u8]
278}
279
280impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for PushFuture<'a, 'b, 'c, C, T> {}
281
282impl <'a, 'b, 'c, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for PushFuture<'a, 'b, 'c, C, T> {
283 type Output = Result<(), BufferError>;
284
285 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
286 self.writer.poll_push(self.data, cx)
287 }
288}