Skip to main content

aws_multipart_upload/write/
with_part_encoder.rs

1use std::fmt::{self, Debug, Formatter};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, Instant};
6
7use bytesize::ByteSize;
8use futures::ready;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10
11use super::{ShouldComplete, UploadStatus, Uploaded};
12use crate::client::part::{PartBody, PartNumber};
13use crate::encoder::PartEncoder;
14use crate::error::{Error, Result};
15
16/// Returned by `WithPartEncoder` when writing an item was successful.
17#[derive(Debug, Clone, Copy)]
18pub struct EncoderStatus {
19    /// Last recorded size in bytes of all parts that have been added to the
20    /// upload successfully.
21    pub upload_bytes: u64,
22    /// Total size in bytes of all items that have been written.
23    pub total_bytes: u64,
24    /// Total number of parts that have been written.
25    pub total_parts: u64,
26    /// Total number of items that have been written.
27    pub total_items: u64,
28    /// Size in bytes of the current part.
29    pub part_bytes: u64,
30    /// Number of items written to the current part.
31    pub part_items: u64,
32    /// Total duration of this upload.
33    pub duration: Duration,
34    /// Whether the current part should be sent.
35    pub should_send: bool,
36    /// Whether the current upload should be completed.
37    pub should_complete: bool,
38}
39
40impl ShouldComplete for EncoderStatus {
41    fn should_complete(&self) -> bool {
42        self.should_complete
43    }
44}
45
46/// Lift a `PartEncoder` in front of a multipart upload.
47#[must_use = "futures do nothing unless polled"]
48#[pin_project::pin_project]
49pub(crate) struct WithPartEncoder<Wr, Item, E> {
50    #[pin]
51    writer: Wr,
52    buf: PartBody,
53    encoder: E,
54    state: EncoderState,
55    config: EncoderConfig,
56    _it: PhantomData<Item>,
57}
58
59impl<Wr, Item, E> WithPartEncoder<Wr, Item, E> {
60    pub(crate) fn new(
61        writer: Wr,
62        encoder: E,
63        bytes: ByteSize,
64        part_bytes: ByteSize,
65    ) -> Self {
66        let max_part_bytes = part_bytes.as_u64();
67        let config =
68            EncoderConfig { max_bytes: bytes.as_u64(), max_part_bytes };
69        let state = EncoderState::new(config);
70        // Safe because the value was clamped to fit this at configuration.
71        let buf = PartBody::with_capacity(max_part_bytes as usize);
72        Self { writer, buf, encoder, state, config, _it: PhantomData }
73    }
74}
75
76impl<Wr, Item, E> FusedMultipartWrite<Item> for WithPartEncoder<Wr, Item, E>
77where
78    E: PartEncoder<Item>,
79    Wr: FusedMultipartWrite<
80            PartBody,
81            Error = Error,
82            Output = Uploaded,
83            Recv = UploadStatus,
84        >,
85{
86    fn is_terminated(&self) -> bool {
87        self.writer.is_terminated()
88    }
89}
90
91impl<Wr, Item, E> MultipartWrite<Item> for WithPartEncoder<Wr, Item, E>
92where
93    E: PartEncoder<Item>,
94    Wr: MultipartWrite<
95            PartBody,
96            Error = Error,
97            Output = Uploaded,
98            Recv = UploadStatus,
99        >,
100{
101    type Error = Error;
102    type Output = Uploaded;
103    type Recv = EncoderStatus;
104
105    fn poll_ready(
106        self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108    ) -> Poll<Result<(), Self::Error>> {
109        let mut this = self.project();
110        // This writer is always ready for parts unless the current one has
111        // enough bytes.
112        if this.state.should_send() {
113            ready!(this.writer.as_mut().poll_ready(cx))?;
114            let part = this.buf.remove();
115            let recv = this.writer.start_send(part)?;
116            trace!(
117                upload_bytes = recv.upload_bytes,
118                total_bytes = recv.total_bytes,
119                total_parts = recv.total_parts,
120                duration = ?recv.duration,
121                "part received",
122            );
123            this.state.flushed(recv);
124        }
125        Poll::Ready(Ok(()))
126    }
127
128    fn start_send(
129        self: Pin<&mut Self>,
130        part: Item,
131    ) -> Result<Self::Recv, Self::Error> {
132        let this = self.project();
133        let before = this.buf.len();
134        this.encoder.encode(this.buf, this.state.part_number, part)?;
135        let after = this.buf.len();
136        Ok(this.state.encoded(after - before))
137    }
138
139    fn poll_flush(
140        self: Pin<&mut Self>,
141        cx: &mut Context<'_>,
142    ) -> Poll<Result<(), Self::Error>> {
143        let mut this = self.project();
144        if this.state.should_send() {
145            ready!(this.writer.as_mut().poll_ready(cx))?;
146            let part = this.buf.split().into();
147            let recv = this.writer.as_mut().start_send(part)?;
148            trace!(
149                upload_bytes = recv.upload_bytes,
150                total_bytes = recv.total_bytes,
151                total_parts = recv.total_parts,
152                duration = ?recv.duration,
153                "part received",
154            );
155            this.state.flushed(recv);
156        }
157        this.writer.as_mut().poll_flush(cx)
158    }
159
160    fn poll_complete(
161        self: Pin<&mut Self>,
162        cx: &mut Context<'_>,
163    ) -> Poll<Result<Self::Output, Self::Error>> {
164        let mut this = self.project();
165        if !this.state.is_empty() {
166            ready!(this.writer.as_mut().poll_ready(cx))?;
167            let part = this.buf.split().into();
168            let recv = this.writer.as_mut().start_send(part)?;
169            trace!(
170                upload_bytes = recv.upload_bytes,
171                total_bytes = recv.total_bytes,
172                total_parts = recv.total_parts,
173                duration = ?recv.duration,
174                "part received",
175            );
176            this.state.flushed(recv);
177        }
178        let output = ready!(this.writer.poll_complete(cx))
179            .map(|v| this.state.uploaded(v));
180        *this.state = EncoderState::new(*this.config);
181        Poll::Ready(output)
182    }
183}
184
185impl<Wr: Debug, Item, E: Debug> Debug for WithPartEncoder<Wr, Item, E> {
186    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
187        f.debug_struct("WithPartEncoder")
188            .field("writer", &self.writer)
189            .field("buf", &self.buf)
190            .field("encoder", &self.encoder)
191            .field("state", &self.state)
192            .field("config", &self.config)
193            .finish()
194    }
195}
196
197#[derive(Debug, Clone, Copy)]
198struct EncoderConfig {
199    max_part_bytes: u64,
200    max_bytes: u64,
201}
202
203#[derive(Debug, Clone, Copy)]
204struct EncoderState {
205    upload_bytes: u64,
206    total_bytes: u64,
207    total_parts: u64,
208    total_items: u64,
209    part_bytes: u64,
210    part_items: u64,
211    part_number: PartNumber,
212    start: Instant,
213    config: EncoderConfig,
214}
215
216impl EncoderState {
217    fn new(config: EncoderConfig) -> Self {
218        Self {
219            upload_bytes: 0,
220            total_bytes: 0,
221            total_parts: 0,
222            total_items: 0,
223            part_bytes: 0,
224            part_items: 0,
225            part_number: PartNumber::new(),
226            start: Instant::now(),
227            config,
228        }
229    }
230
231    fn is_empty(&self) -> bool {
232        self.part_items == 0
233    }
234
235    fn should_send(&self) -> bool {
236        self.part_bytes >= self.config.max_part_bytes
237    }
238
239    fn should_complete(&self) -> bool {
240        self.total_bytes >= self.config.max_bytes
241    }
242
243    // Creates the current value of `WithPartEncoder::Recv`.
244    fn encoded(&mut self, item_bytes: usize) -> EncoderStatus {
245        let item_bytes = item_bytes as u64;
246        self.total_bytes += item_bytes;
247        self.part_bytes += item_bytes;
248        self.total_items += 1;
249        self.part_items += 1;
250        EncoderStatus {
251            upload_bytes: self.upload_bytes,
252            total_bytes: self.total_bytes,
253            total_parts: self.total_parts,
254            total_items: self.total_items,
255            part_bytes: self.part_bytes,
256            part_items: self.part_items,
257            duration: self.start.elapsed(),
258            should_send: self.should_send(),
259            should_complete: self.should_complete(),
260        }
261    }
262
263    // When adding a part use the snapshot `UploadStatus` to (re)set the global
264    // counters.
265    fn flushed(&mut self, status: UploadStatus) {
266        self.upload_bytes = status.upload_bytes;
267        self.part_bytes = 0;
268        self.part_items = 0;
269        // Increment the part number; ignore the current one.
270        // `PartEncoder` needs the correct, current part number.
271        let _ = self.part_number.fetch_incr();
272    }
273
274    fn uploaded(&self, output: Uploaded) -> Uploaded {
275        Uploaded { items: Some(self.total_items), ..output }
276    }
277}