1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::{Duration, Instant};
5
6use bytesize::ByteSize;
7use futures::ready;
8use multipart_write::{FusedMultipartWrite, MultipartWrite};
9
10use super::part_buffer::PartBuffer;
11use super::{ShouldComplete, Uploaded};
12use crate::client::part::{CompletedParts, PartBody, PartNumber};
13use crate::client::request::*;
14use crate::client::{UploadClient, UploadData};
15use crate::error::{Error, ErrorWithUpload as _};
16use crate::uri::{ObjectUri, ObjectUriIter};
17
/// Progress snapshot handed back to the caller each time a part is sent.
///
/// All counters are relative to the upload that is currently in progress;
/// they start again from zero whenever a new upload is created.
#[derive(Clone, Copy, Debug)]
pub struct UploadStatus {
    /// Byte count reported by the part buffer when the part was accepted.
    pub upload_bytes: u64,
    /// Sum of the sizes of all parts sent to the current upload so far.
    pub total_bytes: u64,
    /// Number of parts sent to the current upload so far.
    pub total_parts: u64,
    /// Time elapsed since the current upload's state was initialised.
    pub duration: Duration,
    /// `true` once `total_bytes` has reached the configured byte limit.
    pub should_complete: bool,
}
33
34impl ShouldComplete for UploadStatus {
35 fn should_complete(&self) -> bool {
36 self.should_complete
37 }
38}
39
40#[derive(Debug)]
43#[must_use = "futures do nothing unless polled"]
44#[pin_project::pin_project]
45pub(crate) struct MultipartUpload {
46 #[pin]
47 inner: UploadInner,
48 #[pin]
49 fut: Option<SendCompleteUpload>,
50 client: UploadClient,
51 iter: ObjectUriIter,
52 state: GlobalState,
53 config: GlobalConfig,
54}
55
56impl MultipartUpload {
57 pub(crate) fn new(
58 client: UploadClient,
59 mut iter: ObjectUriIter,
60 bytes: ByteSize,
61 capacity: Option<usize>,
62 ) -> Self {
63 let max_bytes = bytes.as_u64();
64 let config = GlobalConfig { max_bytes, capacity };
65 let inner = UploadInner::create_upload_maybe(&client, iter.next());
66 let state = GlobalState::new(config);
67 Self { inner, fut: None, client, iter, state, config }
68 }
69}
70
71impl FusedMultipartWrite<PartBody> for MultipartUpload {
72 fn is_terminated(&self) -> bool {
73 self.inner.is_terminated()
74 }
75}
76
77impl MultipartWrite<PartBody> for MultipartUpload {
78 type Error = Error;
79 type Output = Uploaded;
80 type Recv = UploadStatus;
81
82 fn poll_ready(
83 mut self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 ) -> Poll<Result<(), Self::Error>> {
86 let mut this = self.as_mut().project();
87 match this.inner.as_mut().project() {
88 UploadProj::Pending(fut) => {
90 let data = ready!(fut.poll(cx))?;
91 trace!(
92 id = %&data.id,
93 uri = %&data.uri,
94 "new upload created",
95 );
96 let new_inner = UploadInner::active(
97 this.client,
98 data,
99 this.config.capacity,
100 );
101 *this.state = GlobalState::new(*this.config);
102 this.inner.set(new_inner);
103 Poll::Ready(Ok(()))
104 },
105 UploadProj::Active(upl) => upl.poll_ready(cx),
106 UploadProj::Terminated => {
107 Poll::Ready(Err(Error::state("polled upload after completion")))
108 },
109 }
110 }
111
112 fn start_send(
113 mut self: Pin<&mut Self>,
114 part: PartBody,
115 ) -> Result<Self::Recv, Self::Error> {
116 let this = self.as_mut().project();
117 let upl =
118 this.inner.get_active_proj().expect("called send with no upload");
119 let recv = upl.start_send(part)?;
120 Ok(this.state.upload_status(recv))
121 }
122
123 fn poll_flush(
124 mut self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<Result<(), Self::Error>> {
127 let this = self.as_mut().project();
128 let Some(upl) = this.inner.get_active_proj() else {
137 return Poll::Ready(Ok(()));
138 };
139 upl.poll_flush(cx)
140 }
141
142 fn poll_complete(
143 mut self: Pin<&mut Self>,
144 cx: &mut Context<'_>,
145 ) -> Poll<Result<Self::Output, Self::Error>> {
146 let mut this = self.as_mut().project();
147 if this.fut.is_none() {
149 let upl = this
150 .inner
151 .as_mut()
152 .get_active_proj()
153 .expect("called complete with no upload");
154 let out = ready!(upl.poll_complete(cx))?;
155 trace!(
156 id = %&out.data.id,
157 uri = %&out.data.uri,
158 bytes = out.parts.size(),
159 parts = out.parts.count(),
160 "starting complete upload",
161 );
162 let req = CompleteRequest::new(&out.data, out.parts);
163 let fut = SendCompleteUpload::new(this.client, req);
164 this.fut.set(Some(fut));
165 }
166 let fut = this.fut.as_mut().as_pin_mut().unwrap();
167 let resp = ready!(fut.poll(cx))?;
168 let output = this.state.uploaded(resp);
169 let next = this.iter.next();
176 trace!(
177 uri = %&output.uri,
178 etag = %&output.etag,
179 terminated = next.is_none(),
180 "completed upload",
181 );
182 let new_inner = UploadInner::create_upload_maybe(this.client, next);
183 this.fut.set(None);
184 this.inner.set(new_inner);
185 Poll::Ready(Ok(output))
186 }
187}
188
189#[derive(Debug)]
190#[must_use = "futures do nothing unless polled"]
191#[pin_project::pin_project(project = UploadProj)]
192enum UploadInner {
193 Active(#[pin] UploadParts),
194 Pending(#[pin] SendCreateUpload),
195 Terminated,
196}
197
198impl UploadInner {
199 fn active(
200 client: &UploadClient,
201 data: UploadData,
202 capacity: Option<usize>,
203 ) -> Self {
204 Self::Active(UploadParts::new(client, data, capacity))
205 }
206
207 fn create_upload_maybe(
210 client: &UploadClient,
211 uri: Option<ObjectUri>,
212 ) -> Self {
213 let Some(uri) = uri else {
214 return Self::Terminated;
215 };
216 let req = CreateRequest::new(uri);
217 let fut = SendCreateUpload::new(client, req);
218 Self::Pending(fut)
219 }
220
221 fn is_terminated(&self) -> bool {
222 matches!(self, Self::Terminated)
223 }
224
225 fn get_active_proj(self: Pin<&mut Self>) -> Option<Pin<&mut UploadParts>> {
226 let UploadProj::Active(upl) = self.project() else {
227 return None;
228 };
229 Some(upl)
230 }
231}
232
233#[must_use = "futures do nothing unless polled"]
235#[pin_project::pin_project]
236struct UploadParts {
237 #[pin]
238 buf: PartBuffer,
239 client: UploadClient,
240 data: UploadData,
241 parts: CompletedParts,
242 current: PartNumber,
243 is_terminated: bool,
244}
245
246impl UploadParts {
247 fn new(
248 client: &UploadClient,
249 data: UploadData,
250 capacity: Option<usize>,
251 ) -> Self {
252 Self {
253 buf: PartBuffer::new(capacity),
254 client: client.clone(),
255 data,
256 parts: CompletedParts::default(),
257 current: PartNumber::new(),
258 is_terminated: false,
259 }
260 }
261}
262
263impl FusedMultipartWrite<PartBody> for UploadParts {
264 fn is_terminated(&self) -> bool {
265 self.buf.is_terminated() || self.is_terminated
266 }
267}
268
269impl MultipartWrite<PartBody> for UploadParts {
270 type Error = Error;
271 type Output = PartBufOutput;
272 type Recv = PartBufRecv;
273
274 fn poll_ready(
275 self: Pin<&mut Self>,
276 cx: &mut Context<'_>,
277 ) -> Poll<Result<(), Self::Error>> {
278 let this = self.project();
279 ready!(this.buf.poll_ready(cx)).err_with_upl(
280 &this.data.id,
281 &this.data.uri,
282 this.parts,
283 )?;
284 Poll::Ready(Ok(()))
285 }
286
287 fn start_send(
288 self: Pin<&mut Self>,
289 body: PartBody,
290 ) -> Result<Self::Recv, Self::Error> {
291 let mut this = self.project();
292 let ptnum = this.current.fetch_incr();
294 let ptb = body.size();
295 let req = UploadPartRequest::new(this.data, body, ptnum);
296 let fut = SendUploadPart::new(this.client, req);
297 let upb = this.buf.as_mut().start_send(fut)?;
298 trace!(
299 id = %&this.data.id,
300 uri = %&this.data.uri,
301 part_number = %ptnum,
302 part_bytes = ptb,
303 upload_bytes = upb,
304 "sent part upload",
305 );
306 Ok(PartBufRecv::new(upb, ptb))
307 }
308
309 fn poll_flush(
310 self: Pin<&mut Self>,
311 cx: &mut Context<'_>,
312 ) -> Poll<Result<(), Self::Error>> {
313 let this = self.project();
314 let completed = ready!(this.buf.poll_complete(cx)).err_with_upl(
315 &this.data.id,
316 &this.data.uri,
317 this.parts,
318 )?;
319 this.parts.append(completed);
320 Poll::Ready(Ok(()))
321 }
322
323 fn poll_complete(
324 self: Pin<&mut Self>,
325 cx: &mut Context<'_>,
326 ) -> Poll<Result<Self::Output, Self::Error>> {
327 let this = self.project();
328 let completed = ready!(this.buf.poll_complete(cx)).err_with_upl(
329 &this.data.id,
330 &this.data.uri,
331 this.parts,
332 )?;
333 let mut parts = std::mem::take(this.parts);
334 parts.append(completed);
335 let data = this.data.clone();
336 trace!(
337 id = %&data.id,
338 uri = %&data.uri,
339 bytes = parts.size(),
340 parts = parts.count(),
341 "finished uploading parts",
342 );
343 *this.is_terminated = true;
345 Poll::Ready(Ok(PartBufOutput { parts, data }))
346 }
347}
348
349impl Debug for UploadParts {
350 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
351 f.debug_struct("UploadParts")
352 .field("buf", &self.buf)
353 .field("client", &self.client)
354 .field("data", &self.data)
355 .field("parts", &self.parts)
356 .field("current", &self.current)
357 .field("is_terminated", &self.is_terminated)
358 .finish()
359 }
360}
361
/// Settings shared by every upload a writer creates.
#[derive(Clone, Copy, Debug)]
struct GlobalConfig {
    /// Byte total at which an upload reports that it should be completed.
    max_bytes: u64,
    /// Optional capacity passed to each part buffer.
    capacity: Option<usize>,
}
369
370#[derive(Debug, Clone, Copy)]
372struct GlobalState {
373 total_bytes: u64,
374 total_parts: u64,
375 start: Instant,
376 config: GlobalConfig,
377}
378
379impl GlobalState {
380 fn new(config: GlobalConfig) -> Self {
381 Self { total_bytes: 0, total_parts: 0, start: Instant::now(), config }
382 }
383
384 fn should_complete(&self) -> bool {
385 self.total_bytes >= self.config.max_bytes
386 }
387
388 fn upload_status(&mut self, recv: PartBufRecv) -> UploadStatus {
389 let part_bytes = recv.part_bytes as u64;
390 self.total_bytes += part_bytes;
391 self.total_parts += 1;
392 UploadStatus {
393 total_bytes: self.total_bytes,
394 total_parts: self.total_parts,
395 upload_bytes: recv.upload_bytes as u64,
396 duration: self.start.elapsed(),
397 should_complete: self.should_complete(),
398 }
399 }
400
401 fn uploaded(&self, resp: CompletedUpload) -> Uploaded {
402 Uploaded {
403 uri: resp.uri,
404 etag: resp.etag,
405 bytes: self.total_bytes,
406 parts: self.total_parts,
407 items: None,
408 duration: self.start.elapsed(),
409 }
410 }
411}
412
/// Value returned when a part has been handed to the part buffer.
#[derive(Clone, Copy, Debug)]
struct PartBufRecv {
    /// Byte count reported by the part buffer for the send.
    upload_bytes: usize,
    /// Size of the part that was sent.
    part_bytes: usize,
}

impl PartBufRecv {
    /// Pairs the buffer-reported byte count with the size of the part.
    fn new(upload_bytes: usize, part_bytes: usize) -> Self {
        PartBufRecv { part_bytes, upload_bytes }
    }
}
424
425#[derive(Debug, Clone)]
426struct PartBufOutput {
427 parts: CompletedParts,
428 data: UploadData,
429}