Skip to main content

multipart_write/
lib.rs

1//! # Description
2//!
3//! This crate contains the trait `MultipartWrite`, assorted implementations,
4//! and combinators.
5//!
6//! A `MultipartWrite` is a similar interface to [`Sink`], except that writing
7//! an item or completing the write both return values.
8//!
9//! [Here][example] is a conceptual example of a `MultipartWrite`.
10//!
11//! For a more extensive use of the interface, see the crate
12//! [`aws-multipart-upload`] where it is central to a "real world" and/or
13//! "more serious" domain.
14//!
15//! # Motivation
16//!
17//! `Sink` is a useful API, but it is just that: a sink, the end of a stream.
18//!
19//! It's valuable to have the backpressure mechanism that `poll_ready` combined
20//! with `start_send` enables, and it's nice to have the flexibility that the
21//! shape of `Sink` provides in what kinds of values you can send with it.
22//!
23//! The idea for `MultipartWrite` is to:
24//! 1. Allow the same desirable properies: backpressure and generic input type.
25//! 2. Be able to be inserted earlier in a stream computation.
26//! 3. Replace `Sink` when the use case would need a value returned by sending
27//!    to it or closing it.
28//! 4. Transform a stream by writing it in parts, which is somewhat of a
29//!    specific rephrasing of the second and third points.
30//!
31//! [`Sink`]: https://docs.rs/crate/futures-sink/latest
32//! [example]: https://github.com/quasi-coherent/multipart-write/blob/master/examples/author.rs
33//! [`aws-multipart-upload`]: https://docs.rs/crate/aws-multipart-upload/latest
34#![cfg_attr(docsrs, feature(doc_cfg))]
35use std::collections::VecDeque;
36use std::convert::Infallible as Never;
37use std::ops::DerefMut;
38use std::pin::Pin;
39use std::task::{Context, Poll};
40
41pub mod io;
42
43pub mod stream;
44#[doc(inline)]
45pub use stream::MultipartStreamExt;
46
47pub mod write;
48#[doc(inline)]
49pub use write::MultipartWriteExt;
50
51/// `MultipartWrite` is an interface for asynchronously writing an object in
52/// parts.
53pub trait MultipartWrite<Part> {
54    /// The type of value returned when sending a part to be written began
55    /// successfully.
56    type Recv;
57
58    /// The type of value assembled from the parts when they have all been
59    /// written.
60    type Output;
61
62    /// The type of value returned when an operation fails.
63    type Error;
64
65    /// Attempts to prepare the `MultipartWrite` to receive a new part.
66    ///
67    /// This method must be called and return `Poll::Ready` before each call to
68    /// `start_send`, indicating that the underlying writer is ready to have
69    /// another part written to it.
70    ///
71    /// This method returns `Poll::Pending` when the object being prepared
72    /// cannot accept another part.
73    ///
74    /// # Errors
75    ///
76    /// Errors returned by this method are entirely implementation-specific but
77    /// could render the writer permanently unusable.
78    fn poll_ready(
79        self: Pin<&mut Self>,
80        cx: &mut Context<'_>,
81    ) -> Poll<Result<(), Self::Error>>;
82
83    /// Begin the process of writing a part to this writer, returning the
84    /// associated type confirming it was received successfully.
85    ///
86    /// This method must be preceded by a call to `poll_ready` that returns
87    /// `Poll::Ready` to ensure that the `MultipartWrite` is ready to receive a
88    /// new part.
89    ///
90    /// # Errors
91    ///
92    /// Errors returned by this method are entirely implementation-specific but
93    /// could render the writer permanently unusable.  However, it is always an
94    /// error to call `start_send` when `poll_ready` would return the value
95    /// `Poll::Pending` indicating that the writer was not available to have a
96    /// part written.
97    fn start_send(
98        self: Pin<&mut Self>,
99        part: Part,
100    ) -> Result<Self::Recv, Self::Error>;
101
102    /// Flush any remaining output from the writer.
103    ///
104    /// Returns `Poll::Ready` when no unwritten parts remain and `Poll::Pending`
105    /// if there is more work left to do.
106    ///
107    /// # Errors
108    ///
109    /// Errors returned by this method are entirely implementation-specific but
110    /// could render the writer permanently unusable.
111    fn poll_flush(
112        self: Pin<&mut Self>,
113        cx: &mut Context<'_>,
114    ) -> Poll<Result<(), Self::Error>>;
115
116    /// Complete the write, returning the output assembled from the written
117    /// parts.
118    ///
119    /// This method returns `Poll::Pending` until no buffered, unwritten parts
120    /// remain and the complete output object is available.
121    ///
122    /// # Errors
123    ///
124    /// Errors returned by this method are entirely implementation-specific but
125    /// could render the writer permanently unusable.
126    fn poll_complete(
127        self: Pin<&mut Self>,
128        cx: &mut Context<'_>,
129    ) -> Poll<Result<Self::Output, Self::Error>>;
130}
131
132/// An owned, dynamically typed [`MultipartWrite`] for use in cases where it is
133/// not possible or desirable to statically type it.
134///
135/// This is also handy to aid in type inference.  Since a `MultipartWrite` is
136/// generic over the type of value being written, often it is not possible to
137/// infer the types of `Part`, `R`, `T`, and/or `E`.
138///
139/// Erasing the type with `BoxMultipartWrite` picks the implementation and can
140/// resolve a string of type inference failures.
141pub type BoxMultipartWrite<'a, Part, R, T, E> = Pin<
142    Box<dyn MultipartWrite<Part, Recv = R, Output = T, Error = E> + Send + 'a>,
143>;
144
145/// `BoxMultipartWrite` but without the `Send` requirement.
146pub type LocalBoxMultipartWrite<'a, Part, R, T, E> =
147    Pin<Box<dyn MultipartWrite<Part, Recv = R, Output = T, Error = E> + 'a>>;
148
149/// A writer that tracks whether or not the underlying writer should no longer
150/// be polled.
151pub trait FusedMultipartWrite<Part>: MultipartWrite<Part> {
152    /// Returns `true` if the writer should no longer be polled.
153    fn is_terminated(&self) -> bool;
154}
155
156/// An owned, dynamically typed [`FusedMultipartWrite`] for use in cases where
157/// it is not possible or desirable to statically type it.
158///
159/// This is also handy to aid in type inference.  Since a `FusedMultipartWrite`
160/// is generic over the type of value being written, often it is not possible to
161/// infer the types of `Part`, `R`, `T`, and/or `E`.
162///
163/// Erasing the type with `BoxFusedMultipartWrite` picks the implementation and
164/// can resolve a string of type inference failures.
165pub type BoxFusedMultipartWrite<'a, Part, R, T, E> = Pin<
166    Box<
167        dyn FusedMultipartWrite<Part, Recv = R, Output = T, Error = E>
168            + Send
169            + 'a,
170    >,
171>;
172
173/// `BoxFusedMultipartWrite` but without the `Send` requirement.
174pub type LocalBoxFusedMultipartWrite<'a, Part, R, T, E> = Pin<
175    Box<dyn FusedMultipartWrite<Part, Recv = R, Output = T, Error = E> + 'a>,
176>;
177
178impl<W: ?Sized + MultipartWrite<Part> + Unpin, Part> MultipartWrite<Part>
179    for &mut W
180{
181    type Error = W::Error;
182    type Output = W::Output;
183    type Recv = W::Recv;
184
185    fn poll_ready(
186        mut self: Pin<&mut Self>,
187        cx: &mut Context<'_>,
188    ) -> Poll<Result<(), Self::Error>> {
189        Pin::new(&mut **self).poll_ready(cx)
190    }
191
192    fn start_send(
193        mut self: Pin<&mut Self>,
194        part: Part,
195    ) -> Result<Self::Recv, Self::Error> {
196        Pin::new(&mut **self).start_send(part)
197    }
198
199    fn poll_flush(
200        mut self: Pin<&mut Self>,
201        cx: &mut Context<'_>,
202    ) -> Poll<Result<(), Self::Error>> {
203        Pin::new(&mut **self).poll_flush(cx)
204    }
205
206    fn poll_complete(
207        mut self: Pin<&mut Self>,
208        cx: &mut Context<'_>,
209    ) -> Poll<Result<Self::Output, Self::Error>> {
210        Pin::new(&mut **self).poll_complete(cx)
211    }
212}
213
214impl<W: ?Sized + FusedMultipartWrite<Part> + Unpin, Part>
215    FusedMultipartWrite<Part> for &mut W
216{
217    fn is_terminated(&self) -> bool {
218        <W as FusedMultipartWrite<Part>>::is_terminated(&**self)
219    }
220}
221
222impl<W: ?Sized + MultipartWrite<Part> + Unpin, Part> MultipartWrite<Part>
223    for Box<W>
224{
225    type Error = W::Error;
226    type Output = W::Output;
227    type Recv = W::Recv;
228
229    fn poll_ready(
230        self: Pin<&mut Self>,
231        cx: &mut Context<'_>,
232    ) -> Poll<Result<(), Self::Error>> {
233        Pin::new(self.get_mut().as_mut()).poll_ready(cx)
234    }
235
236    fn start_send(
237        self: Pin<&mut Self>,
238        part: Part,
239    ) -> Result<Self::Recv, Self::Error> {
240        Pin::new(self.get_mut().as_mut()).start_send(part)
241    }
242
243    fn poll_flush(
244        self: Pin<&mut Self>,
245        cx: &mut Context<'_>,
246    ) -> Poll<Result<(), Self::Error>> {
247        Pin::new(self.get_mut().as_mut()).poll_flush(cx)
248    }
249
250    fn poll_complete(
251        self: Pin<&mut Self>,
252        cx: &mut Context<'_>,
253    ) -> Poll<Result<Self::Output, Self::Error>> {
254        Pin::new(self.get_mut().as_mut()).poll_complete(cx)
255    }
256}
257
258impl<W: ?Sized + FusedMultipartWrite<Part> + Unpin, Part>
259    FusedMultipartWrite<Part> for Box<W>
260{
261    fn is_terminated(&self) -> bool {
262        W::is_terminated(self)
263    }
264}
265
266impl<P, Part> MultipartWrite<Part> for Pin<P>
267where
268    P: DerefMut + Unpin,
269    P::Target: MultipartWrite<Part>,
270{
271    type Error = <P::Target as MultipartWrite<Part>>::Error;
272    type Output = <P::Target as MultipartWrite<Part>>::Output;
273    type Recv = <P::Target as MultipartWrite<Part>>::Recv;
274
275    fn poll_ready(
276        self: Pin<&mut Self>,
277        cx: &mut Context<'_>,
278    ) -> Poll<Result<(), Self::Error>> {
279        self.get_mut().as_mut().poll_ready(cx)
280    }
281
282    fn start_send(
283        self: Pin<&mut Self>,
284        part: Part,
285    ) -> Result<Self::Recv, Self::Error> {
286        self.get_mut().as_mut().start_send(part)
287    }
288
289    fn poll_flush(
290        self: Pin<&mut Self>,
291        cx: &mut Context<'_>,
292    ) -> Poll<Result<(), Self::Error>> {
293        self.get_mut().as_mut().poll_flush(cx)
294    }
295
296    fn poll_complete(
297        self: Pin<&mut Self>,
298        cx: &mut Context<'_>,
299    ) -> Poll<Result<Self::Output, Self::Error>> {
300        self.get_mut().as_mut().poll_complete(cx)
301    }
302}
303
304impl<P, Part> FusedMultipartWrite<Part> for Pin<P>
305where
306    P: DerefMut + Unpin,
307    P::Target: FusedMultipartWrite<Part>,
308{
309    fn is_terminated(&self) -> bool {
310        <P::Target as FusedMultipartWrite<Part>>::is_terminated(&**self)
311    }
312}
313
314impl<T> MultipartWrite<T> for Vec<T> {
315    type Error = Never;
316    type Output = Self;
317    type Recv = ();
318
319    fn poll_ready(
320        self: Pin<&mut Self>,
321        _cx: &mut Context<'_>,
322    ) -> Poll<Result<(), Self::Error>> {
323        Poll::Ready(Ok(()))
324    }
325
326    fn start_send(
327        self: Pin<&mut Self>,
328        part: T,
329    ) -> Result<Self::Recv, Self::Error> {
330        // SAFETY: We may treat `Vec<T>: Unpin` since we are not pinning the
331        // elements.
332        unsafe { self.get_unchecked_mut() }.push(part);
333        Ok(())
334    }
335
336    fn poll_flush(
337        self: Pin<&mut Self>,
338        _cx: &mut Context<'_>,
339    ) -> Poll<Result<(), Self::Error>> {
340        Poll::Ready(Ok(()))
341    }
342
343    fn poll_complete(
344        self: Pin<&mut Self>,
345        _cx: &mut Context<'_>,
346    ) -> Poll<Result<Self::Output, Self::Error>> {
347        // SAFETY: We may treat `Vec<T>: Unpin` since we are not pinning the
348        // elements.
349        let this: &mut Vec<T> = unsafe { self.get_unchecked_mut() };
350        let out = std::mem::take(this);
351        Poll::Ready(Ok(out))
352    }
353}
354
355impl<T> MultipartWrite<T> for VecDeque<T> {
356    type Error = Never;
357    type Output = Self;
358    type Recv = ();
359
360    fn poll_ready(
361        self: Pin<&mut Self>,
362        _cx: &mut Context<'_>,
363    ) -> Poll<Result<(), Self::Error>> {
364        Poll::Ready(Ok(()))
365    }
366
367    fn start_send(
368        self: Pin<&mut Self>,
369        part: T,
370    ) -> Result<Self::Recv, Self::Error> {
371        // SAFETY: We may treat `VecDeque<T>: Unpin` since we are not pinning
372        // the elements.
373        unsafe { self.get_unchecked_mut() }.push_back(part);
374        Ok(())
375    }
376
377    fn poll_flush(
378        self: Pin<&mut Self>,
379        _cx: &mut Context<'_>,
380    ) -> Poll<Result<(), Self::Error>> {
381        Poll::Ready(Ok(()))
382    }
383
384    fn poll_complete(
385        self: Pin<&mut Self>,
386        _cx: &mut Context<'_>,
387    ) -> Poll<Result<Self::Output, Self::Error>> {
388        // SAFETY: We may treat `VecDeque<T>: Unpin` since we are not pinning
389        // the elements.
390        let this: &mut VecDeque<T> = unsafe { self.get_unchecked_mut() };
391        let out = std::mem::take(this);
392        Poll::Ready(Ok(out))
393    }
394}