aws_multipart_upload/write/
upload.rs

1use crate::client::part::{CompletedParts, PartBody, PartNumber};
2use crate::client::request::*;
3use crate::client::{UploadClient, UploadData, UploadId};
4use crate::error::{Error as UploadError, Result};
5use crate::uri::{ObjectUri, ObjectUriIter};
6
7use futures::ready;
8use multipart_write::{FusedMultipartWrite, MultipartWrite};
9use std::fmt::{self, Debug, Formatter};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13/// Returned when a part upload request was sent.
14///
15/// Note this does not mean that the request was successful, only that it was
16/// able to be sent.
17#[derive(Debug, Clone, Default)]
18pub struct UploadSent {
19    /// The id of the active upload.
20    pub id: UploadId,
21    /// The destination URI of the active upload.
22    pub uri: ObjectUri,
23    /// The part number that was used in the part upload request.
24    pub part: PartNumber,
25    /// The size in bytes of the body of the part upload request.
26    pub bytes: u64,
27}
28
29impl UploadSent {
30    fn new(data: &UploadData, part: PartNumber, bytes: usize) -> Self {
31        Self {
32            id: data.get_id(),
33            uri: data.get_uri(),
34            part,
35            bytes: bytes as u64,
36        }
37    }
38}
39
40/// A type to manage the lifecycle of a multipart upload.
41///
42/// This `MultipartWrite` sends part upload requests from the input [`PartBody`]
43/// and completes the upload when polled for completion.
44///
45/// On completion, a new upload is created using the `ObjectUriIter` it was
46/// configured with, which makes the writer available to continue writing parts
47/// to with a new upload ID.  As long as the iterator `ObjectUriIter` can produce
48/// the next upload, this writer remains active.
49///
50/// [`PartBody`]: crate::client::part::PartBody
51/// [`CompletedUpload`]: crate::client::request::CompletedUpload
52#[must_use = "futures do nothing unless polled"]
53#[pin_project::pin_project]
54pub struct Upload<Buf> {
55    #[pin]
56    inner: UploadImpl<Buf>,
57    #[pin]
58    fut: Option<SendCreateUpload>,
59    next_uri: Option<ObjectUri>,
60    iter: ObjectUriIter,
61}
62
63impl<Buf> Upload<Buf> {
64    pub(crate) fn new(buf: Buf, client: &UploadClient, mut iter: ObjectUriIter) -> Self {
65        let inner = UploadImpl::new(buf, client);
66        let fut = iter.next_upload(client);
67        Self {
68            inner,
69            fut,
70            next_uri: None,
71            iter,
72        }
73    }
74
75    fn poll_new_upload(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
76        let mut this = self.project();
77
78        if let Some(uri) = this.next_uri.take() {
79            trace!(?uri, "starting new upload");
80            let req = CreateRequest::new(uri);
81            let fut = SendCreateUpload::new(&this.inner.client, req);
82            this.fut.set(Some(fut));
83        }
84
85        if let Some(fut) = this.fut.as_mut().as_pin_mut() {
86            match ready!(fut.poll(cx)) {
87                Ok(data) => {
88                    this.fut.set(None);
89                    trace!(id = %data.id, uri = ?data.uri, "started new upload");
90                    this.inner.as_mut().set_upload_data(data);
91                }
92                Err(e) => {
93                    this.fut.set(None);
94                    return Poll::Ready(Err(e));
95                }
96            }
97        }
98
99        Poll::Ready(Ok(()))
100    }
101}
102
103impl<Buf> FusedMultipartWrite<PartBody> for Upload<Buf>
104where
105    Buf: MultipartWrite<SendUploadPart, Output = CompletedParts, Error = UploadError>,
106{
107    fn is_terminated(&self) -> bool {
108        // If the inner upload is not active, and there is no request for a new
109        // upload nor next URI to make the request, we are terminated.
110        self.inner.is_terminated() && self.fut.is_none() && self.next_uri.is_none()
111    }
112}
113
114impl<Buf> MultipartWrite<PartBody> for Upload<Buf>
115where
116    Buf: MultipartWrite<SendUploadPart, Error = UploadError, Output = CompletedParts>,
117{
118    type Ret = UploadSent;
119    type Error = UploadError;
120    type Output = CompletedUpload;
121
122    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
123        ready!(self.as_mut().poll_new_upload(cx))?;
124        self.project().inner.poll_ready(cx)
125    }
126
127    fn start_send(self: Pin<&mut Self>, part: PartBody) -> Result<Self::Ret> {
128        self.project().inner.start_send(part)
129    }
130
131    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
132        self.project().inner.poll_flush(cx)
133    }
134
135    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
136        let mut this = self.project();
137        let out = ready!(this.inner.as_mut().poll_complete(cx));
138        *this.next_uri = this.iter.next();
139
140        trace!(next_uri = ?this.next_uri, "completed upload");
141        Poll::Ready(out)
142    }
143}
144
145impl<Buf: Debug> Debug for Upload<Buf> {
146    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147        f.debug_struct("Upload")
148            .field("inner", &self.inner)
149            .field("fut", &self.fut)
150            .field("next_uri", &self.next_uri)
151            .field("iter", &self.iter)
152            .finish()
153    }
154}
155
156/// Responsible for a single upload, which `Upload` orchestrates.
157#[must_use = "futures do nothing unless polled"]
158#[pin_project::pin_project]
159struct UploadImpl<Buf> {
160    #[pin]
161    buf: Buf,
162    #[pin]
163    fut: Option<SendCompleteUpload>,
164    data: Option<UploadData>,
165    client: UploadClient,
166    completed: CompletedParts,
167    part: PartNumber,
168}
169
170impl<Buf> UploadImpl<Buf> {
171    fn new(buf: Buf, client: &UploadClient) -> Self {
172        Self {
173            buf,
174            fut: None,
175            data: None,
176            client: client.clone(),
177            completed: CompletedParts::default(),
178            part: PartNumber::default(),
179        }
180    }
181
182    fn set_upload_data(self: Pin<&mut Self>, data: UploadData) {
183        *self.project().data = Some(data);
184    }
185}
186
187impl<Buf> FusedMultipartWrite<PartBody> for UploadImpl<Buf>
188where
189    Buf: MultipartWrite<SendUploadPart, Output = CompletedParts, Error = UploadError>,
190{
191    fn is_terminated(&self) -> bool {
192        self.data.is_none()
193    }
194}
195
196impl<Buf> MultipartWrite<PartBody> for UploadImpl<Buf>
197where
198    Buf: MultipartWrite<SendUploadPart, Error = UploadError, Output = CompletedParts>,
199{
200    type Ret = UploadSent;
201    type Error = UploadError;
202    type Output = CompletedUpload;
203
204    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
205        self.project().buf.as_mut().poll_ready(cx)
206    }
207
208    fn start_send(self: Pin<&mut Self>, part: PartBody) -> Result<Self::Ret> {
209        let mut this = self.project();
210        let bytes = part.size();
211        let data = this.data.as_ref().expect("polled Upload after completion");
212        let pt_num = this.part.increment();
213
214        let req = UploadPartRequest::new(data, part, pt_num);
215        let fut = SendUploadPart::new(this.client, req);
216        let _ = this.buf.as_mut().start_send(fut)?;
217        let sent = UploadSent::new(data, pt_num, bytes);
218        trace!(
219            id = %sent.id,
220            uri = %sent.uri,
221            part = %sent.part,
222            bytes = sent.bytes,
223            "part upload initiated",
224        );
225        Ok(sent)
226    }
227
228    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
229        let this = self.project();
230        let parts = ready!(this.buf.poll_complete(cx))?;
231        this.completed.extend(parts);
232        Poll::Ready(Ok(()))
233    }
234
235    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
236        let mut this = self.project();
237
238        if this.fut.is_none() {
239            let data = this.data.as_ref().expect("polled Upload after completion");
240            let parts = ready!(this.buf.poll_complete(cx))?;
241            this.completed.extend(parts);
242            let completed = std::mem::take(this.completed);
243            let req = CompleteRequest::new(data, completed);
244            trace!(
245                id = %req.id(),
246                uri = ?req.uri(),
247                parts = ?req.completed_parts(),
248                "completing upload",
249            );
250            let fut = SendCompleteUpload::new(this.client, req);
251            this.fut.set(Some(fut));
252        }
253
254        let fut = this
255            .fut
256            .as_mut()
257            .as_pin_mut()
258            .expect("polled Upload after completion");
259        let out = ready!(fut.poll(cx));
260
261        this.fut.set(None);
262        *this.data = None;
263        *this.part = PartNumber::default();
264        trace!(result = ?out, "completed upload");
265
266        Poll::Ready(out)
267    }
268}
269
270impl<Buf: Debug> Debug for UploadImpl<Buf> {
271    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
272        f.debug_struct("UploadImpl")
273            .field("buf", &self.buf)
274            .field("fut", &self.fut)
275            .field("data", &self.data)
276            .field("client", &self.client)
277            .field("completed", &self.completed)
278            .field("part", &self.part)
279            .finish()
280    }
281}