1use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytesize::ByteSize;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10
11use crate::client::part::PartBody;
12use crate::client::{UploadApi, UploadClient};
13use crate::encoder::PartEncoder;
14use crate::error::Error;
15use crate::uri::{EmptyUri, ObjectUri, ObjectUriIter, OneTimeUse};
16use crate::write::multipart_upload::MultipartUpload;
17use crate::write::with_part_encoder::WithPartEncoder;
18
19pub use crate::write::multipart_upload::UploadStatus;
20pub use crate::write::with_part_encoder::EncoderStatus;
21pub use crate::write::{ShouldComplete, Uploaded};
22
23pub mod stream {
24 pub use crate::write::{CollectUpload, TryUploadWhen, UploadStreamExt};
26}
27
28pub(crate) const AWS_MAX_OBJECT_SIZE: ByteSize = ByteSize::gib(48800);
30pub(crate) const AWS_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
31pub(crate) const AWS_MAX_PART_SIZE: ByteSize = ByteSize::gib(5);
32pub(crate) const DEFAULT_MAX_OBJECT_SIZE: ByteSize = ByteSize::mib(128);
33pub(crate) const DEFAULT_MAX_PART_SIZE: ByteSize = ByteSize::mib(10);
34
35#[derive(Debug)]
37pub struct UploadBuilder {
38 client: UploadClient,
39 max_bytes: ByteSize,
40 max_part_bytes: ByteSize,
41 iter: ObjectUriIter,
42 capacity: Option<usize>,
43}
44
45impl UploadBuilder {
46 pub fn new<C>(client: C) -> Self
48 where
49 C: UploadApi + 'static,
50 {
51 Self {
52 client: UploadClient::new(client),
53 max_bytes: DEFAULT_MAX_OBJECT_SIZE,
54 max_part_bytes: DEFAULT_MAX_PART_SIZE,
55 iter: ObjectUriIter::new(EmptyUri.into_iter()),
56 capacity: Some(10),
57 }
58 }
59
60 pub fn with_upload_size(self, limit: ByteSize) -> Self {
66 Self { max_bytes: limit.min(AWS_MAX_OBJECT_SIZE), ..self }
67 }
68
69 pub fn with_part_size(self, limit: ByteSize) -> Self {
72 Self {
73 max_part_bytes: limit
75 .max(AWS_MIN_PART_SIZE)
76 .min(AWS_MAX_PART_SIZE)
77 .min(ByteSize::b(usize::MAX as u64)),
78 ..self
79 }
80 }
81
82 pub fn with_uri<T: Into<ObjectUri>>(self, uri: T) -> Self {
86 let inner = OneTimeUse::new(uri.into());
87 Self { iter: ObjectUriIter::new(inner), ..self }
88 }
89
90 pub fn with_uri_iter<I>(self, inner: I) -> Self
96 where
97 I: Iterator<Item = ObjectUri> + Send + Sync + 'static,
98 {
99 let iter = ObjectUriIter::new(inner);
100 Self { iter, ..self }
101 }
102
103 pub fn with_capacity<T: Into<Option<usize>>>(self, capacity: T) -> Self {
109 Self { capacity: capacity.into(), ..self }
110 }
111
112 pub fn build(self) -> Upload {
114 Upload::new(self.client, self.iter, self.max_bytes, self.capacity)
115 }
116
117 pub fn build_encoded<Item, E>(self, encoder: E) -> EncodeUpload<Item, E> {
120 let bytes = self.max_bytes;
121 let part_bytes = self.max_part_bytes;
122 let upload = self.build();
123 EncodeUpload::new(upload, encoder, bytes, part_bytes)
124 }
125}
126
127#[derive(Debug)]
155#[must_use = "futures do nothing unless polled"]
156#[pin_project::pin_project]
157pub struct Upload {
158 #[pin]
159 inner: MultipartUpload,
160}
161
162impl Upload {
163 fn new<T: Into<Option<usize>>>(
164 client: UploadClient,
165 iter: ObjectUriIter,
166 upload_bytes: ByteSize,
167 capacity: T,
168 ) -> Self {
169 let inner =
170 MultipartUpload::new(client, iter, upload_bytes, capacity.into());
171 Self { inner }
172 }
173}
174
175impl FusedMultipartWrite<PartBody> for Upload {
176 fn is_terminated(&self) -> bool {
177 self.inner.is_terminated()
178 }
179}
180
181impl MultipartWrite<PartBody> for Upload {
182 type Error = Error;
183 type Output = Uploaded;
184 type Recv = UploadStatus;
185
186 fn poll_ready(
187 self: Pin<&mut Self>,
188 cx: &mut Context<'_>,
189 ) -> Poll<Result<(), Self::Error>> {
190 self.project().inner.poll_ready(cx)
191 }
192
193 fn start_send(
194 self: Pin<&mut Self>,
195 part: PartBody,
196 ) -> Result<Self::Recv, Self::Error> {
197 self.project().inner.start_send(part)
198 }
199
200 fn poll_flush(
201 self: Pin<&mut Self>,
202 cx: &mut Context<'_>,
203 ) -> Poll<Result<(), Self::Error>> {
204 self.project().inner.poll_flush(cx)
205 }
206
207 fn poll_complete(
208 self: Pin<&mut Self>,
209 cx: &mut Context<'_>,
210 ) -> Poll<Result<Self::Output, Self::Error>> {
211 self.project().inner.poll_complete(cx)
212 }
213}
214
215#[derive(Debug)]
221#[must_use = "futures do nothing unless polled"]
222#[pin_project::pin_project]
223pub struct EncodeUpload<Item, E> {
224 #[pin]
225 inner: WithPartEncoder<Upload, Item, E>,
226}
227
228impl<Item, E> EncodeUpload<Item, E> {
229 fn new(
230 upload: Upload,
231 encoder: E,
232 bytes: ByteSize,
233 part_bytes: ByteSize,
234 ) -> EncodeUpload<Item, E> {
235 let inner = WithPartEncoder::new(upload, encoder, bytes, part_bytes);
236 EncodeUpload { inner }
237 }
238}
239
240impl<Item, E: PartEncoder<Item>> FusedMultipartWrite<Item>
241 for EncodeUpload<Item, E>
242{
243 fn is_terminated(&self) -> bool {
244 self.inner.is_terminated()
245 }
246}
247
248impl<Item, E: PartEncoder<Item>> MultipartWrite<Item>
249 for EncodeUpload<Item, E>
250{
251 type Error = Error;
252 type Output = Uploaded;
253 type Recv = EncoderStatus;
254
255 fn poll_ready(
256 self: Pin<&mut Self>,
257 cx: &mut Context<'_>,
258 ) -> Poll<Result<(), Self::Error>> {
259 self.project().inner.poll_ready(cx)
260 }
261
262 fn start_send(
263 self: Pin<&mut Self>,
264 part: Item,
265 ) -> Result<Self::Recv, Self::Error> {
266 self.project().inner.start_send(part)
267 }
268
269 fn poll_flush(
270 self: Pin<&mut Self>,
271 cx: &mut Context<'_>,
272 ) -> Poll<Result<(), Self::Error>> {
273 self.project().inner.poll_flush(cx)
274 }
275
276 fn poll_complete(
277 self: Pin<&mut Self>,
278 cx: &mut Context<'_>,
279 ) -> Poll<Result<Self::Output, Self::Error>> {
280 self.project().inner.poll_complete(cx)
281 }
282}