mountpoint_s3_client/s3_crt_client/
put_object.rs

1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::channel::oneshot::{self, Receiver};
8use futures::FutureExt;
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17    ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21    emit_throughput_metric, ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation,
22    S3RequestError,
23};
24
/// Response header carrying the ETag of the uploaded object (read by `get_etag`).
const ETAG_HEADER_NAME: &str = "ETag";
/// Server-side encryption type header: set on PUT requests when configured and read back from responses.
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
/// KMS key ID header for SSE-KMS: set on PUT requests when configured and read back from responses.
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30    pub(super) async fn put_object(
31        &self,
32        bucket: &str,
33        key: &str,
34        params: &PutObjectParams,
35    ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36        let review_callback = ReviewCallbackBox::default();
37        let (on_headers, response_headers) = response_headers_handler();
38        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39        // Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
40        let (mpu_created_sender, mpu_created) = oneshot::channel();
41        let meta_request = {
42            let span = request_span!(self.inner, "put_object", bucket, key);
43            let mut message = self.new_put_request(
44                bucket,
45                key,
46                params.storage_class.as_deref(),
47                params.server_side_encryption.as_deref(),
48                params.ssekms_key_id.as_deref(),
49            )?;
50
51            let checksum_config = match params.trailing_checksums {
52                PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53                PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54                PutObjectTrailingChecksums::Disabled => None,
55            };
56            message.set_checksum_config(checksum_config);
57
58            for (name, value) in &params.object_metadata {
59                message
60                    .set_header(&Header::new(format!("x-amz-meta-{}", name), value))
61                    .map_err(S3RequestError::construction_failure)?
62            }
63            for (name, value) in &params.custom_headers {
64                message
65                    .inner
66                    .add_header(&Header::new(name, value))
67                    .map_err(S3RequestError::construction_failure)?;
68            }
69
70            let callback = review_callback.clone();
71
72            let mut options = message.into_options(S3Operation::PutObject);
73            options.send_using_async_writes(true);
74            options.on_upload_review(move |review| callback.invoke(review));
75            options.part_size(self.inner.write_part_size as u64);
76
77            let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
78            let on_failure_sender = on_mpu_created_sender.clone();
79            self.inner.meta_request_with_callbacks(
80                options,
81                span,
82                move |metrics| {
83                    if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
84                        if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
85                            _ = sender.send(Ok(()));
86                        }
87                    }
88                },
89                on_headers,
90                |_, _| {},
91                |_| None,
92                move |result| {
93                    if let Some(sender) = on_failure_sender.lock().unwrap().take() {
94                        // If the MPU was not created, the request must have failed.
95                        _ = sender.send(result.and_then(|_| {
96                            Err(
97                                S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
98                                    .into(),
99                            )
100                        }));
101                    } else {
102                        _ = tx.send(result);
103                    }
104                },
105            )?
106        };
107
108        // Wait for CreateMultipartUpload to complete, or return error.
109        mpu_created.await.unwrap()?;
110
111        let request = S3MetaRequest {
112            receiver: rx.fuse(),
113            meta_request,
114        };
115        Ok(S3PutObjectRequest {
116            request,
117            review_callback,
118            start_time: Instant::now(),
119            total_bytes: 0,
120            response_headers,
121            state: S3PutObjectRequestState::Idle,
122        })
123    }
124
125    pub(super) async fn put_object_single<'a>(
126        &self,
127        bucket: &str,
128        key: &str,
129        params: &PutObjectSingleParams,
130        contents: impl AsRef<[u8]> + Send + 'a,
131    ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
132        let span = request_span!(self.inner, "put_object_single", bucket, key);
133        let start_time = Instant::now();
134
135        let slice = contents.as_ref();
136        let content_length = slice.len();
137        let request = {
138            let mut message = self.new_put_request(
139                bucket,
140                key,
141                params.storage_class.as_deref(),
142                params.server_side_encryption.as_deref(),
143                params.ssekms_key_id.as_deref(),
144            )?;
145            message
146                .set_content_length_header(content_length)
147                .map_err(S3RequestError::construction_failure)?;
148            if let Some(checksum) = &params.checksum {
149                message
150                    .set_checksum_header(checksum)
151                    .map_err(S3RequestError::construction_failure)?;
152            }
153            if let Some(offset) = params.write_offset_bytes {
154                message
155                    .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
156                    .map_err(S3RequestError::construction_failure)?;
157            }
158            if let Some(etag) = &params.if_match {
159                message
160                    .set_header(&Header::new("If-Match", etag.as_str()))
161                    .map_err(S3RequestError::construction_failure)?;
162            }
163            for (name, value) in &params.object_metadata {
164                message
165                    .set_header(&Header::new(format!("x-amz-meta-{}", name), value))
166                    .map_err(S3RequestError::construction_failure)?;
167            }
168            for (name, value) in &params.custom_headers {
169                message
170                    .inner
171                    .add_header(&Header::new(name, value))
172                    .map_err(S3RequestError::construction_failure)?;
173            }
174
175            let body_input_stream =
176                InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
177            message.set_body_stream(Some(body_input_stream));
178
179            self.inner.meta_request_with_headers_payload(
180                message.into_options(S3Operation::PutObjectSingle),
181                span,
182                parse_put_object_single_error,
183            )?
184        };
185
186        let headers = request.await?;
187
188        let elapsed = start_time.elapsed();
189        emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
190
191        Ok(extract_result(headers)?)
192    }
193
194    fn new_put_request(
195        &self,
196        bucket: &str,
197        key: &str,
198        storage_class: Option<&str>,
199        server_side_encryption: Option<&str>,
200        ssekms_key_id: Option<&str>,
201    ) -> Result<S3Message<'_>, S3RequestError> {
202        let mut message = self
203            .inner
204            .new_request_template("PUT", bucket)
205            .map_err(S3RequestError::construction_failure)?;
206
207        let key = format!("/{key}");
208        message
209            .set_request_path(&key)
210            .map_err(S3RequestError::construction_failure)?;
211
212        if let Some(storage_class) = storage_class {
213            message
214                .set_header(&Header::new("x-amz-storage-class", storage_class))
215                .map_err(S3RequestError::construction_failure)?;
216        }
217
218        if let Some(sse) = server_side_encryption {
219            message
220                .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
221                .map_err(S3RequestError::construction_failure)?;
222        }
223        if let Some(key_id) = ssekms_key_id {
224            message
225                .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
226                .map_err(S3RequestError::construction_failure)?;
227        }
228
229        Ok(message)
230    }
231}
232
/// User-supplied callback invoked with the CRT's [`UploadReview`].
///
/// Its `bool` result is returned from the `on_upload_review` hook registered in `put_object`.
/// `complete` uses a callback that always returns `true`.
type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
234
235/// Holder for the upload review callback.
236/// Used to set the callback when initiating the PutObject request on the CRT client,
237/// but redirects to the actual callback the user can specify at completion time.
238#[derive(Clone, Default)]
239struct ReviewCallbackBox {
240    callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
241}
242
243impl std::fmt::Debug for ReviewCallbackBox {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        f.debug_struct("ReviewCallbackBox").finish()
246    }
247}
248
249impl ReviewCallbackBox {
250    fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
251        let previous = self.callback.lock().unwrap().replace(Box::new(callback));
252        assert!(previous.is_none(), "review callback set twice");
253    }
254
255    fn invoke(self, review: UploadReview) -> bool {
256        let mut callback = self.callback.lock().unwrap();
257        let Some(callback) = callback.take() else {
258            error!("review callback was either never set or invoked twice");
259            return false;
260        };
261
262        (callback)(review)
263    }
264}
265
/// An in-progress streaming PutObject request to S3.
///
/// You can write to or complete the upload using the [`PutObjectRequest`] implementation on this
/// object.
#[derive(Debug)]
pub struct S3PutObjectRequest {
    /// The CRT meta request, together with the receiver for its final result.
    request: S3MetaRequest<(), PutObjectError>,
    /// Shared slot whose contents are forwarded to by the CRT upload-review hook; filled in
    /// `review_and_complete`.
    review_callback: ReviewCallbackBox,
    /// Set once CreateMultipartUpload has succeeded; used for the `put_object` throughput metric.
    start_time: Instant,
    /// Total bytes accepted by the meta request across all `write` calls.
    total_bytes: u64,
    /// Future for the headers of the CompleteMultipartUpload response.
    /// Guaranteed to be available after the request finishes successfully.
    response_headers: Receiver<Headers>,
    /// Tracks whether a `write` is in progress or was interrupted.
    state: S3PutObjectRequestState,
}
281
/// Internal state for a [S3PutObjectRequest].
///
/// Lets `write` and `review_and_complete` detect an earlier `write` that failed or was cancelled
/// part-way. Such a request can no longer be written to or completed.
#[derive(Debug)]
enum S3PutObjectRequestState {
    /// A write operation is in progress or was interrupted before completion.
    PendingWrite,
    /// Idle state between write calls.
    Idle,
}
290
/// Internal errors for a [S3PutObjectRequest].
///
/// These are wrapped with `S3RequestError::internal_failure` before being returned to callers.
#[derive(Debug, Error)]
enum S3PutObjectRequestError {
    /// Returned by `write` / `review_and_complete` when an earlier `write` did not finish.
    #[error("A previous write operation did not complete successfully")]
    PreviousWriteFailed,
    /// Returned by `put_object` when the meta request ends before CreateMultipartUpload succeeded.
    #[error("The CreateMultiPartUpload request did not succeed")]
    CreateMultipartUploadFailed,
}
299
300fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
301    Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
302}
303
304fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
305    match result.response_status {
306        400 => {
307            let body = result.error_response_body.as_ref()?;
308            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
309            let error_code = root.get_child("Code")?;
310            let error_str = error_code.get_text()?;
311            match error_str.deref() {
312                "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
313                "BadDigest" => Some(PutObjectError::BadChecksum),
314                "InvalidArgument" => {
315                    parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
316                }
317                "InvalidRequest" => parse_if_error_message_starts_with(
318                    "Checksum Type mismatch",
319                    &root,
320                    PutObjectError::InvalidChecksumType,
321                ),
322                _ => None,
323            }
324        }
325        404 => {
326            let body = result.error_response_body.as_ref()?;
327            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
328            let error_code = root.get_child("Code")?;
329            let error_str = error_code.get_text()?;
330            match error_str.deref() {
331                "NoSuchKey" => Some(PutObjectError::NoSuchKey),
332                _ => None,
333            }
334        }
335        412 => Some(PutObjectError::PreconditionFailed),
336        501 => Some(PutObjectError::NotImplemented),
337        _ => None,
338    }
339}
340
341fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
342    let error_message = element.get_child("Message")?.get_text();
343    if let Some(error_message) = error_message {
344        if error_message.starts_with(prefix) {
345            return Some(error);
346        }
347    }
348    None
349}
350
351fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
352    fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
353        Ok(PutObjectResult {
354            etag: get_etag(&response_headers)?,
355            sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
356            sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
357        })
358    }
359    extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
360}
361
362/// Creates `on_headers` callback that will send the response headers to the matching `Receiver`.
363fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
364    let (response_headers_sender, response_headers) = oneshot::channel();
365    // The callback signature (`FnMut`) allows for it to be invoked multiple times,
366    // but for PUT requests it will only be called once on CompleteMultipartUpload.
367    // Wrapping the `oneshot::Sender` in an `Option` allows it to be consumed
368    // on the first (and only!) invocation.
369    let mut response_headers_sender = Some(response_headers_sender);
370    let on_headers = move |headers: &Headers, _: i32| {
371        if let Some(sender) = response_headers_sender.take() {
372            let _ = sender.send(headers.clone());
373        }
374    };
375    (on_headers, response_headers)
376}
377
378#[cfg_attr(not(docsrs), async_trait)]
379impl PutObjectRequest for S3PutObjectRequest {
380    type ClientError = S3RequestError;
381
382    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
383        // Writing to the meta request may require multiple calls. Set the internal
384        // state to `PendingWrite` until we are done.
385        match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
386            S3PutObjectRequestState::PendingWrite => {
387                // Fail if a previous write was not completed.
388                return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
389            }
390            S3PutObjectRequestState::Idle => {}
391        }
392
393        let meta_request = &mut self.request.meta_request;
394        let mut slice = slice;
395        while !slice.is_empty() {
396            // Write will fail if the request has already finished (because of an error).
397            let remaining = meta_request
398                .write(slice, false)
399                .await
400                .map_err(S3RequestError::CrtError)?;
401            self.total_bytes += (slice.len() - remaining.len()) as u64;
402            slice = remaining;
403        }
404        // Write completed with no errors, we can reset to `Idle`.
405        self.state = S3PutObjectRequestState::Idle;
406        Ok(())
407    }
408
409    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
410        self.review_and_complete(|_| true).await
411    }
412
413    async fn review_and_complete(
414        mut self,
415        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
416    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
417        if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
418            // Fail if a previous write was not completed.
419            return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
420        }
421
422        self.review_callback.set(review_callback);
423
424        // Write will fail if the request has already finished (because of an error).
425        _ = self
426            .request
427            .meta_request
428            .write(&[], true)
429            .await
430            .map_err(S3RequestError::CrtError)?;
431
432        // Now wait for the request to finish.
433        self.request.await?;
434
435        let elapsed = self.start_time.elapsed();
436        emit_throughput_metric(self.total_bytes, elapsed, "put_object");
437
438        Ok(extract_result(self.response_headers.await.expect(
439            "headers should be available since the request completed successfully",
440        ))?)
441    }
442}
443
444impl S3PutObjectRequest {
445    /// The number of bytes written to this request so far.
446    // TODO: consider exposing on the `PutObjectRequest` trait.
447    pub fn bytes_written(&self) -> u64 {
448        self.total_bytes
449    }
450}