// mountpoint_s3_client/s3_crt_client/put_object.rs

1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::channel::oneshot::{self, Receiver};
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17    ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21    ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation, S3RequestError,
22    emit_throughput_metric,
23};
24
/// Response header carrying the entity tag (ETag) of the uploaded object.
const ETAG_HEADER_NAME: &str = "ETag";
/// Header naming the server-side encryption type; sent on requests and read back from responses.
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
/// Header carrying the AWS KMS key ID for server-side encryption; sent and read back like
/// [`SSE_TYPE_HEADER_NAME`].
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30    pub(super) async fn put_object(
31        &self,
32        bucket: &str,
33        key: &str,
34        params: &PutObjectParams,
35    ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36        let review_callback = ReviewCallbackBox::default();
37        let (on_headers, response_headers) = response_headers_handler();
38        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39        // Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
40        let (mpu_created_sender, mpu_created) = oneshot::channel();
41        let meta_request = {
42            let span = request_span!(self.inner, "put_object", bucket, key);
43            let mut message = self.new_put_request(
44                bucket,
45                key,
46                params.storage_class.as_deref(),
47                params.server_side_encryption.as_deref(),
48                params.ssekms_key_id.as_deref(),
49            )?;
50
51            let checksum_config = match params.trailing_checksums {
52                PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53                PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54                PutObjectTrailingChecksums::Disabled => None,
55            };
56            message.set_checksum_config(checksum_config);
57
58            for (name, value) in &params.object_metadata {
59                message
60                    .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
61                    .map_err(S3RequestError::construction_failure)?
62            }
63            for (name, value) in &params.custom_headers {
64                message
65                    .inner
66                    .add_header(&Header::new(name, value))
67                    .map_err(S3RequestError::construction_failure)?;
68            }
69
70            let callback = review_callback.clone();
71
72            let mut options = message.into_options(S3Operation::PutObject);
73            options.send_using_async_writes(true);
74            options.on_upload_review(move |review| callback.invoke(review));
75            options.part_size(self.inner.write_part_size as u64);
76
77            let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
78            let on_failure_sender = on_mpu_created_sender.clone();
79            self.inner.meta_request_with_callbacks(
80                options,
81                span,
82                move |metrics| {
83                    if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
84                        // Send signal on a successful CreateMultipartUpload request
85                        if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
86                            _ = sender.send(Ok(()));
87                        }
88                    }
89                },
90                on_headers,
91                |_, _| {},
92                |_| None,
93                move |result| {
94                    if let Some(sender) = on_failure_sender.lock().unwrap().take() {
95                        // If the MPU was not created, the request must have failed.
96                        _ = sender.send(result.and_then(|_| {
97                            Err(
98                                S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
99                                    .into(),
100                            )
101                        }));
102                    } else {
103                        _ = tx.send(result);
104                    }
105                },
106            )?
107        };
108
109        // Wait for CreateMultipartUpload to complete, or return error.
110        mpu_created.await.unwrap()?;
111
112        let request = S3MetaRequest {
113            receiver: rx.fuse(),
114            meta_request,
115        };
116        Ok(S3PutObjectRequest {
117            request,
118            review_callback,
119            start_time: Instant::now(),
120            total_bytes: 0,
121            response_headers,
122            state: S3PutObjectRequestState::Idle,
123        })
124    }
125
126    pub(super) async fn put_object_single<'a>(
127        &self,
128        bucket: &str,
129        key: &str,
130        params: &PutObjectSingleParams,
131        contents: impl AsRef<[u8]> + Send + 'a,
132    ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
133        let span = request_span!(self.inner, "put_object_single", bucket, key);
134        let start_time = Instant::now();
135
136        let slice = contents.as_ref();
137        let content_length = slice.len();
138        let request = {
139            let mut message = self.new_put_request(
140                bucket,
141                key,
142                params.storage_class.as_deref(),
143                params.server_side_encryption.as_deref(),
144                params.ssekms_key_id.as_deref(),
145            )?;
146            message
147                .set_content_length_header(content_length)
148                .map_err(S3RequestError::construction_failure)?;
149            if let Some(checksum) = &params.checksum {
150                message
151                    .set_checksum_header(checksum)
152                    .map_err(S3RequestError::construction_failure)?;
153            }
154            if let Some(offset) = params.write_offset_bytes {
155                message
156                    .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
157                    .map_err(S3RequestError::construction_failure)?;
158            }
159            if let Some(etag) = &params.if_match {
160                message
161                    .set_header(&Header::new("If-Match", etag.as_str()))
162                    .map_err(S3RequestError::construction_failure)?;
163            }
164            for (name, value) in &params.object_metadata {
165                message
166                    .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
167                    .map_err(S3RequestError::construction_failure)?;
168            }
169            for (name, value) in &params.custom_headers {
170                message
171                    .inner
172                    .add_header(&Header::new(name, value))
173                    .map_err(S3RequestError::construction_failure)?;
174            }
175
176            let body_input_stream =
177                InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
178            message.set_body_stream(Some(body_input_stream));
179
180            self.inner.meta_request_with_headers_payload(
181                message.into_options(S3Operation::PutObjectSingle),
182                span,
183                parse_put_object_single_error,
184            )?
185        };
186
187        let headers = request.await?;
188
189        let elapsed = start_time.elapsed();
190        emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
191
192        Ok(extract_result(headers)?)
193    }
194
195    fn new_put_request(
196        &self,
197        bucket: &str,
198        key: &str,
199        storage_class: Option<&str>,
200        server_side_encryption: Option<&str>,
201        ssekms_key_id: Option<&str>,
202    ) -> Result<S3Message<'_>, S3RequestError> {
203        let mut message = self
204            .inner
205            .new_request_template("PUT", bucket)
206            .map_err(S3RequestError::construction_failure)?;
207
208        let key = format!("/{key}");
209        message
210            .set_request_path(&key)
211            .map_err(S3RequestError::construction_failure)?;
212
213        if let Some(storage_class) = storage_class {
214            message
215                .set_header(&Header::new("x-amz-storage-class", storage_class))
216                .map_err(S3RequestError::construction_failure)?;
217        }
218
219        if let Some(sse) = server_side_encryption {
220            message
221                .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
222                .map_err(S3RequestError::construction_failure)?;
223        }
224        if let Some(key_id) = ssekms_key_id {
225            message
226                .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
227                .map_err(S3RequestError::construction_failure)?;
228        }
229
230        Ok(message)
231    }
232}
233
/// Type-erased upload review callback, as stored inside `ReviewCallbackBox`.
///
/// It receives the CRT's [`UploadReview`] and returns a `bool` verdict. It is `FnOnce` because
/// the box takes it out before running it, so it runs at most once. `Send` is presumably
/// required because it is invoked from a CRT callback (TODO confirm against the CRT bindings).
type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
235
/// Holder for the upload review callback.
///
/// The CRT requires a review callback when the PutObject request is created, but the user only
/// supplies the real callback at completion time. This box is registered up front and forwards
/// to whatever callback is installed later.
///
/// Clones share one slot (the callback lives behind an `Arc<Mutex<..>>`). The clone handed to
/// the CRT therefore sees a callback that is set later through another clone.
#[derive(Clone, Default)]
struct ReviewCallbackBox {
    /// The installed callback: `None` before it is set and after it has been taken.
    callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
}
243
244impl std::fmt::Debug for ReviewCallbackBox {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        f.debug_struct("ReviewCallbackBox").finish_non_exhaustive()
247    }
248}
249
250impl ReviewCallbackBox {
251    fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
252        let previous = self.callback.lock().unwrap().replace(Box::new(callback));
253        assert!(previous.is_none(), "review callback set twice");
254    }
255
256    fn invoke(self, review: UploadReview) -> bool {
257        let mut callback = self.callback.lock().unwrap();
258        let Some(callback) = callback.take() else {
259            error!("review callback was either never set or invoked twice");
260            return false;
261        };
262
263        (callback)(review)
264    }
265}
266
/// An in-progress streaming PutObject request to S3.
///
/// You can write to or complete the upload using the [`PutObjectRequest`] implementation on this
/// object.
#[derive(Debug)]
pub struct S3PutObjectRequest {
    /// The CRT meta request together with the receiver for its final result.
    request: S3MetaRequest<(), PutObjectError>,
    /// Shared slot for the review callback, which is only supplied at completion time.
    review_callback: ReviewCallbackBox,
    /// When the request was handed to the caller; used for the throughput metric on completion.
    start_time: Instant,
    /// Bytes accepted by the meta request across all `write` calls so far.
    total_bytes: u64,
    /// Future for the headers of the CompleteMultipartUpload response.
    /// Guaranteed to be available after the request finishes successfully.
    response_headers: Receiver<Headers>,
    /// Whether a write is in progress or was interrupted; guards later writes and completion.
    state: S3PutObjectRequestState,
}
282
/// Internal state for a [S3PutObjectRequest].
///
/// Decides whether the request may accept another write or be completed.
#[derive(Debug)]
enum S3PutObjectRequestState {
    /// No write is running, so the request can accept another write or be completed.
    Idle,
    /// A write operation is in progress, or was interrupted before completion. A request whose
    /// write failed part-way stays in this state.
    PendingWrite,
}
291
292/// Internal errors for a [S3PutObjectRequest].
293#[derive(Debug, Error)]
294enum S3PutObjectRequestError {
295    #[error("A previous write operation did not complete successfully")]
296    PreviousWriteFailed,
297    #[error("The CreateMultiPartUpload request did not succeed")]
298    CreateMultipartUploadFailed,
299}
300
301fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
302    Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
303}
304
305fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
306    match result.response_status {
307        400 => {
308            let body = result.error_response_body.as_ref()?;
309            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
310            let error_code = root.get_child("Code")?;
311            let error_str = error_code.get_text()?;
312            match error_str.deref() {
313                "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
314                "BadDigest" => Some(PutObjectError::BadChecksum),
315                "InvalidArgument" => {
316                    parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
317                }
318                "InvalidRequest" => parse_if_error_message_starts_with(
319                    "Checksum Type mismatch",
320                    &root,
321                    PutObjectError::InvalidChecksumType,
322                ),
323                _ => None,
324            }
325        }
326        404 => {
327            let body = result.error_response_body.as_ref()?;
328            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
329            let error_code = root.get_child("Code")?;
330            let error_str = error_code.get_text()?;
331            match error_str.deref() {
332                "NoSuchKey" => Some(PutObjectError::NoSuchKey),
333                _ => None,
334            }
335        }
336        412 => Some(PutObjectError::PreconditionFailed),
337        501 => Some(PutObjectError::NotImplemented),
338        _ => None,
339    }
340}
341
342fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
343    let error_message = element.get_child("Message")?.get_text();
344    if let Some(error_message) = error_message
345        && error_message.starts_with(prefix)
346    {
347        return Some(error);
348    }
349    None
350}
351
352fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
353    fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
354        Ok(PutObjectResult {
355            etag: get_etag(&response_headers)?,
356            sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
357            sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
358        })
359    }
360    extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
361}
362
363/// Creates `on_headers` callback that will send the response headers to the matching `Receiver`.
364fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
365    let (response_headers_sender, response_headers) = oneshot::channel();
366    // The callback signature (`FnMut`) allows for it to be invoked multiple times,
367    // but for PUT requests it will only be called once on CompleteMultipartUpload.
368    // Wrapping the `oneshot::Sender` in an `Option` allows it to be consumed
369    // on the first (and only!) invocation.
370    let mut response_headers_sender = Some(response_headers_sender);
371    let on_headers = move |headers: &Headers, _: i32| {
372        if let Some(sender) = response_headers_sender.take() {
373            let _ = sender.send(headers.clone());
374        }
375    };
376    (on_headers, response_headers)
377}
378
379#[cfg_attr(not(docsrs), async_trait)]
380impl PutObjectRequest for S3PutObjectRequest {
381    type ClientError = S3RequestError;
382
383    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
384        // Writing to the meta request may require multiple calls. Set the internal
385        // state to `PendingWrite` until we are done.
386        match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
387            S3PutObjectRequestState::PendingWrite => {
388                // Fail if a previous write was not completed.
389                return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
390            }
391            S3PutObjectRequestState::Idle => {}
392        }
393
394        let meta_request = &mut self.request.meta_request;
395        let mut slice = slice;
396        while !slice.is_empty() {
397            // Write will fail if the request has already finished (because of an error).
398            let remaining = meta_request
399                .write(slice, false)
400                .await
401                .map_err(S3RequestError::CrtError)?;
402            self.total_bytes += (slice.len() - remaining.len()) as u64;
403            slice = remaining;
404        }
405        // Write completed with no errors, we can reset to `Idle`.
406        self.state = S3PutObjectRequestState::Idle;
407        Ok(())
408    }
409
410    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
411        self.review_and_complete(|_| true).await
412    }
413
414    async fn review_and_complete(
415        mut self,
416        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
417    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
418        if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
419            // Fail if a previous write was not completed.
420            return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
421        }
422
423        self.review_callback.set(review_callback);
424
425        // Write will fail if the request has already finished (because of an error).
426        _ = self
427            .request
428            .meta_request
429            .write(&[], true)
430            .await
431            .map_err(S3RequestError::CrtError)?;
432
433        // Now wait for the request to finish.
434        self.request.await?;
435
436        let elapsed = self.start_time.elapsed();
437        emit_throughput_metric(self.total_bytes, elapsed, "put_object");
438
439        Ok(extract_result(self.response_headers.await.expect(
440            "headers should be available since the request completed successfully",
441        ))?)
442    }
443}
444
445impl S3PutObjectRequest {
446    /// The number of bytes written to this request so far.
447    // TODO: consider exposing on the `PutObjectRequest` trait.
448    pub fn bytes_written(&self) -> u64 {
449        self.total_bytes
450    }
451}