multipart_write/
lib.rs

1//! # Description
2//!
3//! This crate contains the trait `MultipartWrite`, assorted implementations,
4//! and combinators.
5//!
6//! A `MultipartWrite` is a similar interface to [`Sink`], except that writing
7//! an item or completing the write both return values.
8//!
9//! [Here][example] is a conceptual example of a `MultipartWrite`.
10//!
11//! # Motivation
12//!
13//! `Sink` is a useful API, but it is just that: a sink, the end of a stream.
14//!
15//! It's valuable to have the backpressure mechanism that `poll_ready` combined
16//! with `start_send` enables, and it's nice to have the flexibility that the
17//! shape of `Sink` provides in what kinds of values you can send with it.
18//!
19//! The idea for `MultipartWrite` is to:
20//! 1. Have those same desirable properies: backpressure and generic input type.
21//! 2. Be able to be inserted earlier in a stream computation.
22//! 3. Replace `Sink` when the use case would need a value returned by sending to
23//!    it or closing it.
24//! 4. Transform a stream by writing it in parts, which is somewhat of a specific
25//!    rephrasing of the second and third points.
26//!
27//! [`Sink`]: https://docs.rs/crate/futures-sink/0.3.31
28//! [example]: https://github.com/quasi-coherent/multipart-write/blob/master/examples/author.rs
29#![cfg_attr(docsrs, feature(doc_cfg))]
30use std::collections::VecDeque;
31use std::convert::Infallible as Never;
32use std::ops::DerefMut;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36pub mod io;
37pub mod stream;
38pub mod write;
39
40/// A prelude for this crate.
41pub mod prelude {
42    pub use super::stream::{self, MultipartStreamExt as _};
43    pub use super::write::{self, MultipartWriteExt as _};
44    pub use super::{FusedMultipartWrite, MultipartWrite};
45}
46
47/// `MultipartWrite` is a `Sink`-like interface for asynchronously writing an
48/// object in parts.
49pub trait MultipartWrite<Part> {
50    /// The type of value returned when writing the part began successfully.
51    type Ret;
52
53    /// The type of value returned when all parts are written.
54    type Output;
55
56    /// The type of value returned when an operation fails.
57    type Error;
58
59    /// Attempts to prepare the `MultipartWrite` to receive a new part.
60    ///
61    /// This method must be called and return `Poll::Ready` before each call to
62    /// `start_send`, indicating that the underlying writer is ready to have
63    /// another part written to it.
64    ///
65    /// This method returns `Poll::Pending` when the object being prepared cannot
66    /// accept another part.
67    ///
68    /// In most cases, if the writer encounters an error, it will be permanently
69    /// unable to write more parts.
70    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
71
72    /// Begin the process of writing a part to this writer, returning the
73    /// associated type confirming this was done successfully.
74    ///
75    /// Like `Sink`, this should be preceded by a call to `poll_ready` that
76    /// returns `Poll::Ready` to ensure that the `MultipartWrite` is ready to
77    /// receive a new part.
78    ///
79    /// # Errors
80    ///
81    /// Errors returned by this method are implementation-specific, but it is
82    /// always an error to call `start_send` when `poll_ready` would return
83    /// `Poll::Pending`.
84    ///
85    /// In most cases, if the writer encounters an error, it will be permanently
86    /// unable to write more parts.
87    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error>;
88
89    /// Flush any remaining output from the writer.
90    ///
91    /// Returns `Poll::Ready` when no buffered, unwritten parts remain and
92    /// `Poll::Pending` if there is more work left to do.
93    ///
94    /// In most cases, if the writer encounters an error, it will be permanently
95    /// unable to write more parts.
96    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
97
98    /// Complete this writer, returning the output.
99    ///
100    /// This method returns `Poll::Pending` until no buffered, unwritten parts
101    /// remain and the complete output object is available.
102    ///
103    /// In most cases, if the writer encounters an error, it will be permanently
104    /// unable to write more parts.
105    fn poll_complete(
106        self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108    ) -> Poll<Result<Self::Output, Self::Error>>;
109}
110
111/// An owned, dynamically typed [`MultipartWrite`] for use in cases where it is
112/// not possible or desirable to statically type it.
113pub type BoxMultipartWrite<'a, Part, R, T, E> =
114    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + Send + 'a>>;
115
116/// `BoxMultipartWrite` but without the `Send` requirement.
117pub type LocalBoxMultipartWrite<'a, Part, R, T, E> =
118    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + 'a>>;
119
120/// A writer that tracks whether or not the underlying writer should no longer
121/// be polled.
122pub trait FusedMultipartWrite<Part>: MultipartWrite<Part> {
123    /// Returns `true` if the writer should no longer be polled.
124    fn is_terminated(&self) -> bool;
125}
126
127impl<W: ?Sized + MultipartWrite<Part> + Unpin, Part> MultipartWrite<Part> for &mut W {
128    type Ret = W::Ret;
129    type Output = W::Output;
130    type Error = W::Error;
131
132    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        Pin::new(&mut **self).poll_ready(cx)
134    }
135
136    fn start_send(mut self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
137        Pin::new(&mut **self).start_send(part)
138    }
139
140    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141        Pin::new(&mut **self).poll_flush(cx)
142    }
143
144    fn poll_complete(
145        mut self: Pin<&mut Self>,
146        cx: &mut Context<'_>,
147    ) -> Poll<Result<Self::Output, Self::Error>> {
148        Pin::new(&mut **self).poll_complete(cx)
149    }
150}
151
152impl<W: ?Sized + FusedMultipartWrite<Part> + Unpin, Part> FusedMultipartWrite<Part> for &mut W {
153    fn is_terminated(&self) -> bool {
154        <W as FusedMultipartWrite<Part>>::is_terminated(&**self)
155    }
156}
157
158impl<P, Part> MultipartWrite<Part> for Pin<P>
159where
160    P: DerefMut + Unpin,
161    P::Target: MultipartWrite<Part>,
162{
163    type Ret = <P::Target as MultipartWrite<Part>>::Ret;
164    type Output = <P::Target as MultipartWrite<Part>>::Output;
165    type Error = <P::Target as MultipartWrite<Part>>::Error;
166
167    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168        self.get_mut().as_mut().poll_ready(cx)
169    }
170
171    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
172        self.get_mut().as_mut().start_send(part)
173    }
174
175    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
176        self.get_mut().as_mut().poll_flush(cx)
177    }
178
179    fn poll_complete(
180        self: Pin<&mut Self>,
181        cx: &mut Context<'_>,
182    ) -> Poll<Result<Self::Output, Self::Error>> {
183        self.get_mut().as_mut().poll_complete(cx)
184    }
185}
186
187impl<P, Part> FusedMultipartWrite<Part> for Pin<P>
188where
189    P: DerefMut + Unpin,
190    P::Target: FusedMultipartWrite<Part>,
191{
192    fn is_terminated(&self) -> bool {
193        <P::Target as FusedMultipartWrite<Part>>::is_terminated(&**self)
194    }
195}
196
197impl<T> MultipartWrite<T> for Vec<T> {
198    type Ret = ();
199    type Output = Self;
200    type Error = Never;
201
202    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
203        Poll::Ready(Ok(()))
204    }
205
206    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
207        unsafe { self.get_unchecked_mut() }.push(part);
208        Ok(())
209    }
210
211    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
212        Poll::Ready(Ok(()))
213    }
214
215    fn poll_complete(
216        self: Pin<&mut Self>,
217        _cx: &mut Context<'_>,
218    ) -> Poll<Result<Self::Output, Self::Error>> {
219        let this: &mut Vec<T> = unsafe { self.get_unchecked_mut() };
220        let out = std::mem::take(this);
221        Poll::Ready(Ok(out))
222    }
223}
224
225impl<T> MultipartWrite<T> for VecDeque<T> {
226    type Ret = ();
227    type Output = Self;
228    type Error = Never;
229
230    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
231        Poll::Ready(Ok(()))
232    }
233
234    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
235        unsafe { self.get_unchecked_mut() }.push_back(part);
236        Ok(())
237    }
238
239    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240        Poll::Ready(Ok(()))
241    }
242
243    fn poll_complete(
244        self: Pin<&mut Self>,
245        _cx: &mut Context<'_>,
246    ) -> Poll<Result<Self::Output, Self::Error>> {
247        let this: &mut VecDeque<T> = unsafe { self.get_unchecked_mut() };
248        let out = std::mem::take(this);
249        Poll::Ready(Ok(out))
250    }
251}