1
2use core::{pin::Pin, task::{Context, Poll}};
3
4use crate::{mutex::Mutex, AsyncBuffer, BufferError};
5
/// Outcome reported by the closure handed to `BufferWriter::write_slice_async`.
///
/// The closure is called with the currently writable region of the buffer
/// and tells the writer what to do next by returning one of these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteSliceAsyncResult {
    /// Nothing was written this time; the closure wants to be called again
    /// later.
    Wait,
    /// The closure wrote this many bytes into the slice it was given; the
    /// writer commits exactly that many bytes.
    Ready(usize),
}
10
11pub struct BufferWriter <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
12 buffer: &'a AsyncBuffer<C, T>
13}
14
15impl <'a, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> BufferWriter<'a, C, T> {
16
17 pub(crate) fn new(buffer: &'a AsyncBuffer<C, T>) -> Self {
18 Self {
19 buffer: buffer
20 }
21 }
22
23 fn write_base(&mut self, buf: &[u8]) -> Result<usize, BufferError> {
31 if buf.is_empty() {
32 return Err(BufferError::ProvidedSliceEmpty);
33 }
34
35 self.buffer.inner.lock_mut(|inner|{
36 let cap = inner.maybe_shift();
37
38 if cap == 0 {
39 return Err(BufferError::NoCapacity);
40 }
41
42 let tgt = inner.writeable_data();
43
44 if cap < buf.len() {
45 tgt.copy_from_slice(&buf[0..cap]);
46 inner.write_commit(cap)?;
47 Ok(cap)
48 } else {
49 let tgt = &mut tgt[0..buf.len()];
50 tgt.copy_from_slice(buf);
51 inner.write_commit(cap)?;
52 Ok(buf.len())
53 }
54 })
55 }
56
57 fn poll_write(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<Result<usize, BufferError>> {
58 if buf.is_empty() {
59 return Poll::Ready(Err(BufferError::ProvidedSliceEmpty));
60 }
61
62 self.buffer.inner.lock_mut(|inner|{
63 let cap = match inner.poll_ensure_capacity(cx, 1) {
64 Poll::Pending => return Poll::Pending,
65 Poll::Ready(Ok(n)) => n,
66 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
67 };
68
69 let tgt = inner.writeable_data();
70
71 if cap < buf.len() {
72 tgt.copy_from_slice(&buf[0..cap]);
73 inner.write_commit(cap).unwrap();
74 Poll::Ready(Ok(cap))
75 } else {
76 let tgt = &mut tgt[0..buf.len()];
77 tgt.copy_from_slice(buf);
78 inner.write_commit(buf.len()).unwrap();
79 Poll::Ready(Ok(buf.len()))
80 }
81 })
82 }
83
84 pub fn push(&self, data: &[u8]) -> Result<(), BufferError> {
85 self.buffer.inner.lock_mut(|inner| {
86 if inner.has_capacity(data.len()) {
87 let tgt = inner.writeable_data();
88 let tgt = &mut tgt[..data.len()];
89 tgt.copy_from_slice(data);
90 inner.write_commit(data.len())
91 .expect("must not throw because capacity is checked before");
92 Ok(())
93 } else {
94 Err(BufferError::NoCapacity)
95 }
96 })
97 }
98
99 pub fn write_slice<F>(&self, f: F) -> Result<(), BufferError> where F: FnOnce(&mut [u8]) -> usize {
100 self.buffer.inner.lock_mut(|inner| {
101 inner.shift();
102 let writeable = inner.writeable_data();
103 let bytes_written = f(writeable);
104 if bytes_written > writeable.len() {
105 Err(BufferError::NoCapacity)
106 } else {
107 inner.write_commit(bytes_written).unwrap();
108 Ok(())
109 }
110 })
111 }
112
113 pub fn write_slice_async<F>(&self, f: F) -> impl Future<Output = Result<(), BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult{
114 WriteSliceAsyncFuture{
115 writer: self,
116 f: f
117 }
118 }
119
120 fn poll_write_slice<F>(&self, f: &mut F, cx: &mut Context<'_>) -> Poll<Result<(), BufferError>> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
121 self.buffer.inner.lock_mut(|inner| {
122 inner.shift();
123 let writeable = inner.writeable_data();
124 match f(writeable) {
125 WriteSliceAsyncResult::Wait => if writeable.len() < inner.capacity() {
126 inner.add_write_waker(cx);
127 Poll::Pending
128 } else {
129 Poll::Ready(Err(BufferError::NoCapacity))
130 },
131 WriteSliceAsyncResult::Ready(bytes_written) if bytes_written > writeable.len() => {
132 Poll::Ready(Err(BufferError::NoCapacity))
133 },
134 WriteSliceAsyncResult::Ready(bytes_written) => {
135 inner.write_commit(bytes_written).unwrap();
136 Poll::Ready(Ok(()))
137 }
138 }
139 })
140 }
141
142 pub fn await_capacity<'b>(&'b self, expected_capacity: usize) -> CapacityFuture<'a, 'b, C, T> {
143 CapacityFuture {
144 writer: self,
145 expected_capacity: expected_capacity
146 }
147 }
148}
149
150pub struct WriteSliceAsyncFuture<'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
151 writer: &'b BufferWriter<'a, C, T>,
152 f: F
153}
154
155impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Unpin for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {}
156
157impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>, F> Future for WriteSliceAsyncFuture<'a, 'b, C, T, F> where F: FnMut(&mut [u8]) -> WriteSliceAsyncResult {
158 type Output = Result<(), BufferError>;
159
160 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 self.writer.poll_write_slice(&mut self.f, cx)
162 }
163}
164
/// Declares the error type used by the `embedded_io` trait implementations
/// of [`BufferWriter`].
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io::ErrorType for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Error = embedded_io::ErrorKind;
}
169
/// Future behind the `embedded_io_async::Write::write` implementation of
/// [`BufferWriter`]: writes as much of `buf` as fits once room is available.
#[cfg(feature = "embedded")]
struct EmbeddedWriteFuture<'a, 'b, const C: usize, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Writer that receives the bytes.
    writer: &'a BufferWriter<'a, C, T>,
    /// Bytes that should be written.
    buf: &'b [u8],
}
175
// Explicit marker: the future only stores two shared references.
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Unpin for EmbeddedWriteFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
}
178
#[cfg(feature = "embedded")]
impl<'a, 'b, const C: usize, T> Future for EmbeddedWriteFuture<'a, 'b, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    type Output = Result<usize, embedded_io::ErrorKind>;

    /// Forwards to `BufferWriter::poll_write` and translates its result.
    ///
    /// An empty input slice is reported as a successful write of zero bytes.
    ///
    /// # Panics
    ///
    /// Panics on any other error returned by `poll_write`.
    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.writer.poll_write(self.buf, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
            Poll::Ready(Err(BufferError::ProvidedSliceEmpty)) => Poll::Ready(Ok(0)),
            Poll::Ready(Err(err)) => {
                panic!("unexpected err returned from poll_write(): {}", err)
            }
        }
    }
}
192
#[cfg(feature = "embedded")]
impl<'a, const C: usize, T> embedded_io_async::Write for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Writes as much of `buf` as fits and returns the number of bytes
    /// written, by awaiting an [`EmbeddedWriteFuture`].
    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        EmbeddedWriteFuture { writer: self, buf }.await
    }
}
204
#[cfg(feature = "std")]
impl<'a, const C: usize, T> std::io::Write for BufferWriter<'a, C, T>
where
    T: AsRef<[u8]> + AsMut<[u8]>,
{
    /// Non-blocking write built on `write_base`.
    ///
    /// * An empty `buf` yields `Ok(0)`.
    /// * A buffer without free room yields an error of kind
    ///   [`std::io::ErrorKind::WouldBlock`].
    ///
    /// # Panics
    ///
    /// Panics on any other error returned by `write_base`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_base(buf).or_else(|err| match err {
            BufferError::ProvidedSliceEmpty => Ok(0),
            BufferError::NoCapacity => Err(std::io::ErrorKind::WouldBlock.into()),
            other => {
                panic!("unexpected error writing to buffer: {}", other);
            }
        })
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
224
225pub struct CapacityFuture <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> {
226 writer: &'b BufferWriter<'a, C, T>,
227 expected_capacity: usize
228}
229
230impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Unpin for CapacityFuture<'a, 'b, C, T> {}
231
232impl <'a, 'b, const C: usize, T: AsRef<[u8]> + AsMut<[u8]>> Future for CapacityFuture<'a, 'b, C, T> {
233 type Output = Result<(), BufferError>;
234
235 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 self.writer.buffer.inner.lock_mut(|inner|{
237 inner.poll_ensure_capacity(cx, self.expected_capacity)
238 .map_ok(|_| ())
239 })
240 }
241}