aws_multipart_upload/types/
upload.rs

1use futures::Sink;
2use pin_project_lite::pin_project;
3use std::task::{ready, Context, Poll};
4use std::{io::Error as IoError, pin::Pin, sync::Arc};
5use tokio_util::codec::{Encoder, FramedWrite};
6
7use crate::{
8    types::{api::*, write_parts::WriteParts, UploadClient, UploadControl},
9    AwsError,
10};
11
12pin_project! {
13    /// `Upload` is a sink that implements the lifecycle of a single multipart
14    /// upload.  It writes items to an inner `AsyncWrite` that periodically adds
15    /// parts to the upload, then completes the upload when the inner writer
16    /// has uploaded enough parts, bytes, or whatever else would make the method
17    /// `UploadControl::is_upload_ready` return `true`.
18    pub struct Upload<E> {
19        #[pin]
20        inner: FramedWrite<WriteParts, E>,
21        client: Arc<dyn UploadClient + Send + Sync>,
22        ctrl: Arc<dyn UploadControl + Send + Sync>,
23    }
24}
25
26impl<E> Upload<E> {
27    pub fn new<C, U>(client: U, ctrl: C, encoder: E, params: UploadRequestParams) -> Self
28    where
29        C: UploadControl + Send + Sync + 'static,
30        U: UploadClient + Send + Sync + 'static,
31    {
32        let client = Arc::new(client);
33        let ctrl = Arc::new(ctrl);
34        let write = WriteParts::new(Arc::clone(&client), Arc::clone(&ctrl), params);
35        let inner = FramedWrite::new(write, encoder);
36        Self {
37            inner,
38            client,
39            ctrl,
40        }
41    }
42
43    fn should_upload_part(&self) -> bool {
44        let part_size = self.inner.write_buffer().len();
45        self.ctrl.is_part_ready(part_size)
46    }
47
48    fn poll_complete_upload<I>(
49        self: Pin<&mut Self>,
50        cx: &mut Context<'_>,
51    ) -> Poll<Result<(), AwsError>>
52    where
53        E: Encoder<I>,
54        E::Error: From<IoError>,
55        AwsError: From<E::Error>,
56    {
57        let parts = self.inner.get_ref().uploaded_parts();
58        let params = self.inner.get_ref().params();
59        tracing::trace!(?parts, ?params, "completing upload");
60
61        let etag = ready!(self
62            .client
63            .complete_upload(&params, &parts)
64            .as_mut()
65            .poll(cx))?;
66        // Callback with the uploaded object's entity tag.
67        ready!(self.client.on_upload_complete(etag).as_mut().poll(cx))?;
68
69        Poll::Ready(Ok(()))
70    }
71}
72
73impl<E, I> Sink<I> for Upload<E>
74where
75    E: Encoder<I> + Clone,
76    E::Error: From<IoError>,
77    AwsError: From<E::Error>,
78{
79    type Error = AwsError;
80
81    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        if self.should_upload_part() {
83            ready!(self.as_mut().project().inner.poll_flush(cx))?;
84        }
85
86        Poll::Ready(Ok(()))
87    }
88
89    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
90        self.project().inner.start_send(item)?;
91        Ok(())
92    }
93
94    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
95        // Flush the framed writer, which has the effect of uploading the last
96        // part with whatever was flushed to it.  This is OK with AWS because
97        // the last part isn't held to the minimum part size requirement.
98        ready!(self.as_mut().project().inner.poll_flush(cx))?;
99        self.poll_complete_upload(cx)
100    }
101
102    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103        ready!(self.as_mut().project().inner.poll_flush(cx))?;
104        self.poll_complete_upload(cx)
105    }
106}