aws_multipart_upload/
lib.rs1pub mod client;
2pub use self::client::aws::AwsClient;
3
4pub mod codec;
5
6pub mod types;
7pub use self::types::api as api_types;
8pub use self::types::upload::Upload;
9
10pub mod testing {
11 pub use super::client::fs::AsyncTempFileClient;
12 pub use super::client::hashmap::HashMapClient;
13}
14
15mod aws_ops {
16 pub use aws_sdk_s3::operation::complete_multipart_upload as complete;
17 pub use aws_sdk_s3::operation::create_multipart_upload as create;
18 pub use aws_sdk_s3::operation::upload_part as upload;
19}
20
/// Minimum size of a part in a multipart upload, in bytes (5 MiB).
pub const AWS_MIN_PART_SIZE: usize = 5 * 1024 * 1024;
/// Maximum total size of a multipart upload, in bytes (5 TiB).
///
/// On targets whose `usize` cannot represent 5 TiB the value saturates to
/// `usize::MAX` instead of failing to compile.
pub const AWS_MAX_UPLOAD_SIZE: usize = {
    // Computed in `u64`: multiplying directly in `usize` overflows during
    // const evaluation on 32-bit targets, which is a hard build error there.
    const FIVE_TIB: u64 = 5 * 1024 * 1024 * 1024 * 1024;
    if FIVE_TIB > usize::MAX as u64 {
        usize::MAX
    } else {
        // Lossless: the branch above guarantees the value fits in `usize`.
        FIVE_TIB as usize
    }
};
/// Maximum number of parts in a single multipart upload.
pub const AWS_MAX_UPLOAD_PARTS: usize = 10000;
/// Default target size of a whole upload, in bytes (100 MiB).
pub const AWS_DEFAULT_TARGET_UPLOAD_SIZE: usize = 100 * 1024 * 1024;
29
30#[non_exhaustive]
32#[derive(Debug, thiserror::Error)]
33pub enum AwsError {
34 #[error("error creating upload {0}")]
35 Create(#[from] aws_sdk_s3::error::SdkError<aws_ops::create::CreateMultipartUploadError>),
36 #[error("error uploading part {0}")]
37 Upload(#[from] aws_sdk_s3::error::SdkError<aws_ops::upload::UploadPartError>),
38 #[error("error completing upload {0}")]
39 Complete(#[from] aws_sdk_s3::error::SdkError<aws_ops::complete::CompleteMultipartUploadError>),
40 #[error("error creating bytestream {0}")]
41 ByteStream(#[from] aws_sdk_s3::primitives::ByteStreamError),
42 #[error("io error {0}")]
43 Io(#[from] std::io::Error),
44 #[error("missing required field {0}")]
45 Missing(&'static str),
46 #[error("encoding error {0}")]
47 Codec(String),
48 #[error("error formatting timestamp for s3 address {0}")]
49 AddrFmt(#[from] chrono::format::ParseError),
50 #[error("unable to produce the next upload destination")]
51 UploadForever,
52 #[error("ser/de error {0}")]
53 Serde(String),
54 #[error("user defined error {0}")]
55 Custom(String),
56 #[error(transparent)]
57 DynStd(Box<dyn std::error::Error + Send + Sync + 'static>),
58}
59
60impl From<AwsError> for std::io::Error {
61 fn from(v: AwsError) -> Self {
62 Self::new(std::io::ErrorKind::Other, v)
63 }
64}
65
66pub struct UploadBuilder<C, E, U> {
68 ctrl: C,
69 codec: E,
70 client: U,
71}
72
73impl<C, E, U> UploadBuilder<C, E, U> {
74 pub fn new(client: U, ctrl: C, codec: E) -> Self
75 where
76 C: types::UploadControl,
77 U: types::UploadClient,
78 {
79 Self {
80 client,
81 ctrl,
82 codec,
83 }
84 }
85
86 pub async fn init_upload<I>(self, bucket: String, key: String) -> Result<Upload<E>, AwsError>
89 where
90 C: types::UploadControl + Send + Sync + 'static,
91 E: tokio_util::codec::Encoder<I>,
92 U: types::UploadClient + Send + Sync + 'static,
93 {
94 let addr = api_types::UploadAddress::new(bucket, key);
95 let params = self.client.new_upload(&addr).await?;
96 let sink = Upload::new(self.client, self.ctrl, self.codec, params);
97 Ok(sink)
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct DefaultControl {
104 target_part_size: usize,
105 target_upload_size: Option<usize>,
106 target_num_parts: Option<usize>,
107}
108
109impl DefaultControl {
110 pub fn new() -> Self {
111 Self::default()
112 }
113
114 pub fn set_target_part_size(mut self, n: usize) -> Self {
115 self.target_part_size = n;
116 self
117 }
118
119 pub fn set_target_upload_size(mut self, n: usize) -> Self {
120 self.target_upload_size = Some(n);
121 self
122 }
123
124 pub fn set_target_num_parts(mut self, n: usize) -> Self {
125 self.target_num_parts = Some(n);
126 self
127 }
128}
129
130impl Default for DefaultControl {
131 fn default() -> Self {
132 Self {
133 target_part_size: AWS_MIN_PART_SIZE,
134 target_upload_size: None,
135 target_num_parts: None,
136 }
137 }
138}
139
140impl self::types::UploadControl for DefaultControl {
141 fn target_part_size(&self) -> usize {
142 self.target_part_size
143 }
144
145 fn is_upload_ready(&self, upload_size: usize, num_parts: usize) -> bool {
146 self.target_upload_size
147 .map(|n| n <= upload_size)
148 .unwrap_or_default()
149 || self
150 .target_num_parts
151 .map(|n| n <= num_parts)
152 .unwrap_or_default()
153 }
154}