completion_io/
write.rs

1use std::future::{self, Future};
2use std::io::{IoSlice, Result, Sink};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use completion_core::CompletionFuture;
7
8/// Write bytes to a source asynchronously.
9///
10/// This is an async version of [`std::io::Write`].
11///
12/// You should not implement this trait manually, instead implement [`AsyncWriteWith`].
13pub trait AsyncWrite: for<'a> AsyncWriteWith<'a> {}
14impl<T: for<'a> AsyncWriteWith<'a> + ?Sized> AsyncWrite for T {}
15
16/// Write bytes to a source asynchronously with a specific lifetime.
17pub trait AsyncWriteWith<'a> {
18    /// The future that writes to the source, and outputs the number of bytes written.
19    type WriteFuture: CompletionFuture<Output = Result<usize>>;
20
21    /// The future that writes a vector of buffers to the source, and outputs the number of bytes
22    /// written. If your writer does not have efficient vectored writes, set this to
23    /// [`DefaultWriteVectored<'a, Self>`](DefaultWriteVectored).
24    type WriteVectoredFuture: CompletionFuture<Output = Result<usize>>;
25
26    /// The future that flushes the output stream.
27    type FlushFuture: CompletionFuture<Output = Result<()>>;
28
29    /// Write a buffer to the writer, returning how many bytes were written.
30    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture;
31
32    /// Like [`write`](Self::write), except that it writes from a slice of buffers.
33    ///
34    /// Data is copied from each buffer in order, with the final buffer read from possibly being
35    /// only partially consumed. This method must behave as a call to [`write`](Self::write) with
36    /// the buffers concatenated would.
37    ///
38    /// If your writer does not have efficient vectored writes, call
39    /// [`DefaultWriteVectored::new(self, bufs)`](DefaultWriteVectored::new).
40    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture;
41
42    /// Determines if this `AsyncWrite`r has an efficient [`write_vectored`](Self::write_vectored)
43    /// implementation.
44    ///
45    /// The default implementation returns `false`.
46    fn is_write_vectored(&self) -> bool {
47        false
48    }
49
50    /// Flush this output stream, ensuring that all intermediately buffered contents reach their
51    /// destination.
52    fn flush(&'a mut self) -> Self::FlushFuture;
53}
54
55impl<'a, W: AsyncWriteWith<'a> + ?Sized> AsyncWriteWith<'a> for &mut W {
56    type WriteFuture = W::WriteFuture;
57    type WriteVectoredFuture = W::WriteVectoredFuture;
58    type FlushFuture = W::FlushFuture;
59
60    #[inline]
61    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
62        (**self).write(buf)
63    }
64    #[inline]
65    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
66        (**self).write_vectored(bufs)
67    }
68    #[inline]
69    fn is_write_vectored(&self) -> bool {
70        (**self).is_write_vectored()
71    }
72    #[inline]
73    fn flush(&'a mut self) -> Self::FlushFuture {
74        (**self).flush()
75    }
76}
77
78impl<'a, W: AsyncWriteWith<'a> + ?Sized> AsyncWriteWith<'a> for Box<W> {
79    type WriteFuture = W::WriteFuture;
80    type WriteVectoredFuture = W::WriteVectoredFuture;
81    type FlushFuture = W::FlushFuture;
82
83    #[inline]
84    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
85        (**self).write(buf)
86    }
87    #[inline]
88    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
89        (**self).write_vectored(bufs)
90    }
91    #[inline]
92    fn is_write_vectored(&self) -> bool {
93        (**self).is_write_vectored()
94    }
95    #[inline]
96    fn flush(&'a mut self) -> Self::FlushFuture {
97        (**self).flush()
98    }
99}
100
101impl<'a> AsyncWriteWith<'a> for Sink {
102    type WriteFuture = future::Ready<Result<usize>>;
103    type WriteVectoredFuture = future::Ready<Result<usize>>;
104    type FlushFuture = future::Ready<Result<()>>;
105
106    #[inline]
107    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
108        future::ready(Ok(buf.len()))
109    }
110    #[inline]
111    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
112        future::ready(Ok(bufs.iter().map(|b| b.len()).sum()))
113    }
114    #[inline]
115    fn is_write_vectored(&self) -> bool {
116        true
117    }
118    #[inline]
119    fn flush(&'a mut self) -> Self::FlushFuture {
120        future::ready(Ok(()))
121    }
122}
123impl<'a> AsyncWriteWith<'a> for &Sink {
124    type WriteFuture = future::Ready<Result<usize>>;
125    type WriteVectoredFuture = future::Ready<Result<usize>>;
126    type FlushFuture = future::Ready<Result<()>>;
127
128    #[inline]
129    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
130        future::ready(Ok(buf.len()))
131    }
132    #[inline]
133    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
134        future::ready(Ok(bufs.iter().map(|b| b.len()).sum()))
135    }
136    #[inline]
137    fn is_write_vectored(&self) -> bool {
138        true
139    }
140    #[inline]
141    fn flush(&'a mut self) -> Self::FlushFuture {
142        future::ready(Ok(()))
143    }
144}
145
146impl<'a, 's> AsyncWriteWith<'a> for &'s mut [u8] {
147    type WriteFuture = WriteSlice<'a, 's>;
148    type WriteVectoredFuture = WriteVectoredSlice<'a, 's>;
149    type FlushFuture = future::Ready<Result<()>>;
150
151    #[inline]
152    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
153        WriteSlice {
154            slice: unsafe { &mut *(self as *mut _) },
155            buf,
156        }
157    }
158    #[inline]
159    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
160        WriteVectoredSlice {
161            slice: unsafe { &mut *(self as *mut _) },
162            bufs,
163        }
164    }
165    #[inline]
166    fn is_write_vectored(&self) -> bool {
167        true
168    }
169    #[inline]
170    fn flush(&'a mut self) -> Self::FlushFuture {
171        future::ready(Ok(()))
172    }
173}
174
175/// Future for [`write`](AsyncWriteWith::write) on a byte slice (`&mut [u8]`).
176#[derive(Debug)]
177pub struct WriteSlice<'a, 's> {
178    // This is conceptually an &'a mut &'s mut [u8]. However, that would add the implicit bound
179    // 's: 'a which is incompatible with AsyncWriteWith.
180    slice: &'s mut &'s mut [u8],
181    buf: &'a [u8],
182}
183impl Future for WriteSlice<'_, '_> {
184    type Output = Result<usize>;
185
186    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
187        let this = &mut *self;
188        Poll::Ready(std::io::Write::write(this.slice, this.buf))
189    }
190}
191impl CompletionFuture for WriteSlice<'_, '_> {
192    type Output = Result<usize>;
193
194    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        Future::poll(self, cx)
196    }
197    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
198        Poll::Ready(())
199    }
200}
201
202/// Future for [`write_vectored`](AsyncWriteWith::write_vectored) on a byte slice (`&mut [u8]`).
203#[derive(Debug)]
204pub struct WriteVectoredSlice<'a, 's> {
205    // This is conceptually an &'a mut &'s mut [u8]. However, that would add the implicit bound
206    // 's: 'a which is incompatible with AsyncWriteWith.
207    slice: &'s mut &'s mut [u8],
208    bufs: &'a [IoSlice<'a>],
209}
210impl Future for WriteVectoredSlice<'_, '_> {
211    type Output = Result<usize>;
212
213    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
214        let this = &mut *self;
215        Poll::Ready(std::io::Write::write_vectored(this.slice, this.bufs))
216    }
217}
218impl CompletionFuture for WriteVectoredSlice<'_, '_> {
219    type Output = Result<usize>;
220
221    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
222        Future::poll(self, cx)
223    }
224    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
225        Poll::Ready(())
226    }
227}
228
229impl<'a> AsyncWriteWith<'a> for Vec<u8> {
230    type WriteFuture = WriteVec<'a>;
231    type WriteVectoredFuture = WriteVectoredVec<'a>;
232    type FlushFuture = future::Ready<Result<()>>;
233
234    #[inline]
235    fn write(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture {
236        WriteVec { vec: self, buf }
237    }
238    #[inline]
239    fn write_vectored(&'a mut self, bufs: &'a [IoSlice<'a>]) -> Self::WriteVectoredFuture {
240        WriteVectoredVec { vec: self, bufs }
241    }
242    #[inline]
243    fn is_write_vectored(&self) -> bool {
244        true
245    }
246    #[inline]
247    fn flush(&'a mut self) -> Self::FlushFuture {
248        future::ready(Ok(()))
249    }
250}
251
252/// Future for [`write`](AsyncWriteWith::write) on a [`Vec<u8>`](Vec).
253#[derive(Debug)]
254pub struct WriteVec<'a> {
255    vec: &'a mut Vec<u8>,
256    buf: &'a [u8],
257}
258impl Future for WriteVec<'_> {
259    type Output = Result<usize>;
260
261    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
262        let this = &mut *self;
263        Poll::Ready(std::io::Write::write(this.vec, this.buf))
264    }
265}
266impl CompletionFuture for WriteVec<'_> {
267    type Output = Result<usize>;
268
269    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270        Future::poll(self, cx)
271    }
272    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
273        Poll::Ready(())
274    }
275}
276
277/// Future for [`write_vectored`](AsyncWriteWith::write_vectored) on a [`Vec<u8>`](Vec).
278#[derive(Debug)]
279pub struct WriteVectoredVec<'a> {
280    vec: &'a mut Vec<u8>,
281    bufs: &'a [IoSlice<'a>],
282}
283impl Future for WriteVectoredVec<'_> {
284    type Output = Result<usize>;
285
286    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
287        let this = &mut *self;
288        Poll::Ready(std::io::Write::write_vectored(this.vec, this.bufs))
289    }
290}
291impl CompletionFuture for WriteVectoredVec<'_> {
292    type Output = Result<usize>;
293
294    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
295        Future::poll(self, cx)
296    }
297    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
298        Poll::Ready(())
299    }
300}
301
302// TODO: implement AsyncWrite for:
303// - Cursor<&mut [u8]>
304// - Cursor<&mut Vec<u8>>
305// - Cursor<Box<[u8]>>
306// - Cursor<Vec<u8>>
307
/// Compile-time check that the writer types in this file implement [`AsyncWrite`].
///
/// The function is never called; it only has to type-check. Each `assert_impls` line fails to
/// compile if the named type stops implementing `AsyncWrite`.
#[cfg(test)]
// `dead_code`: the function exists only to be type-checked. `extra_unused_lifetimes`: `'a` is
// needed solely to name the reference lifetimes in the assertions below.
#[allow(dead_code, clippy::extra_unused_lifetimes)]
fn test_impls_traits<'a>() {
    fn assert_impls<W: AsyncWrite>() {}

    // `Sink`, by value, by shared reference, and through the forwarding impls.
    assert_impls::<Sink>();
    assert_impls::<&'a Sink>();
    assert_impls::<&'a mut Sink>();
    assert_impls::<Box<Sink>>();
    assert_impls::<&'a mut Box<&'a mut Sink>>();

    // Byte slices.
    assert_impls::<&'a mut [u8]>();
    assert_impls::<&'a mut &'a mut [u8]>();

    // Vectors, directly and through the forwarding impls.
    assert_impls::<Vec<u8>>();
    assert_impls::<&'a mut Vec<u8>>();
    assert_impls::<Box<Vec<u8>>>();
}
323
324/// A default implementation of [`WriteVectoredFuture`](AsyncWriteWith::WriteVectoredFuture) for
325/// types that don't have efficient vectored writes.
326///
327/// This will forward to [`write`](AsyncWriteWith::write) with the first nonempty buffer provided,
328/// or an empty one if none exists.
329#[derive(Debug)]
330pub struct DefaultWriteVectored<'a, T: AsyncWriteWith<'a>> {
331    future: T::WriteFuture,
332}
333
334impl<'a, T: AsyncWriteWith<'a>> DefaultWriteVectored<'a, T> {
335    /// Create a new `DefaultWriteVectored` future.
336    pub fn new(writer: &'a mut T, bufs: &'a [IoSlice<'a>]) -> Self {
337        Self {
338            future: writer.write(bufs.iter().find(|b| !b.is_empty()).map_or(&[], |b| &**b)),
339        }
340    }
341}
342
343impl<'a, T: AsyncWriteWith<'a>> CompletionFuture for DefaultWriteVectored<'a, T> {
344    type Output = Result<usize>;
345
346    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
347        Pin::map_unchecked_mut(self, |this| &mut this.future).poll(cx)
348    }
349    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
350        Pin::map_unchecked_mut(self, |this| &mut this.future).poll_cancel(cx)
351    }
352}
353impl<'a, T: AsyncWriteWith<'a>> Future for DefaultWriteVectored<'a, T>
354where
355    <T as AsyncWriteWith<'a>>::WriteFuture: Future<Output = Result<usize>>,
356{
357    type Output = Result<usize>;
358
359    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
360        unsafe { CompletionFuture::poll(self, cx) }
361    }
362}