// aws_multipart_upload/write/with_part_encoder.rs
use std::fmt::{self, Debug, Formatter};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, Instant};
6
7use bytesize::ByteSize;
8use futures::ready;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10
11use super::{ShouldComplete, UploadStatus, Uploaded};
12use crate::client::part::{PartBody, PartNumber};
13use crate::encoder::PartEncoder;
14use crate::error::{Error, Result};
15
/// Progress snapshot produced each time an item is encoded into the part
/// that is currently being buffered.
///
/// The `part_*` fields describe only the part being buffered; the remaining
/// counters are cumulative for the upload in progress.
#[derive(Clone, Copy, Debug)]
pub struct EncoderStatus {
    /// Upload byte count most recently reported by the inner writer.
    pub upload_bytes: u64,
    /// Bytes encoded so far across every part of the upload.
    pub total_bytes: u64,
    /// Part count as tracked by the encoder state.
    pub total_parts: u64,
    /// Items encoded so far across every part of the upload.
    pub total_items: u64,
    /// Bytes encoded into the part currently being buffered.
    pub part_bytes: u64,
    /// Items encoded into the part currently being buffered.
    pub part_items: u64,
    /// Time elapsed since the encoder state was last initialized.
    pub duration: Duration,
    /// Whether the buffered part has reached the configured part size.
    pub should_send: bool,
    /// Whether the encoded bytes have reached the configured upload size.
    pub should_complete: bool,
}
39
40impl ShouldComplete for EncoderStatus {
41 fn should_complete(&self) -> bool {
42 self.should_complete
43 }
44}
45
46#[must_use = "futures do nothing unless polled"]
48#[pin_project::pin_project]
49pub(crate) struct WithPartEncoder<Wr, Item, E> {
50 #[pin]
51 writer: Wr,
52 buf: PartBody,
53 encoder: E,
54 state: EncoderState,
55 config: EncoderConfig,
56 _it: PhantomData<Item>,
57}
58
59impl<Wr, Item, E> WithPartEncoder<Wr, Item, E> {
60 pub(crate) fn new(
61 writer: Wr,
62 encoder: E,
63 bytes: ByteSize,
64 part_bytes: ByteSize,
65 ) -> Self {
66 let max_part_bytes = part_bytes.as_u64();
67 let config =
68 EncoderConfig { max_bytes: bytes.as_u64(), max_part_bytes };
69 let state = EncoderState::new(config);
70 let buf = PartBody::with_capacity(max_part_bytes as usize);
72 Self { writer, buf, encoder, state, config, _it: PhantomData }
73 }
74}
75
76impl<Wr, Item, E> FusedMultipartWrite<Item> for WithPartEncoder<Wr, Item, E>
77where
78 E: PartEncoder<Item>,
79 Wr: FusedMultipartWrite<
80 PartBody,
81 Error = Error,
82 Output = Uploaded,
83 Recv = UploadStatus,
84 >,
85{
86 fn is_terminated(&self) -> bool {
87 self.writer.is_terminated()
88 }
89}
90
91impl<Wr, Item, E> MultipartWrite<Item> for WithPartEncoder<Wr, Item, E>
92where
93 E: PartEncoder<Item>,
94 Wr: MultipartWrite<
95 PartBody,
96 Error = Error,
97 Output = Uploaded,
98 Recv = UploadStatus,
99 >,
100{
101 type Error = Error;
102 type Output = Uploaded;
103 type Recv = EncoderStatus;
104
105 fn poll_ready(
106 self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 ) -> Poll<Result<(), Self::Error>> {
109 let mut this = self.project();
110 if this.state.should_send() {
113 ready!(this.writer.as_mut().poll_ready(cx))?;
114 let part = this.buf.remove();
115 let recv = this.writer.start_send(part)?;
116 trace!(
117 upload_bytes = recv.upload_bytes,
118 total_bytes = recv.total_bytes,
119 total_parts = recv.total_parts,
120 duration = ?recv.duration,
121 "part received",
122 );
123 this.state.flushed(recv);
124 }
125 Poll::Ready(Ok(()))
126 }
127
128 fn start_send(
129 self: Pin<&mut Self>,
130 part: Item,
131 ) -> Result<Self::Recv, Self::Error> {
132 let this = self.project();
133 let before = this.buf.len();
134 this.encoder.encode(this.buf, this.state.part_number, part)?;
135 let after = this.buf.len();
136 Ok(this.state.encoded(after - before))
137 }
138
139 fn poll_flush(
140 self: Pin<&mut Self>,
141 cx: &mut Context<'_>,
142 ) -> Poll<Result<(), Self::Error>> {
143 let mut this = self.project();
144 if this.state.should_send() {
145 ready!(this.writer.as_mut().poll_ready(cx))?;
146 let part = this.buf.split().into();
147 let recv = this.writer.as_mut().start_send(part)?;
148 trace!(
149 upload_bytes = recv.upload_bytes,
150 total_bytes = recv.total_bytes,
151 total_parts = recv.total_parts,
152 duration = ?recv.duration,
153 "part received",
154 );
155 this.state.flushed(recv);
156 }
157 this.writer.as_mut().poll_flush(cx)
158 }
159
160 fn poll_complete(
161 self: Pin<&mut Self>,
162 cx: &mut Context<'_>,
163 ) -> Poll<Result<Self::Output, Self::Error>> {
164 let mut this = self.project();
165 if !this.state.is_empty() {
166 ready!(this.writer.as_mut().poll_ready(cx))?;
167 let part = this.buf.split().into();
168 let recv = this.writer.as_mut().start_send(part)?;
169 trace!(
170 upload_bytes = recv.upload_bytes,
171 total_bytes = recv.total_bytes,
172 total_parts = recv.total_parts,
173 duration = ?recv.duration,
174 "part received",
175 );
176 this.state.flushed(recv);
177 }
178 let output = ready!(this.writer.poll_complete(cx))
179 .map(|v| this.state.uploaded(v));
180 *this.state = EncoderState::new(*this.config);
181 Poll::Ready(output)
182 }
183}
184
185impl<Wr: Debug, Item, E: Debug> Debug for WithPartEncoder<Wr, Item, E> {
186 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
187 f.debug_struct("WithPartEncoder")
188 .field("writer", &self.writer)
189 .field("buf", &self.buf)
190 .field("encoder", &self.encoder)
191 .field("state", &self.state)
192 .field("config", &self.config)
193 .finish()
194 }
195}
196
/// Size thresholds that drive when parts are sent and when the upload is
/// reported as ready to complete.
#[derive(Clone, Copy, Debug)]
struct EncoderConfig {
    /// Buffered part size, in bytes, at which a part should be sent.
    max_part_bytes: u64,
    /// Total encoded size, in bytes, at which the upload should complete.
    max_bytes: u64,
}
202
203#[derive(Debug, Clone, Copy)]
204struct EncoderState {
205 upload_bytes: u64,
206 total_bytes: u64,
207 total_parts: u64,
208 total_items: u64,
209 part_bytes: u64,
210 part_items: u64,
211 part_number: PartNumber,
212 start: Instant,
213 config: EncoderConfig,
214}
215
216impl EncoderState {
217 fn new(config: EncoderConfig) -> Self {
218 Self {
219 upload_bytes: 0,
220 total_bytes: 0,
221 total_parts: 0,
222 total_items: 0,
223 part_bytes: 0,
224 part_items: 0,
225 part_number: PartNumber::new(),
226 start: Instant::now(),
227 config,
228 }
229 }
230
231 fn is_empty(&self) -> bool {
232 self.part_items == 0
233 }
234
235 fn should_send(&self) -> bool {
236 self.part_bytes >= self.config.max_part_bytes
237 }
238
239 fn should_complete(&self) -> bool {
240 self.total_bytes >= self.config.max_bytes
241 }
242
243 fn encoded(&mut self, item_bytes: usize) -> EncoderStatus {
245 let item_bytes = item_bytes as u64;
246 self.total_bytes += item_bytes;
247 self.part_bytes += item_bytes;
248 self.total_items += 1;
249 self.part_items += 1;
250 EncoderStatus {
251 upload_bytes: self.upload_bytes,
252 total_bytes: self.total_bytes,
253 total_parts: self.total_parts,
254 total_items: self.total_items,
255 part_bytes: self.part_bytes,
256 part_items: self.part_items,
257 duration: self.start.elapsed(),
258 should_send: self.should_send(),
259 should_complete: self.should_complete(),
260 }
261 }
262
263 fn flushed(&mut self, status: UploadStatus) {
266 self.upload_bytes = status.upload_bytes;
267 self.part_bytes = 0;
268 self.part_items = 0;
269 let _ = self.part_number.fetch_incr();
272 }
273
274 fn uploaded(&self, output: Uploaded) -> Uploaded {
275 Uploaded { items: Some(self.total_items), ..output }
276 }
277}