use std::pin::Pin;
use std::task::{Context, Poll};
use bytesize::ByteSize;
use multipart_write::{FusedMultipartWrite, MultipartWrite};
use crate::client::part::PartBody;
use crate::client::{UploadApi, UploadClient};
use crate::encoder::PartEncoder;
use crate::error::Error;
use crate::uri::{EmptyUri, ObjectUri, ObjectUriIter, OneTimeUse};
use crate::write::multipart_upload::MultipartUpload;
use crate::write::with_part_encoder::WithPartEncoder;
pub use crate::write::multipart_upload::UploadStatus;
pub use crate::write::with_part_encoder::EncoderStatus;
pub use crate::write::{ShouldComplete, Uploaded};
/// Re-exports `CollectUpload`, `TryUploadWhen`, and `UploadStreamExt` from
/// `crate::write` under a single `stream` namespace.
pub mod stream {
pub use crate::write::{CollectUpload, TryUploadWhen, UploadStreamExt};
}
/// Upper bound (48800 GiB) that `UploadBuilder::with_upload_size` clamps its
/// argument to.
// NOTE(review): presumably mirrors the AWS S3 maximum object size — confirm
// against the current AWS limits.
pub(crate) const AWS_MAX_OBJECT_SIZE: ByteSize = ByteSize::gib(48800);
/// Lower bound (5 MiB) that `UploadBuilder::with_part_size` clamps its
/// argument to.
// NOTE(review): presumably the AWS S3 minimum multipart part size — confirm.
pub(crate) const AWS_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
/// Upper bound (5 GiB) that `UploadBuilder::with_part_size` clamps its
/// argument to.
// NOTE(review): presumably the AWS S3 maximum multipart part size — confirm.
pub(crate) const AWS_MAX_PART_SIZE: ByteSize = ByteSize::gib(5);
/// Upload size limit (128 MiB) used by `UploadBuilder::new` when
/// `with_upload_size` is never called.
pub(crate) const DEFAULT_MAX_OBJECT_SIZE: ByteSize = ByteSize::mib(128);
/// Part size limit (10 MiB) used by `UploadBuilder::new` when
/// `with_part_size` is never called.
pub(crate) const DEFAULT_MAX_PART_SIZE: ByteSize = ByteSize::mib(10);
/// Collects the settings needed to construct an [`Upload`] or an
/// [`EncodeUpload`].
///
/// Created with `UploadBuilder::new`, adjusted through the `with_*` methods,
/// and consumed by `build` or `build_encoded`.
#[derive(Debug)]
pub struct UploadBuilder {
// Client wrapper created from the `UploadApi` implementation given to `new`.
client: UploadClient,
// Upload size limit; starts at `DEFAULT_MAX_OBJECT_SIZE` and is handed to
// `Upload::new` (and to `EncodeUpload::new` by `build_encoded`).
max_bytes: ByteSize,
// Part size limit; starts at `DEFAULT_MAX_PART_SIZE` and is only consumed by
// `build_encoded`.
max_part_bytes: ByteSize,
// Source of destination URIs; starts out wrapping `EmptyUri`.
iter: ObjectUriIter,
// Optional capacity forwarded to `Upload::new`; starts at `Some(10)`.
// NOTE(review): its exact meaning is defined by `MultipartUpload` — confirm there.
capacity: Option<usize>,
}
impl UploadBuilder {
    /// Creates a builder that uses `client` for uploads.
    ///
    /// The builder starts with `DEFAULT_MAX_OBJECT_SIZE` as the upload size
    /// limit, `DEFAULT_MAX_PART_SIZE` as the part size limit, a URI source
    /// wrapping `EmptyUri`, and a capacity of `Some(10)`.
    pub fn new<C>(client: C) -> Self
    where
        C: UploadApi + 'static,
    {
        let client = UploadClient::new(client);
        let iter = ObjectUriIter::new(EmptyUri.into_iter());
        Self {
            client,
            max_bytes: DEFAULT_MAX_OBJECT_SIZE,
            max_part_bytes: DEFAULT_MAX_PART_SIZE,
            iter,
            capacity: Some(10),
        }
    }

    /// Sets the upload size limit to `limit`, capped at `AWS_MAX_OBJECT_SIZE`.
    ///
    /// No lower bound is applied.
    pub fn with_upload_size(mut self, limit: ByteSize) -> Self {
        self.max_bytes = limit.min(AWS_MAX_OBJECT_SIZE);
        self
    }

    /// Sets the part size limit to `limit`, clamped into the range
    /// `AWS_MIN_PART_SIZE..=AWS_MAX_PART_SIZE`.
    ///
    /// The result is additionally capped at `usize::MAX` bytes so that it is
    /// always representable as a `usize` on the current target.
    pub fn with_part_size(mut self, limit: ByteSize) -> Self {
        let addressable = ByteSize::b(usize::MAX as u64);
        let clamped = limit
            .max(AWS_MIN_PART_SIZE)
            .min(AWS_MAX_PART_SIZE)
            .min(addressable);
        self.max_part_bytes = clamped;
        self
    }

    /// Replaces the URI source with one built from the single `uri`, wrapped
    /// in `OneTimeUse`.
    pub fn with_uri<T>(mut self, uri: T) -> Self
    where
        T: Into<ObjectUri>,
    {
        let single = OneTimeUse::new(uri.into());
        self.iter = ObjectUriIter::new(single);
        self
    }

    /// Replaces the URI source with the iterator `inner`.
    pub fn with_uri_iter<I>(mut self, inner: I) -> Self
    where
        I: Iterator<Item = ObjectUri> + Send + Sync + 'static,
    {
        self.iter = ObjectUriIter::new(inner);
        self
    }

    /// Sets the capacity forwarded to the upload; accepts a `usize`, a
    /// `Some(usize)`, or `None`.
    pub fn with_capacity<T>(mut self, capacity: T) -> Self
    where
        T: Into<Option<usize>>,
    {
        self.capacity = capacity.into();
        self
    }

    /// Consumes the builder and returns an [`Upload`].
    ///
    /// The part size limit is not used by this method; only the client, URI
    /// source, upload size limit, and capacity are passed on.
    pub fn build(self) -> Upload {
        let Self { client, iter, max_bytes, capacity, .. } = self;
        Upload::new(client, iter, max_bytes, capacity)
    }

    /// Consumes the builder and returns an [`EncodeUpload`] that combines the
    /// built [`Upload`] with `encoder`.
    ///
    /// Both the upload size limit and the part size limit are passed to the
    /// encoder wrapper.
    pub fn build_encoded<Item, E>(self, encoder: E) -> EncodeUpload<Item, E> {
        // Copy the limits out first: `build` takes the builder by value.
        let (bytes, part_bytes) = (self.max_bytes, self.max_part_bytes);
        EncodeUpload::new(self.build(), encoder, bytes, part_bytes)
    }
}
/// A multipart writer of [`PartBody`] values that yields [`Uploaded`] on
/// completion.
///
/// This is a thin pin-projected wrapper around the crate-internal
/// `MultipartUpload`; every trait method is forwarded to it. Construct one
/// with `UploadBuilder::build`.
#[derive(Debug)]
// This type is a `MultipartWrite`, not a `Future`, so the lint message names
// it accordingly.
#[must_use = "multipart writers do nothing unless polled"]
#[pin_project::pin_project]
pub struct Upload {
    // The writer that performs the actual work.
    #[pin]
    inner: MultipartUpload,
}
impl Upload {
    /// Wraps a new `MultipartUpload` built from the given client, URI source,
    /// upload size limit, and optional capacity.
    fn new<T>(
        client: UploadClient,
        iter: ObjectUriIter,
        upload_bytes: ByteSize,
        capacity: T,
    ) -> Self
    where
        T: Into<Option<usize>>,
    {
        Self {
            inner: MultipartUpload::new(
                client,
                iter,
                upload_bytes,
                capacity.into(),
            ),
        }
    }
}
impl FusedMultipartWrite<PartBody> for Upload {
    /// Reports whether the wrapped `MultipartUpload` has terminated.
    fn is_terminated(&self) -> bool {
        let terminated = self.inner.is_terminated();
        terminated
    }
}
/// Forwards every `MultipartWrite` operation to the wrapped
/// `MultipartUpload` through its pin projection.
impl MultipartWrite<PartBody> for Upload {
    type Error = Error;
    type Output = Uploaded;
    type Recv = UploadStatus;

    /// Delegates readiness polling to the inner writer.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_ready(cx)
    }

    /// Hands `part` to the inner writer and returns its status.
    fn start_send(
        self: Pin<&mut Self>,
        part: PartBody,
    ) -> Result<Self::Recv, Self::Error> {
        let this = self.project();
        this.inner.start_send(part)
    }

    /// Delegates flushing to the inner writer.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_flush(cx)
    }

    /// Delegates completion to the inner writer.
    fn poll_complete(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Output, Self::Error>> {
        let this = self.project();
        this.inner.poll_complete(cx)
    }
}
/// A multipart writer that accepts `Item` values and yields [`Uploaded`] on
/// completion.
///
/// This is a thin pin-projected wrapper around the crate-internal
/// `WithPartEncoder`, which pairs an [`Upload`] with an encoder `E`; every
/// trait method is forwarded to it. Construct one with
/// `UploadBuilder::build_encoded`.
#[derive(Debug)]
// This type is a `MultipartWrite`, not a `Future`, so the lint message names
// it accordingly.
#[must_use = "multipart writers do nothing unless polled"]
#[pin_project::pin_project]
pub struct EncodeUpload<Item, E> {
    // The encoder-backed writer that performs the actual work.
    #[pin]
    inner: WithPartEncoder<Upload, Item, E>,
}
impl<Item, E> EncodeUpload<Item, E> {
    /// Wraps a new `WithPartEncoder` built from `upload`, `encoder`, and the
    /// two size limits.
    fn new(
        upload: Upload,
        encoder: E,
        bytes: ByteSize,
        part_bytes: ByteSize,
    ) -> Self {
        Self {
            inner: WithPartEncoder::new(upload, encoder, bytes, part_bytes),
        }
    }
}
impl<Item, E> FusedMultipartWrite<Item> for EncodeUpload<Item, E>
where
    E: PartEncoder<Item>,
{
    /// Reports whether the wrapped `WithPartEncoder` has terminated.
    fn is_terminated(&self) -> bool {
        let terminated = self.inner.is_terminated();
        terminated
    }
}
/// Forwards every `MultipartWrite` operation to the wrapped
/// `WithPartEncoder` through its pin projection.
impl<Item, E> MultipartWrite<Item> for EncodeUpload<Item, E>
where
    E: PartEncoder<Item>,
{
    type Error = Error;
    type Output = Uploaded;
    type Recv = EncoderStatus;

    /// Delegates readiness polling to the inner writer.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_ready(cx)
    }

    /// Hands `part` to the inner writer and returns its status.
    fn start_send(
        self: Pin<&mut Self>,
        part: Item,
    ) -> Result<Self::Recv, Self::Error> {
        let this = self.project();
        this.inner.start_send(part)
    }

    /// Delegates flushing to the inner writer.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_flush(cx)
    }

    /// Delegates completion to the inner writer.
    fn poll_complete(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Output, Self::Error>> {
        let this = self.project();
        this.inner.poll_complete(cx)
    }
}