aws_multipart_upload/write/
upload.rs1use crate::client::part::{CompletedParts, PartBody, PartNumber};
2use crate::client::request::*;
3use crate::client::{UploadClient, UploadData, UploadId};
4use crate::error::{Error as UploadError, Result};
5use crate::uri::{ObjectUri, ObjectUriIter};
6
7use futures::ready;
8use multipart_write::{FusedMultipartWrite, MultipartWrite};
9use std::fmt::{self, Debug, Formatter};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13#[derive(Debug, Clone, Default)]
18pub struct UploadSent {
19 pub id: UploadId,
21 pub uri: ObjectUri,
23 pub part: PartNumber,
25 pub bytes: u64,
27}
28
29impl UploadSent {
30 fn new(data: &UploadData, part: PartNumber, bytes: usize) -> Self {
31 Self {
32 id: data.get_id(),
33 uri: data.get_uri(),
34 part,
35 bytes: bytes as u64,
36 }
37 }
38}
39
40#[must_use = "futures do nothing unless polled"]
53#[pin_project::pin_project]
54pub struct Upload<Buf> {
55 #[pin]
56 inner: UploadImpl<Buf>,
57 #[pin]
58 fut: Option<SendCreateUpload>,
59 next_uri: Option<ObjectUri>,
60 iter: ObjectUriIter,
61}
62
63impl<Buf> Upload<Buf> {
64 pub(crate) fn new(buf: Buf, client: &UploadClient, mut iter: ObjectUriIter) -> Self {
65 let inner = UploadImpl::new(buf, client);
66 let fut = iter.next_upload(client);
67 Self {
68 inner,
69 fut,
70 next_uri: None,
71 iter,
72 }
73 }
74
75 fn poll_new_upload(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
76 let mut this = self.project();
77
78 if let Some(uri) = this.next_uri.take() {
79 trace!(?uri, "starting new upload");
80 let req = CreateRequest::new(uri);
81 let fut = SendCreateUpload::new(&this.inner.client, req);
82 this.fut.set(Some(fut));
83 }
84
85 if let Some(fut) = this.fut.as_mut().as_pin_mut() {
86 match ready!(fut.poll(cx)) {
87 Ok(data) => {
88 this.fut.set(None);
89 trace!(id = %data.id, uri = ?data.uri, "started new upload");
90 this.inner.as_mut().set_upload_data(data);
91 }
92 Err(e) => {
93 this.fut.set(None);
94 return Poll::Ready(Err(e));
95 }
96 }
97 }
98
99 Poll::Ready(Ok(()))
100 }
101}
102
103impl<Buf> FusedMultipartWrite<PartBody> for Upload<Buf>
104where
105 Buf: MultipartWrite<SendUploadPart, Output = CompletedParts, Error = UploadError>,
106{
107 fn is_terminated(&self) -> bool {
108 self.inner.is_terminated() && self.fut.is_none() && self.next_uri.is_none()
111 }
112}
113
114impl<Buf> MultipartWrite<PartBody> for Upload<Buf>
115where
116 Buf: MultipartWrite<SendUploadPart, Error = UploadError, Output = CompletedParts>,
117{
118 type Ret = UploadSent;
119 type Error = UploadError;
120 type Output = CompletedUpload;
121
122 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
123 ready!(self.as_mut().poll_new_upload(cx))?;
124 self.project().inner.poll_ready(cx)
125 }
126
127 fn start_send(self: Pin<&mut Self>, part: PartBody) -> Result<Self::Ret> {
128 self.project().inner.start_send(part)
129 }
130
131 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
132 self.project().inner.poll_flush(cx)
133 }
134
135 fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
136 let mut this = self.project();
137 let out = ready!(this.inner.as_mut().poll_complete(cx));
138 *this.next_uri = this.iter.next();
139
140 trace!(next_uri = ?this.next_uri, "completed upload");
141 Poll::Ready(out)
142 }
143}
144
145impl<Buf: Debug> Debug for Upload<Buf> {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 f.debug_struct("Upload")
148 .field("inner", &self.inner)
149 .field("fut", &self.fut)
150 .field("next_uri", &self.next_uri)
151 .field("iter", &self.iter)
152 .finish()
153 }
154}
155
156#[must_use = "futures do nothing unless polled"]
158#[pin_project::pin_project]
159struct UploadImpl<Buf> {
160 #[pin]
161 buf: Buf,
162 #[pin]
163 fut: Option<SendCompleteUpload>,
164 data: Option<UploadData>,
165 client: UploadClient,
166 completed: CompletedParts,
167 part: PartNumber,
168}
169
170impl<Buf> UploadImpl<Buf> {
171 fn new(buf: Buf, client: &UploadClient) -> Self {
172 Self {
173 buf,
174 fut: None,
175 data: None,
176 client: client.clone(),
177 completed: CompletedParts::default(),
178 part: PartNumber::default(),
179 }
180 }
181
182 fn set_upload_data(self: Pin<&mut Self>, data: UploadData) {
183 *self.project().data = Some(data);
184 }
185}
186
187impl<Buf> FusedMultipartWrite<PartBody> for UploadImpl<Buf>
188where
189 Buf: MultipartWrite<SendUploadPart, Output = CompletedParts, Error = UploadError>,
190{
191 fn is_terminated(&self) -> bool {
192 self.data.is_none()
193 }
194}
195
196impl<Buf> MultipartWrite<PartBody> for UploadImpl<Buf>
197where
198 Buf: MultipartWrite<SendUploadPart, Error = UploadError, Output = CompletedParts>,
199{
200 type Ret = UploadSent;
201 type Error = UploadError;
202 type Output = CompletedUpload;
203
204 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
205 self.project().buf.as_mut().poll_ready(cx)
206 }
207
208 fn start_send(self: Pin<&mut Self>, part: PartBody) -> Result<Self::Ret> {
209 let mut this = self.project();
210 let bytes = part.size();
211 let data = this.data.as_ref().expect("polled Upload after completion");
212 let pt_num = this.part.increment();
213
214 let req = UploadPartRequest::new(data, part, pt_num);
215 let fut = SendUploadPart::new(this.client, req);
216 let _ = this.buf.as_mut().start_send(fut)?;
217 let sent = UploadSent::new(data, pt_num, bytes);
218 trace!(
219 id = %sent.id,
220 uri = %sent.uri,
221 part = %sent.part,
222 bytes = sent.bytes,
223 "part upload initiated",
224 );
225 Ok(sent)
226 }
227
228 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
229 let this = self.project();
230 let parts = ready!(this.buf.poll_complete(cx))?;
231 this.completed.extend(parts);
232 Poll::Ready(Ok(()))
233 }
234
235 fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
236 let mut this = self.project();
237
238 if this.fut.is_none() {
239 let data = this.data.as_ref().expect("polled Upload after completion");
240 let parts = ready!(this.buf.poll_complete(cx))?;
241 this.completed.extend(parts);
242 let completed = std::mem::take(this.completed);
243 let req = CompleteRequest::new(data, completed);
244 trace!(
245 id = %req.id(),
246 uri = ?req.uri(),
247 parts = ?req.completed_parts(),
248 "completing upload",
249 );
250 let fut = SendCompleteUpload::new(this.client, req);
251 this.fut.set(Some(fut));
252 }
253
254 let fut = this
255 .fut
256 .as_mut()
257 .as_pin_mut()
258 .expect("polled Upload after completion");
259 let out = ready!(fut.poll(cx));
260
261 this.fut.set(None);
262 *this.data = None;
263 *this.part = PartNumber::default();
264 trace!(result = ?out, "completed upload");
265
266 Poll::Ready(out)
267 }
268}
269
270impl<Buf: Debug> Debug for UploadImpl<Buf> {
271 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
272 f.debug_struct("UploadImpl")
273 .field("buf", &self.buf)
274 .field("fut", &self.fut)
275 .field("data", &self.data)
276 .field("client", &self.client)
277 .field("completed", &self.completed)
278 .field("part", &self.part)
279 .finish()
280 }
281}