aws_multipart_upload/write/
encoded.rs

1use super::UploadSent;
2use crate::client::UploadId;
3use crate::client::part::{PartBody, PartNumber};
4use crate::codec::PartEncoder;
5use crate::error::{Error as UploadError, Result};
6use crate::request::CompletedUpload;
7
8use futures::ready;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10use std::fmt::{self, Debug, Formatter};
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15/// Value returned by the `EncodedUpload` writer.
16#[derive(Debug, Clone)]
17pub struct Status {
18    /// The ID of the upload being written to.
19    pub id: Option<UploadId>,
20    /// The part number of the last sent.
21    pub part: Option<PartNumber>,
22    /// Total uptime of the upload.
23    pub elapsed: Duration,
24    /// Current count of items written to the upload.
25    pub items: u64,
26    /// Current number of parts in the upload.
27    pub parts: u64,
28    /// Current size in bytes of the upload.
29    pub bytes: u64,
30    /// Whether the upload should be completed according to configuration.
31    pub should_complete: bool,
32    /// Current size in bytes of the part.
33    pub part_bytes: u64,
34    /// Whether the part should be uploaded according to configuration.
35    pub should_upload: bool,
36}
37
38/// Tracking size of the upload/part.
39#[derive(Debug, Clone, Default)]
40struct UploadState {
41    id: Option<UploadId>,
42    part: Option<PartNumber>,
43    part_bytes: u64,
44    total_bytes: u64,
45    total_items: u64,
46    total_parts: u64,
47}
48
49impl UploadState {
50    fn to_status(&self, max_bytes: u64, max_part_bytes: u64, start: Instant) -> Status {
51        Status {
52            id: self.id.clone(),
53            part: self.part,
54            elapsed: start.elapsed(),
55            items: self.total_items,
56            bytes: self.total_bytes,
57            should_complete: self.total_bytes >= max_bytes,
58            parts: self.total_parts,
59            part_bytes: self.part_bytes,
60            should_upload: self.part_bytes >= max_part_bytes,
61        }
62    }
63
64    fn update_encode(&mut self, bytes: usize) {
65        let n = bytes as u64;
66        self.total_bytes += n;
67        self.part_bytes += n;
68        self.total_items += 1;
69    }
70
71    fn update_sent(&mut self, sent: UploadSent) {
72        self.id = Some(sent.id);
73        self.part = Some(sent.part);
74        self.part_bytes = 0;
75        self.total_parts += 1;
76    }
77}
78
79/// A type for creating, building, and completing a multipart upload.
80///
81/// This composes a [`PartEncoder`] in front of a multipart upload in order to
82/// build the part upload request body from an arbitrary `Item`.  Parts are
83/// uploaded according the target part size this value is configured with.
84///
85/// This writer itself is reusable, i.e., one can continue writing `Item`s after
86/// completing an upload, if and only if `U` is.
87///
88/// [`PartEncoder`]: crate::codec::PartEncoder
89#[must_use = "futures do nothing unless polled"]
90#[pin_project::pin_project]
91pub struct EncodedUpload<E, U> {
92    #[pin]
93    uploader: U,
94    encoder: E,
95    max_bytes: u64,
96    max_part_bytes: u64,
97    start: Instant,
98    state: UploadState,
99    empty: bool,
100}
101
102impl<E, U> EncodedUpload<E, U> {
103    pub(crate) fn new(uploader: U, encoder: E, bytes: u64, part_bytes: u64) -> Self {
104        Self {
105            uploader,
106            encoder,
107            max_bytes: bytes,
108            max_part_bytes: part_bytes,
109            start: Instant::now(),
110            state: UploadState::default(),
111            empty: true,
112        }
113    }
114
115    fn poll_send_body<Item>(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
116    where
117        E: PartEncoder<Item>,
118        U: MultipartWrite<
119                PartBody,
120                Ret = UploadSent,
121                Error = UploadError,
122                Output = CompletedUpload,
123            >,
124    {
125        let mut this = self.project();
126
127        match this.uploader.as_mut().poll_ready(cx)? {
128            Poll::Ready(()) => {
129                this.encoder.flush()?;
130                let new_encoder = this.encoder.clear()?;
131                let encoder = std::mem::replace(this.encoder, new_encoder);
132                let body = encoder.into_body()?;
133                let ret = this.uploader.as_mut().start_send(body)?;
134                this.state.update_sent(ret);
135                *this.empty = true;
136
137                Poll::Ready(Ok(()))
138            }
139            Poll::Pending => Poll::Pending,
140        }
141    }
142}
143
144impl<Item, E, U> FusedMultipartWrite<Item> for EncodedUpload<E, U>
145where
146    E: PartEncoder<Item>,
147    U: FusedMultipartWrite<
148            PartBody,
149            Ret = UploadSent,
150            Error = UploadError,
151            Output = CompletedUpload,
152        >,
153{
154    fn is_terminated(&self) -> bool {
155        self.uploader.is_terminated()
156    }
157}
158
159impl<Item, E, U> MultipartWrite<Item> for EncodedUpload<E, U>
160where
161    E: PartEncoder<Item>,
162    U: MultipartWrite<PartBody, Ret = UploadSent, Error = UploadError, Output = CompletedUpload>,
163{
164    type Ret = Status;
165    type Error = UploadError;
166    type Output = CompletedUpload;
167
168    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
169        if self.state.part_bytes >= self.max_part_bytes {
170            ready!(self.as_mut().poll_send_body(cx))?;
171        }
172        Poll::Ready(Ok(()))
173    }
174
175    fn start_send(self: Pin<&mut Self>, part: Item) -> Result<Self::Ret> {
176        let this = self.project();
177        let bytes = this.encoder.encode(part)?;
178        this.state.update_encode(bytes);
179        *this.empty = false;
180        let status = this
181            .state
182            .to_status(*this.max_bytes, *this.max_part_bytes, *this.start);
183        Ok(status)
184    }
185
186    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
187        if !self.empty {
188            ready!(self.as_mut().poll_send_body(cx))?;
189        }
190        ready!(self.project().uploader.poll_flush(cx))?;
191        Poll::Ready(Ok(()))
192    }
193
194    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
195        if !self.empty {
196            ready!(self.as_mut().poll_send_body(cx))?;
197        }
198        let mut this = self.project();
199        let out = ready!(this.uploader.as_mut().poll_complete(cx))?;
200        let new_encoder = this.encoder.restore()?;
201        *this.encoder = new_encoder;
202        *this.state = UploadState::default();
203        *this.start = Instant::now();
204        Poll::Ready(Ok(out))
205    }
206}
207
208impl<E, U> Debug for EncodedUpload<E, U>
209where
210    E: Debug,
211    U: Debug,
212{
213    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
214        f.debug_struct("EncodedUpload")
215            .field("uploader", &self.uploader)
216            .field("encoder", &self.encoder)
217            .field("max_bytes", &self.max_bytes)
218            .field("max_part_bytes", &self.max_part_bytes)
219            .field("start", &self.start)
220            .field("state", &self.state)
221            .field("empty", &self.empty)
222            .finish()
223    }
224}