Skip to main content

mountpoint_s3_client/s3_crt_client/
put_object.rs

1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::channel::oneshot::{self, Receiver};
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17    ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21    ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation, S3RequestError,
22    emit_throughput_metric,
23};
24
/// Name of the response header carrying the object's entity tag.
const ETAG_HEADER_NAME: &str = "ETag";
/// Name of the header selecting (on requests) or reporting (on responses) the server-side encryption type.
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
/// Name of the header carrying the AWS KMS key id used for server-side encryption.
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30    pub(super) async fn put_object(
31        &self,
32        bucket: &str,
33        key: &str,
34        params: &PutObjectParams,
35    ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36        let review_callback = ReviewCallbackBox::default();
37        let (on_headers, response_headers) = response_headers_handler();
38        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39        // Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
40        let (mpu_created_sender, mpu_created) = oneshot::channel();
41        let meta_request = {
42            let span = request_span!(self.inner, "put_object", bucket, key);
43            let mut message = self.new_put_request(
44                bucket,
45                key,
46                params.storage_class.as_deref(),
47                params.server_side_encryption.as_deref(),
48                params.ssekms_key_id.as_deref(),
49            )?;
50
51            let checksum_config = match params.trailing_checksums {
52                PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53                PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54                PutObjectTrailingChecksums::Disabled => None,
55            };
56            message.set_checksum_config(checksum_config);
57
58            for (name, value) in &params.object_metadata {
59                message
60                    .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
61                    .map_err(S3RequestError::construction_failure)?
62            }
63            for (name, value) in &params.custom_headers {
64                message
65                    .inner
66                    .add_header(&Header::new(name, value))
67                    .map_err(S3RequestError::construction_failure)?;
68            }
69
70            let callback = review_callback.clone();
71
72            let mut options = message.into_options(S3Operation::PutObject);
73            options.send_using_async_writes(true);
74            options.on_upload_review(move |review| callback.invoke(review));
75            options.part_size(self.inner.write_part_size as u64);
76            if let Some(id) = params.custom_id {
77                options.custom_id(id);
78            }
79
80            let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
81            let on_failure_sender = on_mpu_created_sender.clone();
82            self.inner.meta_request_with_callbacks(
83                options,
84                span,
85                move |metrics| {
86                    if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
87                        // Send signal on a successful CreateMultipartUpload request
88                        if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
89                            _ = sender.send(Ok(()));
90                        }
91                    }
92                },
93                on_headers,
94                |_, _| {},
95                |_| None,
96                move |result| {
97                    if let Some(sender) = on_failure_sender.lock().unwrap().take() {
98                        // If the MPU was not created, the request must have failed.
99                        _ = sender.send(result.and_then(|_| {
100                            Err(
101                                S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
102                                    .into(),
103                            )
104                        }));
105                    } else {
106                        _ = tx.send(result);
107                    }
108                },
109            )?
110        };
111
112        // Wait for CreateMultipartUpload to complete, or return error.
113        mpu_created.await.unwrap()?;
114
115        let request = S3MetaRequest {
116            receiver: rx.fuse(),
117            meta_request,
118        };
119        Ok(S3PutObjectRequest {
120            request,
121            review_callback,
122            start_time: Instant::now(),
123            total_bytes: 0,
124            response_headers,
125            state: S3PutObjectRequestState::Idle,
126        })
127    }
128
129    pub(super) async fn put_object_single<'a>(
130        &self,
131        bucket: &str,
132        key: &str,
133        params: &PutObjectSingleParams,
134        contents: impl AsRef<[u8]> + Send + 'a,
135    ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
136        let span = request_span!(self.inner, "put_object_single", bucket, key);
137        let start_time = Instant::now();
138
139        let slice = contents.as_ref();
140        let content_length = slice.len();
141        let request = {
142            let mut message = self.new_put_request(
143                bucket,
144                key,
145                params.storage_class.as_deref(),
146                params.server_side_encryption.as_deref(),
147                params.ssekms_key_id.as_deref(),
148            )?;
149            message
150                .set_content_length_header(content_length)
151                .map_err(S3RequestError::construction_failure)?;
152            if let Some(checksum) = &params.checksum {
153                message
154                    .set_checksum_header(checksum)
155                    .map_err(S3RequestError::construction_failure)?;
156            }
157            if let Some(offset) = params.write_offset_bytes {
158                message
159                    .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
160                    .map_err(S3RequestError::construction_failure)?;
161            }
162            if let Some(etag) = &params.if_match {
163                message
164                    .set_header(&Header::new("If-Match", etag.as_str()))
165                    .map_err(S3RequestError::construction_failure)?;
166            }
167            for (name, value) in &params.object_metadata {
168                message
169                    .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
170                    .map_err(S3RequestError::construction_failure)?;
171            }
172            for (name, value) in &params.custom_headers {
173                message
174                    .inner
175                    .add_header(&Header::new(name, value))
176                    .map_err(S3RequestError::construction_failure)?;
177            }
178
179            let body_input_stream =
180                InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
181            message.set_body_stream(Some(body_input_stream));
182
183            self.inner.meta_request_with_headers_payload(
184                message.into_options(S3Operation::PutObjectSingle),
185                span,
186                parse_put_object_single_error,
187            )?
188        };
189
190        let headers = request.await?;
191
192        let elapsed = start_time.elapsed();
193        emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
194
195        Ok(extract_result(headers)?)
196    }
197
198    fn new_put_request(
199        &self,
200        bucket: &str,
201        key: &str,
202        storage_class: Option<&str>,
203        server_side_encryption: Option<&str>,
204        ssekms_key_id: Option<&str>,
205    ) -> Result<S3Message<'_>, S3RequestError> {
206        let mut message = self
207            .inner
208            .new_request_template("PUT", bucket)
209            .map_err(S3RequestError::construction_failure)?;
210
211        let key = format!("/{key}");
212        message
213            .set_request_path(&key)
214            .map_err(S3RequestError::construction_failure)?;
215
216        if let Some(storage_class) = storage_class {
217            message
218                .set_header(&Header::new("x-amz-storage-class", storage_class))
219                .map_err(S3RequestError::construction_failure)?;
220        }
221
222        if let Some(sse) = server_side_encryption {
223            message
224                .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
225                .map_err(S3RequestError::construction_failure)?;
226        }
227        if let Some(key_id) = ssekms_key_id {
228            message
229                .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
230                .map_err(S3RequestError::construction_failure)?;
231        }
232
233        Ok(message)
234    }
235}
236
237type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
238
239/// Holder for the upload review callback.
240/// Used to set the callback when initiating the PutObject request on the CRT client,
241/// but redirects to the actual callback the user can specify at completion time.
242#[derive(Clone, Default)]
243struct ReviewCallbackBox {
244    callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
245}
246
247impl std::fmt::Debug for ReviewCallbackBox {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        f.debug_struct("ReviewCallbackBox").finish_non_exhaustive()
250    }
251}
252
253impl ReviewCallbackBox {
254    fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
255        let previous = self.callback.lock().unwrap().replace(Box::new(callback));
256        assert!(previous.is_none(), "review callback set twice");
257    }
258
259    fn invoke(self, review: UploadReview) -> bool {
260        let mut callback = self.callback.lock().unwrap();
261        let Some(callback) = callback.take() else {
262            error!("review callback was either never set or invoked twice");
263            return false;
264        };
265
266        (callback)(review)
267    }
268}
269
270/// An in-progress streaming PutObject request to S3.
271///
272/// You can write to or complete the upload using the [`PutObjectRequest`] implementation on this
273/// object.
274#[derive(Debug)]
275pub struct S3PutObjectRequest {
276    request: S3MetaRequest<(), PutObjectError>,
277    review_callback: ReviewCallbackBox,
278    start_time: Instant,
279    total_bytes: u64,
280    /// Future for the headers of the CompleteMultipartUpload response.
281    /// Guaranteed to be available after the request finishes successfully.
282    response_headers: Receiver<Headers>,
283    state: S3PutObjectRequestState,
284}
285
/// Write-tracking state of an [S3PutObjectRequest].
#[derive(Debug)]
enum S3PutObjectRequestState {
    /// A `write` call is running, or an earlier one failed or was dropped before finishing.
    PendingWrite,
    /// No write is running; the request is ready for another `write` or for completion.
    Idle,
}
294
295/// Internal errors for a [S3PutObjectRequest].
296#[derive(Debug, Error)]
297enum S3PutObjectRequestError {
298    #[error("A previous write operation did not complete successfully")]
299    PreviousWriteFailed,
300    #[error("The CreateMultiPartUpload request did not succeed")]
301    CreateMultipartUploadFailed,
302}
303
304fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
305    Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
306}
307
308fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
309    match result.response_status {
310        400 => {
311            let body = result.error_response_body.as_ref()?;
312            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
313            let error_code = root.get_child("Code")?;
314            let error_str = error_code.get_text()?;
315            match error_str.deref() {
316                "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
317                "BadDigest" => Some(PutObjectError::BadChecksum),
318                "InvalidArgument" => {
319                    parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
320                }
321                "InvalidRequest" => parse_if_error_message_starts_with(
322                    "Checksum Type mismatch",
323                    &root,
324                    PutObjectError::InvalidChecksumType,
325                ),
326                _ => None,
327            }
328        }
329        404 => {
330            let body = result.error_response_body.as_ref()?;
331            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
332            let error_code = root.get_child("Code")?;
333            let error_str = error_code.get_text()?;
334            match error_str.deref() {
335                "NoSuchKey" => Some(PutObjectError::NoSuchKey),
336                _ => None,
337            }
338        }
339        412 => Some(PutObjectError::PreconditionFailed),
340        501 => Some(PutObjectError::NotImplemented),
341        _ => None,
342    }
343}
344
345fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
346    let error_message = element.get_child("Message")?.get_text();
347    if let Some(error_message) = error_message
348        && error_message.starts_with(prefix)
349    {
350        return Some(error);
351    }
352    None
353}
354
355fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
356    fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
357        Ok(PutObjectResult {
358            etag: get_etag(&response_headers)?,
359            sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
360            sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
361        })
362    }
363    extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
364}
365
366/// Creates `on_headers` callback that will send the response headers to the matching `Receiver`.
367fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
368    let (response_headers_sender, response_headers) = oneshot::channel();
369    // The callback signature (`FnMut`) allows for it to be invoked multiple times,
370    // but for PUT requests it will only be called once on CompleteMultipartUpload.
371    // Wrapping the `oneshot::Sender` in an `Option` allows it to be consumed
372    // on the first (and only!) invocation.
373    let mut response_headers_sender = Some(response_headers_sender);
374    let on_headers = move |headers: &Headers, _: i32| {
375        if let Some(sender) = response_headers_sender.take() {
376            let _ = sender.send(headers.clone());
377        }
378    };
379    (on_headers, response_headers)
380}
381
382#[cfg_attr(not(docsrs), async_trait)]
383impl PutObjectRequest for S3PutObjectRequest {
384    type ClientError = S3RequestError;
385
386    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
387        // Writing to the meta request may require multiple calls. Set the internal
388        // state to `PendingWrite` until we are done.
389        match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
390            S3PutObjectRequestState::PendingWrite => {
391                // Fail if a previous write was not completed.
392                return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
393            }
394            S3PutObjectRequestState::Idle => {}
395        }
396
397        let meta_request = &mut self.request.meta_request;
398        let mut slice = slice;
399        while !slice.is_empty() {
400            // Write will fail if the request has already finished (because of an error).
401            let remaining = meta_request
402                .write(slice, false)
403                .await
404                .map_err(S3RequestError::CrtError)?;
405            self.total_bytes += (slice.len() - remaining.len()) as u64;
406            slice = remaining;
407        }
408        // Write completed with no errors, we can reset to `Idle`.
409        self.state = S3PutObjectRequestState::Idle;
410        Ok(())
411    }
412
413    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
414        self.review_and_complete(|_| true).await
415    }
416
417    async fn review_and_complete(
418        mut self,
419        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
420    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
421        if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
422            // Fail if a previous write was not completed.
423            return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
424        }
425
426        self.review_callback.set(review_callback);
427
428        // Write will fail if the request has already finished (because of an error).
429        _ = self
430            .request
431            .meta_request
432            .write(&[], true)
433            .await
434            .map_err(S3RequestError::CrtError)?;
435
436        // Now wait for the request to finish.
437        self.request.await?;
438
439        let elapsed = self.start_time.elapsed();
440        emit_throughput_metric(self.total_bytes, elapsed, "put_object");
441
442        Ok(extract_result(self.response_headers.await.expect(
443            "headers should be available since the request completed successfully",
444        ))?)
445    }
446}
447
448impl S3PutObjectRequest {
449    /// The number of bytes written to this request so far.
450    // TODO: consider exposing on the `PutObjectRequest` trait.
451    pub fn bytes_written(&self) -> u64 {
452        self.total_bytes
453    }
454}