1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::channel::oneshot::{self, Receiver};
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17 ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21 ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation, S3RequestError,
22 emit_throughput_metric,
23};
24
/// Name of the response header read by `get_etag` to obtain the object's ETag.
const ETAG_HEADER_NAME: &str = "ETag";
/// Header carrying the server-side encryption type; set on PUT requests and read back from responses.
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
/// Header carrying the KMS key id for server-side encryption; set on PUT requests and read back from responses.
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30 pub(super) async fn put_object(
31 &self,
32 bucket: &str,
33 key: &str,
34 params: &PutObjectParams,
35 ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36 let review_callback = ReviewCallbackBox::default();
37 let (on_headers, response_headers) = response_headers_handler();
38 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39 let (mpu_created_sender, mpu_created) = oneshot::channel();
41 let meta_request = {
42 let span = request_span!(self.inner, "put_object", bucket, key);
43 let mut message = self.new_put_request(
44 bucket,
45 key,
46 params.storage_class.as_deref(),
47 params.server_side_encryption.as_deref(),
48 params.ssekms_key_id.as_deref(),
49 )?;
50
51 let checksum_config = match params.trailing_checksums {
52 PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53 PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54 PutObjectTrailingChecksums::Disabled => None,
55 };
56 message.set_checksum_config(checksum_config);
57
58 for (name, value) in ¶ms.object_metadata {
59 message
60 .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
61 .map_err(S3RequestError::construction_failure)?
62 }
63 for (name, value) in ¶ms.custom_headers {
64 message
65 .inner
66 .add_header(&Header::new(name, value))
67 .map_err(S3RequestError::construction_failure)?;
68 }
69
70 let callback = review_callback.clone();
71
72 let mut options = message.into_options(S3Operation::PutObject);
73 options.send_using_async_writes(true);
74 options.on_upload_review(move |review| callback.invoke(review));
75 options.part_size(self.inner.write_part_size as u64);
76 if let Some(id) = params.custom_id {
77 options.custom_id(id);
78 }
79
80 let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
81 let on_failure_sender = on_mpu_created_sender.clone();
82 self.inner.meta_request_with_callbacks(
83 options,
84 span,
85 move |metrics| {
86 if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
87 if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
89 _ = sender.send(Ok(()));
90 }
91 }
92 },
93 on_headers,
94 |_, _| {},
95 |_| None,
96 move |result| {
97 if let Some(sender) = on_failure_sender.lock().unwrap().take() {
98 _ = sender.send(result.and_then(|_| {
100 Err(
101 S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
102 .into(),
103 )
104 }));
105 } else {
106 _ = tx.send(result);
107 }
108 },
109 )?
110 };
111
112 mpu_created.await.unwrap()?;
114
115 let request = S3MetaRequest {
116 receiver: rx.fuse(),
117 meta_request,
118 };
119 Ok(S3PutObjectRequest {
120 request,
121 review_callback,
122 start_time: Instant::now(),
123 total_bytes: 0,
124 response_headers,
125 state: S3PutObjectRequestState::Idle,
126 })
127 }
128
129 pub(super) async fn put_object_single<'a>(
130 &self,
131 bucket: &str,
132 key: &str,
133 params: &PutObjectSingleParams,
134 contents: impl AsRef<[u8]> + Send + 'a,
135 ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
136 let span = request_span!(self.inner, "put_object_single", bucket, key);
137 let start_time = Instant::now();
138
139 let slice = contents.as_ref();
140 let content_length = slice.len();
141 let request = {
142 let mut message = self.new_put_request(
143 bucket,
144 key,
145 params.storage_class.as_deref(),
146 params.server_side_encryption.as_deref(),
147 params.ssekms_key_id.as_deref(),
148 )?;
149 message
150 .set_content_length_header(content_length)
151 .map_err(S3RequestError::construction_failure)?;
152 if let Some(checksum) = ¶ms.checksum {
153 message
154 .set_checksum_header(checksum)
155 .map_err(S3RequestError::construction_failure)?;
156 }
157 if let Some(offset) = params.write_offset_bytes {
158 message
159 .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
160 .map_err(S3RequestError::construction_failure)?;
161 }
162 if let Some(etag) = ¶ms.if_match {
163 message
164 .set_header(&Header::new("If-Match", etag.as_str()))
165 .map_err(S3RequestError::construction_failure)?;
166 }
167 for (name, value) in ¶ms.object_metadata {
168 message
169 .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
170 .map_err(S3RequestError::construction_failure)?;
171 }
172 for (name, value) in ¶ms.custom_headers {
173 message
174 .inner
175 .add_header(&Header::new(name, value))
176 .map_err(S3RequestError::construction_failure)?;
177 }
178
179 let body_input_stream =
180 InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
181 message.set_body_stream(Some(body_input_stream));
182
183 self.inner.meta_request_with_headers_payload(
184 message.into_options(S3Operation::PutObjectSingle),
185 span,
186 parse_put_object_single_error,
187 )?
188 };
189
190 let headers = request.await?;
191
192 let elapsed = start_time.elapsed();
193 emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
194
195 Ok(extract_result(headers)?)
196 }
197
198 fn new_put_request(
199 &self,
200 bucket: &str,
201 key: &str,
202 storage_class: Option<&str>,
203 server_side_encryption: Option<&str>,
204 ssekms_key_id: Option<&str>,
205 ) -> Result<S3Message<'_>, S3RequestError> {
206 let mut message = self
207 .inner
208 .new_request_template("PUT", bucket)
209 .map_err(S3RequestError::construction_failure)?;
210
211 let key = format!("/{key}");
212 message
213 .set_request_path(&key)
214 .map_err(S3RequestError::construction_failure)?;
215
216 if let Some(storage_class) = storage_class {
217 message
218 .set_header(&Header::new("x-amz-storage-class", storage_class))
219 .map_err(S3RequestError::construction_failure)?;
220 }
221
222 if let Some(sse) = server_side_encryption {
223 message
224 .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
225 .map_err(S3RequestError::construction_failure)?;
226 }
227 if let Some(key_id) = ssekms_key_id {
228 message
229 .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
230 .map_err(S3RequestError::construction_failure)?;
231 }
232
233 Ok(message)
234 }
235}
236
/// One-shot callback that receives an [`UploadReview`] and returns a `bool` verdict.
type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
238
/// Shared slot holding at most one [`ReviewCallback`].
///
/// Clones share the same underlying slot (the `Arc` is cloned), so a callback stored through
/// one handle can be invoked through another.
#[derive(Clone, Default)]
struct ReviewCallbackBox {
    /// `None` until a callback is stored with `set`, and `None` again once `invoke` took it.
    callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
}
246
247impl std::fmt::Debug for ReviewCallbackBox {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 f.debug_struct("ReviewCallbackBox").finish_non_exhaustive()
250 }
251}
252
253impl ReviewCallbackBox {
254 fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
255 let previous = self.callback.lock().unwrap().replace(Box::new(callback));
256 assert!(previous.is_none(), "review callback set twice");
257 }
258
259 fn invoke(self, review: UploadReview) -> bool {
260 let mut callback = self.callback.lock().unwrap();
261 let Some(callback) = callback.take() else {
262 error!("review callback was either never set or invoked twice");
263 return false;
264 };
265
266 (callback)(review)
267 }
268}
269
/// Handle to a streaming PutObject request created by `S3CrtClient::put_object`.
///
/// Data is written and the upload is completed through the `PutObjectRequest`
/// implementation on this type.
#[derive(Debug)]
pub struct S3PutObjectRequest {
    /// The underlying meta-request together with the receiver for its final result.
    request: S3MetaRequest<(), PutObjectError>,
    /// Slot shared with the meta-request's upload review hook; filled in `review_and_complete`.
    review_callback: ReviewCallbackBox,
    /// Captured when this handle is constructed; used to compute the throughput metric.
    start_time: Instant,
    /// Number of bytes accepted by the meta-request through `write` so far.
    total_bytes: u64,
    /// Receives the headers passed to the first invocation of the headers callback.
    response_headers: Receiver<Headers>,
    /// Tracks whether a `write` call is in progress (or was left incomplete).
    state: S3PutObjectRequestState,
}
285
/// Internal write state of an [`S3PutObjectRequest`].
#[derive(Debug)]
enum S3PutObjectRequestState {
    /// Set at the start of `write`; still present afterwards only if that call did not
    /// run to successful completion.
    PendingWrite,
    /// No write is in progress; the request accepts `write` or completion.
    Idle,
}
294
/// Internal failures reported by this module, wrapped via `S3RequestError::internal_failure`.
#[derive(Debug, Error)]
enum S3PutObjectRequestError {
    /// Returned by `write`/`review_and_complete` when the request is still in the
    /// `PendingWrite` state left by an earlier `write` call.
    #[error("A previous write operation did not complete successfully")]
    PreviousWriteFailed,
    /// Returned by `put_object` when the meta-request finished without error but no
    /// successful CreateMultipartUpload had been reported.
    #[error("The CreateMultiPartUpload request did not succeed")]
    CreateMultipartUploadFailed,
}
303
304fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
305 Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
306}
307
308fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
309 match result.response_status {
310 400 => {
311 let body = result.error_response_body.as_ref()?;
312 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
313 let error_code = root.get_child("Code")?;
314 let error_str = error_code.get_text()?;
315 match error_str.deref() {
316 "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
317 "BadDigest" => Some(PutObjectError::BadChecksum),
318 "InvalidArgument" => {
319 parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
320 }
321 "InvalidRequest" => parse_if_error_message_starts_with(
322 "Checksum Type mismatch",
323 &root,
324 PutObjectError::InvalidChecksumType,
325 ),
326 _ => None,
327 }
328 }
329 404 => {
330 let body = result.error_response_body.as_ref()?;
331 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
332 let error_code = root.get_child("Code")?;
333 let error_str = error_code.get_text()?;
334 match error_str.deref() {
335 "NoSuchKey" => Some(PutObjectError::NoSuchKey),
336 _ => None,
337 }
338 }
339 412 => Some(PutObjectError::PreconditionFailed),
340 501 => Some(PutObjectError::NotImplemented),
341 _ => None,
342 }
343}
344
345fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
346 let error_message = element.get_child("Message")?.get_text();
347 if let Some(error_message) = error_message
348 && error_message.starts_with(prefix)
349 {
350 return Some(error);
351 }
352 None
353}
354
355fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
356 fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
357 Ok(PutObjectResult {
358 etag: get_etag(&response_headers)?,
359 sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
360 sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
361 })
362 }
363 extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
364}
365
366fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
368 let (response_headers_sender, response_headers) = oneshot::channel();
369 let mut response_headers_sender = Some(response_headers_sender);
374 let on_headers = move |headers: &Headers, _: i32| {
375 if let Some(sender) = response_headers_sender.take() {
376 let _ = sender.send(headers.clone());
377 }
378 };
379 (on_headers, response_headers)
380}
381
382#[cfg_attr(not(docsrs), async_trait)]
383impl PutObjectRequest for S3PutObjectRequest {
384 type ClientError = S3RequestError;
385
386 async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
387 match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
390 S3PutObjectRequestState::PendingWrite => {
391 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
393 }
394 S3PutObjectRequestState::Idle => {}
395 }
396
397 let meta_request = &mut self.request.meta_request;
398 let mut slice = slice;
399 while !slice.is_empty() {
400 let remaining = meta_request
402 .write(slice, false)
403 .await
404 .map_err(S3RequestError::CrtError)?;
405 self.total_bytes += (slice.len() - remaining.len()) as u64;
406 slice = remaining;
407 }
408 self.state = S3PutObjectRequestState::Idle;
410 Ok(())
411 }
412
413 async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
414 self.review_and_complete(|_| true).await
415 }
416
417 async fn review_and_complete(
418 mut self,
419 review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
420 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
421 if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
422 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
424 }
425
426 self.review_callback.set(review_callback);
427
428 _ = self
430 .request
431 .meta_request
432 .write(&[], true)
433 .await
434 .map_err(S3RequestError::CrtError)?;
435
436 self.request.await?;
438
439 let elapsed = self.start_time.elapsed();
440 emit_throughput_metric(self.total_bytes, elapsed, "put_object");
441
442 Ok(extract_result(self.response_headers.await.expect(
443 "headers should be available since the request completed successfully",
444 ))?)
445 }
446}
447
impl S3PutObjectRequest {
    /// Returns the number of bytes accepted by the meta-request through `write` so far.
    pub fn bytes_written(&self) -> u64 {
        self.total_bytes
    }
}