1use std::fmt::Debug;
2
3use mountpoint_s3_client::ObjectClient;
4use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError, PutObjectError};
5use mountpoint_s3_client::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
6use mountpoint_s3_client::types::{ChecksumAlgorithm, ETag};
7use thiserror::Error;
8
9use crate::async_util::Runtime;
10use crate::fs::{ServerSideEncryption, SseCorruptedError};
11use crate::mem_limiter::MemoryLimiter;
12use crate::memory::PagedPool;
13use crate::sync::Arc;
14
15mod atomic;
16pub use atomic::UploadRequest;
17use atomic::UploadRequestParams;
18
19mod hasher;
20pub use hasher::ChecksumHasherError;
21
22mod incremental;
23use incremental::AppendUploadQueueParams;
24pub use incremental::AppendUploadRequest;
25
/// Factory for new S3 uploads.
///
/// Holds the shared client, runtime, buffer pool, and memory limiter used by
/// every upload started through it, plus the per-upload settings copied from
/// [`UploaderConfig`] at construction time.
#[derive(Debug)]
pub struct Uploader<Client: ObjectClient> {
    client: Client,
    runtime: Runtime,
    pool: PagedPool,
    mem_limiter: Arc<MemoryLimiter>,
    // Forwarded into the upload request parameters of atomic uploads.
    storage_class: Option<String>,
    // Cloned into every upload's parameters; can be corrupted in tests via `corrupt_sse`.
    server_side_encryption: ServerSideEncryption,
    // Size of each upload buffer in bytes; also bounds the append queue capacity.
    buffer_size: usize,
    // Checksum algorithm forwarded to both atomic and incremental uploads, if configured.
    default_checksum_algorithm: Option<ChecksumAlgorithm>,
}
45
/// Errors that may be returned while writing to or completing an upload.
///
/// `E` is the underlying client's error type; variants wrapping an
/// [`ObjectClientError`] forward its metadata (see the `ProvideErrorMetadata`
/// impl below).
#[derive(Debug, Error)]
pub enum UploadError<E> {
    /// A write arrived at an offset other than the next expected one.
    #[error(
        "out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}"
    )]
    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },

    /// The underlying PutObject request failed.
    #[error("put request failed")]
    PutRequestFailed(#[from] ObjectClientError<PutObjectError, E>),

    /// A previous failure already aborted this upload; further writes are rejected.
    #[error("upload was already terminated because of previous failures")]
    UploadAlreadyTerminated,

    /// The server-side encryption settings failed their integrity check.
    #[error("SSE settings corrupted")]
    SseCorruptedError(#[from] SseCorruptedError),

    /// Computing a checksum over the uploaded data failed.
    #[error("error computing checksums")]
    ChecksumComputationFailed(#[from] ChecksumHasherError),

    /// The underlying HeadObject request failed.
    #[error("head object request failed")]
    HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),

    /// The object grew past the maximum size supported for uploads.
    #[error("object exceeded maximum upload size of {maximum_size} bytes")]
    ObjectTooBig { maximum_size: usize },
}
71
72impl<E> ProvideErrorMetadata for UploadError<E>
73where
74 E: ProvideErrorMetadata,
75{
76 fn meta(&self) -> ClientErrorMetadata {
77 match self {
78 UploadError::ObjectTooBig { .. }
79 | UploadError::ChecksumComputationFailed(_)
80 | UploadError::SseCorruptedError(_)
81 | UploadError::UploadAlreadyTerminated
82 | UploadError::OutOfOrderWrite { .. } => Default::default(),
83 UploadError::PutRequestFailed(object_client_error) => object_client_error.meta(),
84 UploadError::HeadObjectFailed(object_client_error) => object_client_error.meta(),
85 }
86 }
87}
88
/// Configuration for an [`Uploader`], built with the fluent setters below and
/// consumed by `Uploader::new`.
#[derive(Debug)]
pub struct UploaderConfig {
    // Storage class for new objects; `None` leaves the client/server default.
    storage_class: Option<String>,
    // Server-side encryption settings applied to every upload.
    server_side_encryption: ServerSideEncryption,
    // Size of each upload buffer in bytes.
    buffer_size: usize,
    // Checksum algorithm to request when none is otherwise determined.
    default_checksum_algorithm: Option<ChecksumAlgorithm>,
}
96
97impl UploaderConfig {
98 pub fn new(buffer_size: usize) -> Self {
99 Self {
100 storage_class: None,
101 server_side_encryption: Default::default(),
102 buffer_size,
103 default_checksum_algorithm: None,
104 }
105 }
106
107 pub fn storage_class(mut self, storage_class: Option<String>) -> Self {
108 self.storage_class = storage_class;
109 self
110 }
111
112 pub fn server_side_encryption(mut self, server_side_encryption: ServerSideEncryption) -> Self {
113 self.server_side_encryption = server_side_encryption;
114 self
115 }
116
117 pub fn default_checksum_algorithm(mut self, default_checksum_algorithm: Option<ChecksumAlgorithm>) -> Self {
118 self.default_checksum_algorithm = default_checksum_algorithm;
119 self
120 }
121}
122
123impl<Client> Uploader<Client>
124where
125 Client: ObjectClient + Clone + Send + Sync + 'static,
126{
127 pub fn new(
129 client: Client,
130 runtime: Runtime,
131 pool: PagedPool,
132 mem_limiter: Arc<MemoryLimiter>,
133 config: UploaderConfig,
134 ) -> Self {
135 Self {
136 client,
137 runtime,
138 pool,
139 mem_limiter,
140 storage_class: config.storage_class,
141 server_side_encryption: config.server_side_encryption,
142 buffer_size: config.buffer_size,
143 default_checksum_algorithm: config.default_checksum_algorithm,
144 }
145 }
146
147 pub fn start_atomic_upload(
149 &self,
150 bucket: String,
151 key: String,
152 ) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
153 let params = UploadRequestParams {
154 bucket,
155 key,
156 server_side_encryption: self.server_side_encryption.clone(),
157 default_checksum_algorithm: self.default_checksum_algorithm.clone(),
158 storage_class: self.storage_class.clone(),
159 };
160 UploadRequest::new(&self.runtime, self.client.clone(), params)
161 }
162
163 pub fn start_incremental_upload(
165 &self,
166 bucket: String,
167 key: String,
168 initial_offset: u64,
169 initial_etag: Option<ETag>,
170 ) -> AppendUploadRequest<Client> {
171 let capacity = (MAX_BYTES_IN_QUEUE / self.buffer_size).max(1);
174 let params = AppendUploadQueueParams {
175 bucket,
176 key,
177 initial_offset,
178 initial_etag,
179 server_side_encryption: self.server_side_encryption.clone(),
180 default_checksum_algorithm: self.default_checksum_algorithm.clone(),
181 capacity,
182 };
183 AppendUploadRequest::new(
184 &self.runtime,
185 self.client.clone(),
186 self.buffer_size,
187 self.pool.clone(),
188 self.mem_limiter.clone(),
189 params,
190 )
191 }
192
193 #[cfg(test)]
194 pub fn corrupt_sse(&mut self, sse_type: Option<String>, sse_kms_key_id: Option<String>) {
195 self.server_side_encryption.corrupt_data(sse_type, sse_kms_key_id)
196 }
197}
198
199const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;