Skip to main content

mountpoint_s3_fs/
upload.rs

1use std::fmt::Debug;
2
3use mountpoint_s3_client::ObjectClient;
4use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError, PutObjectError};
5use mountpoint_s3_client::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
6use mountpoint_s3_client::types::{ChecksumAlgorithm, ETag};
7use thiserror::Error;
8
9use crate::async_util::Runtime;
10use crate::fs::{ServerSideEncryption, SseCorruptedError};
11use crate::mem_limiter::MemoryLimiter;
12use crate::memory::PagedPool;
13use crate::sync::Arc;
14
15mod atomic;
16pub use atomic::UploadRequest;
17use atomic::UploadRequestParams;
18
19mod hasher;
20pub use hasher::ChecksumHasherError;
21
22mod incremental;
23use incremental::AppendUploadQueueParams;
24pub use incremental::AppendUploadRequest;
25
/// An [Uploader] creates and manages streaming PutObject requests.
#[derive(Debug)]
pub struct Uploader<Client: ObjectClient> {
    /// Client used to issue PutObject/HeadObject requests.
    client: Client,
    /// Runtime on which upload request tasks are spawned.
    runtime: Runtime,
    /// Memory pool used by the incremental uploader.
    /// The atomic uploader does not directly reserve buffers, but relies on
    /// the client, which is configured with the same pool in Mountpoint.
    pool: PagedPool,
    /// Memory limiter handed to each incremental upload request.
    mem_limiter: Arc<MemoryLimiter>,
    /// Storage class applied to newly uploaded objects, when set.
    storage_class: Option<String>,
    /// Server-side encryption settings cloned into every new upload request.
    server_side_encryption: ServerSideEncryption,
    /// Size in bytes of each buffer used by incremental (append) uploads;
    /// also determines the append queue capacity.
    buffer_size: usize,
    /// Default checksum algorithm, if any, to be used for new S3 objects.
    ///
    /// Only [ChecksumAlgorithm::Crc32c] is supported for multi-part uploads.
    /// For existing objects, Mountpoint will instead append using the existing checksum algorithm on the object.
    default_checksum_algorithm: Option<ChecksumAlgorithm>,
}
45
/// Errors that may arise while creating or driving an upload.
///
/// `E` is the client-level error type (`Client::ClientError`), wrapped by
/// the `ObjectClientError` variants below.
#[derive(Debug, Error)]
pub enum UploadError<E> {
    /// A write arrived at an offset other than the expected next offset.
    /// Atomic uploads only support sequential writes.
    #[error(
        "out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}"
    )]
    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },

    /// The underlying PutObject request failed.
    #[error("put request failed")]
    PutRequestFailed(#[from] ObjectClientError<PutObjectError, E>),

    /// A previous failure already aborted this upload; further writes are rejected.
    #[error("upload was already terminated because of previous failures")]
    UploadAlreadyTerminated,

    /// The configured SSE settings failed their integrity check.
    #[error("SSE settings corrupted")]
    SseCorruptedError(#[from] SseCorruptedError),

    /// Computing the data checksum for the upload failed.
    #[error("error computing checksums")]
    ChecksumComputationFailed(#[from] ChecksumHasherError),

    /// The HeadObject request (used when appending to an existing object) failed.
    #[error("head object request failed")]
    HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),

    /// The object grew past the maximum size supported for the upload.
    #[error("object exceeded maximum upload size of {maximum_size} bytes")]
    ObjectTooBig { maximum_size: usize },
}
71
72impl<E> ProvideErrorMetadata for UploadError<E>
73where
74    E: ProvideErrorMetadata,
75{
76    fn meta(&self) -> ClientErrorMetadata {
77        match self {
78            UploadError::ObjectTooBig { .. }
79            | UploadError::ChecksumComputationFailed(_)
80            | UploadError::SseCorruptedError(_)
81            | UploadError::UploadAlreadyTerminated
82            | UploadError::OutOfOrderWrite { .. } => Default::default(),
83            UploadError::PutRequestFailed(object_client_error) => object_client_error.meta(),
84            UploadError::HeadObjectFailed(object_client_error) => object_client_error.meta(),
85        }
86    }
87}
88
/// Configuration for building an [Uploader].
#[derive(Debug)]
pub struct UploaderConfig {
    /// Storage class for new objects; `None` uses the bucket default.
    storage_class: Option<String>,
    /// Server-side encryption settings for new uploads.
    server_side_encryption: ServerSideEncryption,
    /// Size in bytes of each incremental upload buffer.
    buffer_size: usize,
    /// Default checksum algorithm, if any, for new objects.
    default_checksum_algorithm: Option<ChecksumAlgorithm>,
}
96
97impl UploaderConfig {
98    pub fn new(buffer_size: usize) -> Self {
99        Self {
100            storage_class: None,
101            server_side_encryption: Default::default(),
102            buffer_size,
103            default_checksum_algorithm: None,
104        }
105    }
106
107    pub fn storage_class(mut self, storage_class: Option<String>) -> Self {
108        self.storage_class = storage_class;
109        self
110    }
111
112    pub fn server_side_encryption(mut self, server_side_encryption: ServerSideEncryption) -> Self {
113        self.server_side_encryption = server_side_encryption;
114        self
115    }
116
117    pub fn default_checksum_algorithm(mut self, default_checksum_algorithm: Option<ChecksumAlgorithm>) -> Self {
118        self.default_checksum_algorithm = default_checksum_algorithm;
119        self
120    }
121}
122
123impl<Client> Uploader<Client>
124where
125    Client: ObjectClient + Clone + Send + Sync + 'static,
126{
127    /// Create a new [Uploader] that will make requests to the given client.
128    pub fn new(
129        client: Client,
130        runtime: Runtime,
131        pool: PagedPool,
132        mem_limiter: Arc<MemoryLimiter>,
133        config: UploaderConfig,
134    ) -> Self {
135        Self {
136            client,
137            runtime,
138            pool,
139            mem_limiter,
140            storage_class: config.storage_class,
141            server_side_encryption: config.server_side_encryption,
142            buffer_size: config.buffer_size,
143            default_checksum_algorithm: config.default_checksum_algorithm,
144        }
145    }
146
147    /// Start a new atomic upload.
148    pub fn start_atomic_upload(
149        &self,
150        bucket: String,
151        key: String,
152    ) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
153        let params = UploadRequestParams {
154            bucket,
155            key,
156            server_side_encryption: self.server_side_encryption.clone(),
157            default_checksum_algorithm: self.default_checksum_algorithm.clone(),
158            storage_class: self.storage_class.clone(),
159        };
160        UploadRequest::new(&self.runtime, self.client.clone(), params)
161    }
162
163    /// Start a new incremental upload.
164    pub fn start_incremental_upload(
165        &self,
166        bucket: String,
167        key: String,
168        initial_offset: u64,
169        initial_etag: Option<ETag>,
170    ) -> AppendUploadRequest<Client> {
171        // Limit the queue capacity to hold buffers for a total of at most
172        // MAX_BYTES_IN_QUEUE, but ensure it allows at least 1 buffer.
173        let capacity = (MAX_BYTES_IN_QUEUE / self.buffer_size).max(1);
174        let params = AppendUploadQueueParams {
175            bucket,
176            key,
177            initial_offset,
178            initial_etag,
179            server_side_encryption: self.server_side_encryption.clone(),
180            default_checksum_algorithm: self.default_checksum_algorithm.clone(),
181            capacity,
182        };
183        AppendUploadRequest::new(
184            &self.runtime,
185            self.client.clone(),
186            self.buffer_size,
187            self.pool.clone(),
188            self.mem_limiter.clone(),
189            params,
190        )
191    }
192
193    #[cfg(test)]
194    pub fn corrupt_sse(&mut self, sse_type: Option<String>, sse_kms_key_id: Option<String>) {
195        self.server_side_encryption.corrupt_data(sse_type, sse_kms_key_id)
196    }
197}
198
/// Maximum number of bytes an `AppendUploadQueue` can take (2 GiB).
///
/// We use this limit to prevent a single pipeline from consuming all memory.
/// The limit may slow down writes eventually, but the overall upload throughput
/// is already capped by a single PutObject request.
const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;