aws_multipart_upload/write/
encoded.rs1use super::UploadSent;
2use crate::client::UploadId;
3use crate::client::part::{PartBody, PartNumber};
4use crate::codec::PartEncoder;
5use crate::error::{Error as UploadError, Result};
6use crate::request::CompletedUpload;
7
8use futures::ready;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10use std::fmt::{self, Debug, Formatter};
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15#[derive(Debug, Clone)]
17pub struct Status {
18 pub id: Option<UploadId>,
20 pub part: Option<PartNumber>,
22 pub elapsed: Duration,
24 pub items: u64,
26 pub parts: u64,
28 pub bytes: u64,
30 pub should_complete: bool,
32 pub part_bytes: u64,
34 pub should_upload: bool,
36}
37
38#[derive(Debug, Clone, Default)]
40struct UploadState {
41 id: Option<UploadId>,
42 part: Option<PartNumber>,
43 part_bytes: u64,
44 total_bytes: u64,
45 total_items: u64,
46 total_parts: u64,
47}
48
49impl UploadState {
50 fn to_status(&self, max_bytes: u64, max_part_bytes: u64, start: Instant) -> Status {
51 Status {
52 id: self.id.clone(),
53 part: self.part,
54 elapsed: start.elapsed(),
55 items: self.total_items,
56 bytes: self.total_bytes,
57 should_complete: self.total_bytes >= max_bytes,
58 parts: self.total_parts,
59 part_bytes: self.part_bytes,
60 should_upload: self.part_bytes >= max_part_bytes,
61 }
62 }
63
64 fn update_encode(&mut self, bytes: usize) {
65 let n = bytes as u64;
66 self.total_bytes += n;
67 self.part_bytes += n;
68 self.total_items += 1;
69 }
70
71 fn update_sent(&mut self, sent: UploadSent) {
72 self.id = Some(sent.id);
73 self.part = Some(sent.part);
74 self.part_bytes = 0;
75 self.total_parts += 1;
76 }
77}
78
79#[must_use = "futures do nothing unless polled"]
90#[pin_project::pin_project]
91pub struct EncodedUpload<E, U> {
92 #[pin]
93 uploader: U,
94 encoder: E,
95 max_bytes: u64,
96 max_part_bytes: u64,
97 start: Instant,
98 state: UploadState,
99 empty: bool,
100}
101
102impl<E, U> EncodedUpload<E, U> {
103 pub(crate) fn new(uploader: U, encoder: E, bytes: u64, part_bytes: u64) -> Self {
104 Self {
105 uploader,
106 encoder,
107 max_bytes: bytes,
108 max_part_bytes: part_bytes,
109 start: Instant::now(),
110 state: UploadState::default(),
111 empty: true,
112 }
113 }
114
115 fn poll_send_body<Item>(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
116 where
117 E: PartEncoder<Item>,
118 U: MultipartWrite<
119 PartBody,
120 Ret = UploadSent,
121 Error = UploadError,
122 Output = CompletedUpload,
123 >,
124 {
125 let mut this = self.project();
126
127 match this.uploader.as_mut().poll_ready(cx)? {
128 Poll::Ready(()) => {
129 this.encoder.flush()?;
130 let new_encoder = this.encoder.clear()?;
131 let encoder = std::mem::replace(this.encoder, new_encoder);
132 let body = encoder.into_body()?;
133 let ret = this.uploader.as_mut().start_send(body)?;
134 this.state.update_sent(ret);
135 *this.empty = true;
136
137 Poll::Ready(Ok(()))
138 }
139 Poll::Pending => Poll::Pending,
140 }
141 }
142}
143
144impl<Item, E, U> FusedMultipartWrite<Item> for EncodedUpload<E, U>
145where
146 E: PartEncoder<Item>,
147 U: FusedMultipartWrite<
148 PartBody,
149 Ret = UploadSent,
150 Error = UploadError,
151 Output = CompletedUpload,
152 >,
153{
154 fn is_terminated(&self) -> bool {
155 self.uploader.is_terminated()
156 }
157}
158
159impl<Item, E, U> MultipartWrite<Item> for EncodedUpload<E, U>
160where
161 E: PartEncoder<Item>,
162 U: MultipartWrite<PartBody, Ret = UploadSent, Error = UploadError, Output = CompletedUpload>,
163{
164 type Ret = Status;
165 type Error = UploadError;
166 type Output = CompletedUpload;
167
168 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
169 if self.state.part_bytes >= self.max_part_bytes {
170 ready!(self.as_mut().poll_send_body(cx))?;
171 }
172 Poll::Ready(Ok(()))
173 }
174
175 fn start_send(self: Pin<&mut Self>, part: Item) -> Result<Self::Ret> {
176 let this = self.project();
177 let bytes = this.encoder.encode(part)?;
178 this.state.update_encode(bytes);
179 *this.empty = false;
180 let status = this
181 .state
182 .to_status(*this.max_bytes, *this.max_part_bytes, *this.start);
183 Ok(status)
184 }
185
186 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
187 if !self.empty {
188 ready!(self.as_mut().poll_send_body(cx))?;
189 }
190 ready!(self.project().uploader.poll_flush(cx))?;
191 Poll::Ready(Ok(()))
192 }
193
194 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
195 if !self.empty {
196 ready!(self.as_mut().poll_send_body(cx))?;
197 }
198 let mut this = self.project();
199 let out = ready!(this.uploader.as_mut().poll_complete(cx))?;
200 let new_encoder = this.encoder.restore()?;
201 *this.encoder = new_encoder;
202 *this.state = UploadState::default();
203 *this.start = Instant::now();
204 Poll::Ready(Ok(out))
205 }
206}
207
208impl<E, U> Debug for EncodedUpload<E, U>
209where
210 E: Debug,
211 U: Debug,
212{
213 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
214 f.debug_struct("EncodedUpload")
215 .field("uploader", &self.uploader)
216 .field("encoder", &self.encoder)
217 .field("max_bytes", &self.max_bytes)
218 .field("max_part_bytes", &self.max_part_bytes)
219 .field("start", &self.start)
220 .field("state", &self.state)
221 .field("empty", &self.empty)
222 .finish()
223 }
224}