1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::channel::oneshot::{self, Receiver};
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17 ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21 ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation, S3RequestError,
22 emit_throughput_metric,
23};
24
/// Name of the response header from which the object's entity tag is read.
const ETAG_HEADER_NAME: &str = "ETag";
/// Header naming the server-side encryption type; set on PUT requests and read back from responses.
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
/// Header carrying the KMS key ID used for server-side encryption; set on PUT requests and read
/// back from responses.
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30 pub(super) async fn put_object(
31 &self,
32 bucket: &str,
33 key: &str,
34 params: &PutObjectParams,
35 ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36 let review_callback = ReviewCallbackBox::default();
37 let (on_headers, response_headers) = response_headers_handler();
38 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39 let (mpu_created_sender, mpu_created) = oneshot::channel();
41 let meta_request = {
42 let span = request_span!(self.inner, "put_object", bucket, key);
43 let mut message = self.new_put_request(
44 bucket,
45 key,
46 params.storage_class.as_deref(),
47 params.server_side_encryption.as_deref(),
48 params.ssekms_key_id.as_deref(),
49 )?;
50
51 let checksum_config = match params.trailing_checksums {
52 PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53 PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54 PutObjectTrailingChecksums::Disabled => None,
55 };
56 message.set_checksum_config(checksum_config);
57
58 for (name, value) in ¶ms.object_metadata {
59 message
60 .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
61 .map_err(S3RequestError::construction_failure)?
62 }
63 for (name, value) in ¶ms.custom_headers {
64 message
65 .inner
66 .add_header(&Header::new(name, value))
67 .map_err(S3RequestError::construction_failure)?;
68 }
69
70 let callback = review_callback.clone();
71
72 let mut options = message.into_options(S3Operation::PutObject);
73 options.send_using_async_writes(true);
74 options.on_upload_review(move |review| callback.invoke(review));
75 options.part_size(self.inner.write_part_size as u64);
76
77 let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
78 let on_failure_sender = on_mpu_created_sender.clone();
79 self.inner.meta_request_with_callbacks(
80 options,
81 span,
82 move |metrics| {
83 if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
84 if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
86 _ = sender.send(Ok(()));
87 }
88 }
89 },
90 on_headers,
91 |_, _| {},
92 |_| None,
93 move |result| {
94 if let Some(sender) = on_failure_sender.lock().unwrap().take() {
95 _ = sender.send(result.and_then(|_| {
97 Err(
98 S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
99 .into(),
100 )
101 }));
102 } else {
103 _ = tx.send(result);
104 }
105 },
106 )?
107 };
108
109 mpu_created.await.unwrap()?;
111
112 let request = S3MetaRequest {
113 receiver: rx.fuse(),
114 meta_request,
115 };
116 Ok(S3PutObjectRequest {
117 request,
118 review_callback,
119 start_time: Instant::now(),
120 total_bytes: 0,
121 response_headers,
122 state: S3PutObjectRequestState::Idle,
123 })
124 }
125
126 pub(super) async fn put_object_single<'a>(
127 &self,
128 bucket: &str,
129 key: &str,
130 params: &PutObjectSingleParams,
131 contents: impl AsRef<[u8]> + Send + 'a,
132 ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
133 let span = request_span!(self.inner, "put_object_single", bucket, key);
134 let start_time = Instant::now();
135
136 let slice = contents.as_ref();
137 let content_length = slice.len();
138 let request = {
139 let mut message = self.new_put_request(
140 bucket,
141 key,
142 params.storage_class.as_deref(),
143 params.server_side_encryption.as_deref(),
144 params.ssekms_key_id.as_deref(),
145 )?;
146 message
147 .set_content_length_header(content_length)
148 .map_err(S3RequestError::construction_failure)?;
149 if let Some(checksum) = ¶ms.checksum {
150 message
151 .set_checksum_header(checksum)
152 .map_err(S3RequestError::construction_failure)?;
153 }
154 if let Some(offset) = params.write_offset_bytes {
155 message
156 .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
157 .map_err(S3RequestError::construction_failure)?;
158 }
159 if let Some(etag) = ¶ms.if_match {
160 message
161 .set_header(&Header::new("If-Match", etag.as_str()))
162 .map_err(S3RequestError::construction_failure)?;
163 }
164 for (name, value) in ¶ms.object_metadata {
165 message
166 .set_header(&Header::new(format!("x-amz-meta-{name}"), value))
167 .map_err(S3RequestError::construction_failure)?;
168 }
169 for (name, value) in ¶ms.custom_headers {
170 message
171 .inner
172 .add_header(&Header::new(name, value))
173 .map_err(S3RequestError::construction_failure)?;
174 }
175
176 let body_input_stream =
177 InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
178 message.set_body_stream(Some(body_input_stream));
179
180 self.inner.meta_request_with_headers_payload(
181 message.into_options(S3Operation::PutObjectSingle),
182 span,
183 parse_put_object_single_error,
184 )?
185 };
186
187 let headers = request.await?;
188
189 let elapsed = start_time.elapsed();
190 emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
191
192 Ok(extract_result(headers)?)
193 }
194
195 fn new_put_request(
196 &self,
197 bucket: &str,
198 key: &str,
199 storage_class: Option<&str>,
200 server_side_encryption: Option<&str>,
201 ssekms_key_id: Option<&str>,
202 ) -> Result<S3Message<'_>, S3RequestError> {
203 let mut message = self
204 .inner
205 .new_request_template("PUT", bucket)
206 .map_err(S3RequestError::construction_failure)?;
207
208 let key = format!("/{key}");
209 message
210 .set_request_path(&key)
211 .map_err(S3RequestError::construction_failure)?;
212
213 if let Some(storage_class) = storage_class {
214 message
215 .set_header(&Header::new("x-amz-storage-class", storage_class))
216 .map_err(S3RequestError::construction_failure)?;
217 }
218
219 if let Some(sse) = server_side_encryption {
220 message
221 .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
222 .map_err(S3RequestError::construction_failure)?;
223 }
224 if let Some(key_id) = ssekms_key_id {
225 message
226 .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
227 .map_err(S3RequestError::construction_failure)?;
228 }
229
230 Ok(message)
231 }
232}
233
234type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
235
236#[derive(Clone, Default)]
240struct ReviewCallbackBox {
241 callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
242}
243
244impl std::fmt::Debug for ReviewCallbackBox {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 f.debug_struct("ReviewCallbackBox").finish_non_exhaustive()
247 }
248}
249
250impl ReviewCallbackBox {
251 fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
252 let previous = self.callback.lock().unwrap().replace(Box::new(callback));
253 assert!(previous.is_none(), "review callback set twice");
254 }
255
256 fn invoke(self, review: UploadReview) -> bool {
257 let mut callback = self.callback.lock().unwrap();
258 let Some(callback) = callback.take() else {
259 error!("review callback was either never set or invoked twice");
260 return false;
261 };
262
263 (callback)(review)
264 }
265}
266
/// Handle to an in-progress streaming PutObject request.
///
/// Data is written and the upload is completed through the [`PutObjectRequest`] implementation
/// on this type.
#[derive(Debug)]
pub struct S3PutObjectRequest {
    /// The underlying meta request together with the receiver for its final result.
    request: S3MetaRequest<(), PutObjectError>,
    /// Shared slot the review callback is stored in at completion time.
    review_callback: ReviewCallbackBox,
    /// When this handle was created; used to compute the throughput metric on completion.
    start_time: Instant,
    /// Number of bytes accepted by `write` calls so far.
    total_bytes: u64,
    /// Receives a copy of the response headers the first time the headers callback fires.
    response_headers: Receiver<Headers>,
    /// Tracks whether a previous `write` call ran to completion.
    state: S3PutObjectRequestState,
}
282
/// Internal write state of an [`S3PutObjectRequest`].
#[derive(Debug)]
enum S3PutObjectRequestState {
    /// A `write` call has started and has not (yet) finished successfully. Seeing this state at
    /// the start of another operation means the previous write failed or was abandoned.
    PendingWrite,
    /// No write is in flight; the request can accept another write or be completed.
    Idle,
}
291
/// Internal failures specific to streaming PutObject requests; wrapped into an
/// [`S3RequestError`] via `S3RequestError::internal_failure` before being returned.
#[derive(Debug, Error)]
enum S3PutObjectRequestError {
    /// An operation was attempted while the request was still marked as having a pending write.
    #[error("A previous write operation did not complete successfully")]
    PreviousWriteFailed,
    /// The meta request finished without error before a successful CreateMultipartUpload was
    /// observed.
    #[error("The CreateMultiPartUpload request did not succeed")]
    CreateMultipartUploadFailed,
}
300
301fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
302 Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
303}
304
305fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
306 match result.response_status {
307 400 => {
308 let body = result.error_response_body.as_ref()?;
309 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
310 let error_code = root.get_child("Code")?;
311 let error_str = error_code.get_text()?;
312 match error_str.deref() {
313 "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
314 "BadDigest" => Some(PutObjectError::BadChecksum),
315 "InvalidArgument" => {
316 parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
317 }
318 "InvalidRequest" => parse_if_error_message_starts_with(
319 "Checksum Type mismatch",
320 &root,
321 PutObjectError::InvalidChecksumType,
322 ),
323 _ => None,
324 }
325 }
326 404 => {
327 let body = result.error_response_body.as_ref()?;
328 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
329 let error_code = root.get_child("Code")?;
330 let error_str = error_code.get_text()?;
331 match error_str.deref() {
332 "NoSuchKey" => Some(PutObjectError::NoSuchKey),
333 _ => None,
334 }
335 }
336 412 => Some(PutObjectError::PreconditionFailed),
337 501 => Some(PutObjectError::NotImplemented),
338 _ => None,
339 }
340}
341
342fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
343 let error_message = element.get_child("Message")?.get_text();
344 if let Some(error_message) = error_message
345 && error_message.starts_with(prefix)
346 {
347 return Some(error);
348 }
349 None
350}
351
352fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
353 fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
354 Ok(PutObjectResult {
355 etag: get_etag(&response_headers)?,
356 sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
357 sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
358 })
359 }
360 extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
361}
362
363fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
365 let (response_headers_sender, response_headers) = oneshot::channel();
366 let mut response_headers_sender = Some(response_headers_sender);
371 let on_headers = move |headers: &Headers, _: i32| {
372 if let Some(sender) = response_headers_sender.take() {
373 let _ = sender.send(headers.clone());
374 }
375 };
376 (on_headers, response_headers)
377}
378
379#[cfg_attr(not(docsrs), async_trait)]
380impl PutObjectRequest for S3PutObjectRequest {
381 type ClientError = S3RequestError;
382
383 async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
384 match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
387 S3PutObjectRequestState::PendingWrite => {
388 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
390 }
391 S3PutObjectRequestState::Idle => {}
392 }
393
394 let meta_request = &mut self.request.meta_request;
395 let mut slice = slice;
396 while !slice.is_empty() {
397 let remaining = meta_request
399 .write(slice, false)
400 .await
401 .map_err(S3RequestError::CrtError)?;
402 self.total_bytes += (slice.len() - remaining.len()) as u64;
403 slice = remaining;
404 }
405 self.state = S3PutObjectRequestState::Idle;
407 Ok(())
408 }
409
410 async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
411 self.review_and_complete(|_| true).await
412 }
413
414 async fn review_and_complete(
415 mut self,
416 review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
417 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
418 if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
419 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
421 }
422
423 self.review_callback.set(review_callback);
424
425 _ = self
427 .request
428 .meta_request
429 .write(&[], true)
430 .await
431 .map_err(S3RequestError::CrtError)?;
432
433 self.request.await?;
435
436 let elapsed = self.start_time.elapsed();
437 emit_throughput_metric(self.total_bytes, elapsed, "put_object");
438
439 Ok(extract_result(self.response_headers.await.expect(
440 "headers should be available since the request completed successfully",
441 ))?)
442 }
443}
444
impl S3PutObjectRequest {
    /// Returns the number of bytes accepted by `write` calls on this request so far.
    pub fn bytes_written(&self) -> u64 {
        self.total_bytes
    }
}