multipart_write/
lib.rs

1//! # Description
2//!
3//! This crate contains the trait `MultipartWrite`, assorted implementations,
4//! and combinators.
5//!
6//! A `MultipartWrite` is a similar interface to [`Sink`], except that writing
7//! an item or completing the write both return values.
8//!
9//! [Here][example] is a conceptual example of a `MultipartWrite`.
10//!
11//! # Motivation
12//!
13//! `Sink` is a useful API, but it is just that: a sink, the end of a stream.
14//!
15//! It's valuable to have the backpressure mechanism that `poll_ready` combined
16//! with `start_send` enables, and it's nice to have the flexibility that the
17//! shape of `Sink` provides in what kinds of values you can send with it.
18//!
19//! The idea for `MultipartWrite` is to:
20//! 1. Have those same desirable properies: backpressure and generic input type.
21//! 2. Be able to be inserted earlier in a stream computation.
22//! 3. Replace `Sink` when the use case would need a value returned by sending to
23//!    it or closing it.
24//! 4. Transform a stream by writing it in parts, which is somewhat of a specific
25//!    rephrasing of the second and third points.
26//!
27//! [`Sink`]: https://docs.rs/crate/futures-sink/0.3.31
28//! [example]: https://github.com/quasi-coherent/multipart-write/blob/master/examples/author.rs
29#![cfg_attr(docsrs, feature(doc_cfg))]
30use std::collections::VecDeque;
31use std::convert::Infallible as Never;
32use std::ops::DerefMut;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36pub mod io;
37
38pub mod stream;
39#[doc(inline)]
40pub use stream::MultipartStreamExt;
41
42pub mod write;
43#[doc(inline)]
44pub use write::MultipartWriteExt;
45
46/// `MultipartWrite` is a `Sink`-like interface for asynchronously writing an
47/// object in parts.
48pub trait MultipartWrite<Part> {
49    /// The type of value returned when writing the part began successfully.
50    type Ret;
51
52    /// The type of value returned when all parts are written.
53    type Output;
54
55    /// The type of value returned when an operation fails.
56    type Error;
57
58    /// Attempts to prepare the `MultipartWrite` to receive a new part.
59    ///
60    /// This method must be called and return `Poll::Ready` before each call to
61    /// `start_send`, indicating that the underlying writer is ready to have
62    /// another part written to it.
63    ///
64    /// This method returns `Poll::Pending` when the object being prepared cannot
65    /// accept another part.
66    ///
67    /// In most cases, if the writer encounters an error, it will be permanently
68    /// unable to write more parts.
69    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
70
71    /// Begin the process of writing a part to this writer, returning the
72    /// associated type confirming this was done successfully.
73    ///
74    /// Like `Sink`, this should be preceded by a call to `poll_ready` that
75    /// returns `Poll::Ready` to ensure that the `MultipartWrite` is ready to
76    /// receive a new part.
77    ///
78    /// # Errors
79    ///
80    /// Errors returned by this method are implementation-specific, but it is
81    /// always an error to call `start_send` when `poll_ready` would return
82    /// `Poll::Pending`.
83    ///
84    /// In most cases, if the writer encounters an error, it will be permanently
85    /// unable to write more parts.
86    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error>;
87
88    /// Flush any remaining output from the writer.
89    ///
90    /// Returns `Poll::Ready` when no buffered, unwritten parts remain and
91    /// `Poll::Pending` if there is more work left to do.
92    ///
93    /// In most cases, if the writer encounters an error, it will be permanently
94    /// unable to write more parts.
95    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
96
97    /// Complete this writer, returning the output.
98    ///
99    /// This method returns `Poll::Pending` until no buffered, unwritten parts
100    /// remain and the complete output object is available.
101    ///
102    /// In most cases, if the writer encounters an error, it will be permanently
103    /// unable to write more parts.
104    fn poll_complete(
105        self: Pin<&mut Self>,
106        cx: &mut Context<'_>,
107    ) -> Poll<Result<Self::Output, Self::Error>>;
108}
109
110/// An owned, dynamically typed [`MultipartWrite`] for use in cases where it is
111/// not possible or desirable to statically type it.
112pub type BoxMultipartWrite<'a, Part, R, T, E> =
113    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + Send + 'a>>;
114
115/// `BoxMultipartWrite` but without the `Send` requirement.
116pub type LocalBoxMultipartWrite<'a, Part, R, T, E> =
117    Pin<Box<dyn MultipartWrite<Part, Ret = R, Output = T, Error = E> + 'a>>;
118
119/// A writer that tracks whether or not the underlying writer should no longer
120/// be polled.
121pub trait FusedMultipartWrite<Part>: MultipartWrite<Part> {
122    /// Returns `true` if the writer should no longer be polled.
123    fn is_terminated(&self) -> bool;
124}
125
126impl<W: ?Sized + MultipartWrite<Part> + Unpin, Part> MultipartWrite<Part> for &mut W {
127    type Ret = W::Ret;
128    type Output = W::Output;
129    type Error = W::Error;
130
131    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
132        Pin::new(&mut **self).poll_ready(cx)
133    }
134
135    fn start_send(mut self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
136        Pin::new(&mut **self).start_send(part)
137    }
138
139    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140        Pin::new(&mut **self).poll_flush(cx)
141    }
142
143    fn poll_complete(
144        mut self: Pin<&mut Self>,
145        cx: &mut Context<'_>,
146    ) -> Poll<Result<Self::Output, Self::Error>> {
147        Pin::new(&mut **self).poll_complete(cx)
148    }
149}
150
151impl<W: ?Sized + FusedMultipartWrite<Part> + Unpin, Part> FusedMultipartWrite<Part> for &mut W {
152    fn is_terminated(&self) -> bool {
153        <W as FusedMultipartWrite<Part>>::is_terminated(&**self)
154    }
155}
156
157impl<P, Part> MultipartWrite<Part> for Pin<P>
158where
159    P: DerefMut + Unpin,
160    P::Target: MultipartWrite<Part>,
161{
162    type Ret = <P::Target as MultipartWrite<Part>>::Ret;
163    type Output = <P::Target as MultipartWrite<Part>>::Output;
164    type Error = <P::Target as MultipartWrite<Part>>::Error;
165
166    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167        self.get_mut().as_mut().poll_ready(cx)
168    }
169
170    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
171        self.get_mut().as_mut().start_send(part)
172    }
173
174    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175        self.get_mut().as_mut().poll_flush(cx)
176    }
177
178    fn poll_complete(
179        self: Pin<&mut Self>,
180        cx: &mut Context<'_>,
181    ) -> Poll<Result<Self::Output, Self::Error>> {
182        self.get_mut().as_mut().poll_complete(cx)
183    }
184}
185
186impl<P, Part> FusedMultipartWrite<Part> for Pin<P>
187where
188    P: DerefMut + Unpin,
189    P::Target: FusedMultipartWrite<Part>,
190{
191    fn is_terminated(&self) -> bool {
192        <P::Target as FusedMultipartWrite<Part>>::is_terminated(&**self)
193    }
194}
195
196impl<T> MultipartWrite<T> for Vec<T> {
197    type Ret = ();
198    type Output = Self;
199    type Error = Never;
200
201    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
202        Poll::Ready(Ok(()))
203    }
204
205    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
206        unsafe { self.get_unchecked_mut() }.push(part);
207        Ok(())
208    }
209
210    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211        Poll::Ready(Ok(()))
212    }
213
214    fn poll_complete(
215        self: Pin<&mut Self>,
216        _cx: &mut Context<'_>,
217    ) -> Poll<Result<Self::Output, Self::Error>> {
218        let this: &mut Vec<T> = unsafe { self.get_unchecked_mut() };
219        let out = std::mem::take(this);
220        Poll::Ready(Ok(out))
221    }
222}
223
224impl<T> MultipartWrite<T> for VecDeque<T> {
225    type Ret = ();
226    type Output = Self;
227    type Error = Never;
228
229    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230        Poll::Ready(Ok(()))
231    }
232
233    fn start_send(self: Pin<&mut Self>, part: T) -> Result<Self::Ret, Self::Error> {
234        unsafe { self.get_unchecked_mut() }.push_back(part);
235        Ok(())
236    }
237
238    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
239        Poll::Ready(Ok(()))
240    }
241
242    fn poll_complete(
243        self: Pin<&mut Self>,
244        _cx: &mut Context<'_>,
245    ) -> Poll<Result<Self::Output, Self::Error>> {
246        let this: &mut VecDeque<T> = unsafe { self.get_unchecked_mut() };
247        let out = std::mem::take(this);
248        Poll::Ready(Ok(out))
249    }
250}