multipart_write/
lib.rs

1//! # Description
2//!
3//! This crate contains the trait `MultipartWrite`, assorted implementations,
4//! and combinators.
5//!
6//! A `MultipartWrite` is a similar interface to [`Sink`], except that writing
7//! an item or completing the write both return values.
8//!
9//! [Here][example] is a conceptual example of a `MultipartWrite`.
10//!
11//! # Motivation
12//!
13//! `Sink` is a useful API, but it is just that--a sink.  The end of a stream.
14//! It's useful to have the backpressure mechanism that `poll_ready`/`start_send`
15//! enables, and it's nice to have the flexibility that the shape of it provides
16//! in what kinds of things you can forward to it.
17//!
18//! The idea for `MultipartWrite` is to:
19//! 1. Have the same desirable properies as `Sink`.
20//! 2. Be able to be inserted at more locations in a stream computation.
21//! 3. Be useful in more cases by having a value returned when starting a write.
22//! 4. Be able to transform a stream into another stream, which is really just
23//!    a more specific phrasing of 3.
24//!
25//! [`Sink`]: https://docs.rs/crate/futures-sink/0.3.31
26//! [example]: https://github.com/quasi-coherent/multipart-write/blob/master/examples/author.rs
27#![cfg_attr(docsrs, feature(doc_cfg))]
28use std::collections::VecDeque;
29use std::convert::Infallible as Never;
30use std::ops::DerefMut;
31use std::pin::Pin;
32use std::task::{Context, Poll};
33
34pub mod io;
35pub mod stream;
36pub mod write;
37
38/// A prelude for this crate.
39pub mod prelude {
40    pub use super::stream::{self, MultipartStreamExt as _};
41    pub use super::write::{self, MultipartWriteExt as _};
42    pub use super::{FusedMultipartWrite, MultipartWrite};
43}
44
45/// `MultipartWrite` is a `Sink`-like interface for asynchronously writing an
46/// object in parts.
47pub trait MultipartWrite<Part> {
48    /// The type of value returned when writing the part began successfully.
49    type Ret;
50
51    /// The type of value returned when all parts are written.
52    type Output;
53
54    /// The type of value returned when an operation fails.
55    type Error;
56
57    /// Attempts to prepare the `MultipartWrite` to receive a new part.
58    ///
59    /// This method must be called and return `Poll::Ready` before each call to
60    /// `start_send`, indicating that the underlying writer is ready to have
61    /// another part written to it.
62    ///
63    /// This method returns `Poll::Pending` when the object being prepared cannot
64    /// accept another part.
65    ///
66    /// In most cases, if the writer encounters an error, it will be permanently
67    /// unable to write more parts.
68    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
69
70    /// Begin the process of writing a part to this writer, returning the
71    /// associated type confirming this was done successfully.
72    ///
73    /// Like `Sink`, this should be preceded by a call to `poll_ready` that
74    /// returns `Poll::Ready` to ensure that the `MultipartWrite` is ready to
75    /// receive a new part.
76    ///
77    /// # Errors
78    ///
79    /// Errors returned by this method are implementation-specific, but it is
80    /// always an error to call `start_send` when `poll_ready` would return
81    /// `Poll::Pending`.
82    ///
83    /// In most cases, if the writer encounters an error, it will be permanently
84    /// unable to write more parts.
85    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error>;
86
87    /// Flush any remaining output from the writer.
88    ///
89    /// Returns `Poll::Ready` when no buffered, unwritten parts remain and
90    /// `Poll::Pending` if there is more work left to do.
91    ///
92    /// In most cases, if the writer encounters an error, it will be permanently
93    /// unable to write more parts.
94    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
95
96    /// Complete this writer, returning the output.
97    ///
98    /// This method returns `Poll::Pending` until no buffered, unwritten parts
99    /// remain and the complete output object is available.
100    ///
101    /// In most cases, if the writer encounters an error, it will be permanently
102    /// unable to write more parts.
103    fn poll_complete(
104        self: Pin<&mut Self>,
105        cx: &mut Context<'_>,
106    ) -> Poll<Result<Self::Output, Self::Error>>;
107}
108
109/// An owned, dynamically typed [`MultipartWrite`] for use in cases where it is
110/// not possible or desirable to statically type it.
111pub type BoxMultipartWrite<'a, Part, R, T, E> =
112    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + Send + 'a>>;
113
114/// `BoxMultipartWrite` but without the `Send` requirement.
115pub type LocalBoxMultipartWrite<'a, Part, R, T, E> =
116    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + 'a>>;
117
118/// A writer that tracks whether or not the underlying writer should no longer
119/// be polled.
120pub trait FusedMultipartWrite<Part>: MultipartWrite<Part> {
121    /// Returns `true` if the writer should no longer be polled.
122    fn is_terminated(&self) -> bool;
123}
124
125impl<W: ?Sized + MultipartWrite<Part> + Unpin, Part> MultipartWrite<Part> for &mut W {
126    type Ret = W::Ret;
127    type Output = W::Output;
128    type Error = W::Error;
129
130    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131        Pin::new(&mut **self).poll_ready(cx)
132    }
133
134    fn start_send(mut self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
135        Pin::new(&mut **self).start_send(part)
136    }
137
138    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
139        Pin::new(&mut **self).poll_flush(cx)
140    }
141
142    fn poll_complete(
143        mut self: Pin<&mut Self>,
144        cx: &mut Context<'_>,
145    ) -> Poll<Result<Self::Output, Self::Error>> {
146        Pin::new(&mut **self).poll_complete(cx)
147    }
148}
149
150impl<W: ?Sized + FusedMultipartWrite<Part> + Unpin, Part> FusedMultipartWrite<Part> for &mut W {
151    fn is_terminated(&self) -> bool {
152        <W as FusedMultipartWrite<Part>>::is_terminated(&**self)
153    }
154}
155
156impl<P, Part> MultipartWrite<Part> for Pin<P>
157where
158    P: DerefMut + Unpin,
159    P::Target: MultipartWrite<Part>,
160{
161    type Ret = <P::Target as MultipartWrite<Part>>::Ret;
162    type Output = <P::Target as MultipartWrite<Part>>::Output;
163    type Error = <P::Target as MultipartWrite<Part>>::Error;
164
165    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
166        self.get_mut().as_mut().poll_ready(cx)
167    }
168
169    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
170        self.get_mut().as_mut().start_send(part)
171    }
172
173    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        self.get_mut().as_mut().poll_flush(cx)
175    }
176
177    fn poll_complete(
178        self: Pin<&mut Self>,
179        cx: &mut Context<'_>,
180    ) -> Poll<Result<Self::Output, Self::Error>> {
181        self.get_mut().as_mut().poll_complete(cx)
182    }
183}
184
185impl<P, Part> FusedMultipartWrite<Part> for Pin<P>
186where
187    P: DerefMut + Unpin,
188    P::Target: FusedMultipartWrite<Part>,
189{
190    fn is_terminated(&self) -> bool {
191        <P::Target as FusedMultipartWrite<Part>>::is_terminated(&**self)
192    }
193}
194
195impl<T> MultipartWrite<T> for Vec<T> {
196    type Ret = ();
197    type Output = Self;
198    type Error = Never;
199
200    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
201        Poll::Ready(Ok(()))
202    }
203
204    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
205        unsafe { self.get_unchecked_mut() }.push(part);
206        Ok(())
207    }
208
209    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210        Poll::Ready(Ok(()))
211    }
212
213    fn poll_complete(
214        self: Pin<&mut Self>,
215        _cx: &mut Context<'_>,
216    ) -> Poll<Result<Self::Output, Self::Error>> {
217        let this: &mut Vec<T> = unsafe { self.get_unchecked_mut() };
218        let out = std::mem::take(this);
219        Poll::Ready(Ok(out))
220    }
221}
222
223impl<T> MultipartWrite<T> for VecDeque<T> {
224    type Ret = ();
225    type Output = Self;
226    type Error = Never;
227
228    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
229        Poll::Ready(Ok(()))
230    }
231
232    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
233        unsafe { self.get_unchecked_mut() }.push_back(part);
234        Ok(())
235    }
236
237    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
238        Poll::Ready(Ok(()))
239    }
240
241    fn poll_complete(
242        self: Pin<&mut Self>,
243        _cx: &mut Context<'_>,
244    ) -> Poll<Result<Self::Output, Self::Error>> {
245        let this: &mut VecDeque<T> = unsafe { self.get_unchecked_mut() };
246        let out = std::mem::take(this);
247        Poll::Ready(Ok(out))
248    }
249}