1use std::ops::Deref as _;
2use std::os::unix::ffi::OsStrExt as _;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::channel::oneshot::{self, Receiver};
8use futures::FutureExt;
9use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
10use mountpoint_s3_crt::io::stream::InputStream;
11use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
12use thiserror::Error;
13use tracing::error;
14use xmltree::Element;
15
16use crate::object_client::{
17 ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
18};
19
20use super::{
21 emit_throughput_metric, ETag, PutObjectTrailingChecksums, S3CrtClient, S3Message, S3MetaRequest, S3Operation,
22 S3RequestError,
23};
24
25const ETAG_HEADER_NAME: &str = "ETag";
26const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
27const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";
28
29impl S3CrtClient {
30 pub(super) async fn put_object(
31 &self,
32 bucket: &str,
33 key: &str,
34 params: &PutObjectParams,
35 ) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
36 let review_callback = ReviewCallbackBox::default();
37 let (on_headers, response_headers) = response_headers_handler();
38 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), PutObjectError, S3RequestError>>();
39 let (mpu_created_sender, mpu_created) = oneshot::channel();
41 let meta_request = {
42 let span = request_span!(self.inner, "put_object", bucket, key);
43 let mut message = self.new_put_request(
44 bucket,
45 key,
46 params.storage_class.as_deref(),
47 params.server_side_encryption.as_deref(),
48 params.ssekms_key_id.as_deref(),
49 )?;
50
51 let checksum_config = match params.trailing_checksums {
52 PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
53 PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
54 PutObjectTrailingChecksums::Disabled => None,
55 };
56 message.set_checksum_config(checksum_config);
57
58 for (name, value) in ¶ms.object_metadata {
59 message
60 .set_header(&Header::new(format!("x-amz-meta-{}", name), value))
61 .map_err(S3RequestError::construction_failure)?
62 }
63 for (name, value) in ¶ms.custom_headers {
64 message
65 .inner
66 .add_header(&Header::new(name, value))
67 .map_err(S3RequestError::construction_failure)?;
68 }
69
70 let callback = review_callback.clone();
71
72 let mut options = message.into_options(S3Operation::PutObject);
73 options.send_using_async_writes(true);
74 options.on_upload_review(move |review| callback.invoke(review));
75 options.part_size(self.inner.write_part_size as u64);
76
77 let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
78 let on_failure_sender = on_mpu_created_sender.clone();
79 self.inner.meta_request_with_callbacks(
80 options,
81 span,
82 move |metrics| {
83 if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
84 if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
85 _ = sender.send(Ok(()));
86 }
87 }
88 },
89 on_headers,
90 |_, _| {},
91 |_| None,
92 move |result| {
93 if let Some(sender) = on_failure_sender.lock().unwrap().take() {
94 _ = sender.send(result.and_then(|_| {
96 Err(
97 S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed)
98 .into(),
99 )
100 }));
101 } else {
102 _ = tx.send(result);
103 }
104 },
105 )?
106 };
107
108 mpu_created.await.unwrap()?;
110
111 let request = S3MetaRequest {
112 receiver: rx.fuse(),
113 meta_request,
114 };
115 Ok(S3PutObjectRequest {
116 request,
117 review_callback,
118 start_time: Instant::now(),
119 total_bytes: 0,
120 response_headers,
121 state: S3PutObjectRequestState::Idle,
122 })
123 }
124
125 pub(super) async fn put_object_single<'a>(
126 &self,
127 bucket: &str,
128 key: &str,
129 params: &PutObjectSingleParams,
130 contents: impl AsRef<[u8]> + Send + 'a,
131 ) -> ObjectClientResult<PutObjectResult, PutObjectError, S3RequestError> {
132 let span = request_span!(self.inner, "put_object_single", bucket, key);
133 let start_time = Instant::now();
134
135 let slice = contents.as_ref();
136 let content_length = slice.len();
137 let request = {
138 let mut message = self.new_put_request(
139 bucket,
140 key,
141 params.storage_class.as_deref(),
142 params.server_side_encryption.as_deref(),
143 params.ssekms_key_id.as_deref(),
144 )?;
145 message
146 .set_content_length_header(content_length)
147 .map_err(S3RequestError::construction_failure)?;
148 if let Some(checksum) = ¶ms.checksum {
149 message
150 .set_checksum_header(checksum)
151 .map_err(S3RequestError::construction_failure)?;
152 }
153 if let Some(offset) = params.write_offset_bytes {
154 message
155 .set_header(&Header::new("x-amz-write-offset-bytes", offset.to_string()))
156 .map_err(S3RequestError::construction_failure)?;
157 }
158 if let Some(etag) = ¶ms.if_match {
159 message
160 .set_header(&Header::new("If-Match", etag.as_str()))
161 .map_err(S3RequestError::construction_failure)?;
162 }
163 for (name, value) in ¶ms.object_metadata {
164 message
165 .set_header(&Header::new(format!("x-amz-meta-{}", name), value))
166 .map_err(S3RequestError::construction_failure)?;
167 }
168 for (name, value) in ¶ms.custom_headers {
169 message
170 .inner
171 .add_header(&Header::new(name, value))
172 .map_err(S3RequestError::construction_failure)?;
173 }
174
175 let body_input_stream =
176 InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?;
177 message.set_body_stream(Some(body_input_stream));
178
179 self.inner.meta_request_with_headers_payload(
180 message.into_options(S3Operation::PutObjectSingle),
181 span,
182 parse_put_object_single_error,
183 )?
184 };
185
186 let headers = request.await?;
187
188 let elapsed = start_time.elapsed();
189 emit_throughput_metric(content_length as u64, elapsed, "put_object_single");
190
191 Ok(extract_result(headers)?)
192 }
193
194 fn new_put_request(
195 &self,
196 bucket: &str,
197 key: &str,
198 storage_class: Option<&str>,
199 server_side_encryption: Option<&str>,
200 ssekms_key_id: Option<&str>,
201 ) -> Result<S3Message<'_>, S3RequestError> {
202 let mut message = self
203 .inner
204 .new_request_template("PUT", bucket)
205 .map_err(S3RequestError::construction_failure)?;
206
207 let key = format!("/{key}");
208 message
209 .set_request_path(&key)
210 .map_err(S3RequestError::construction_failure)?;
211
212 if let Some(storage_class) = storage_class {
213 message
214 .set_header(&Header::new("x-amz-storage-class", storage_class))
215 .map_err(S3RequestError::construction_failure)?;
216 }
217
218 if let Some(sse) = server_side_encryption {
219 message
220 .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
221 .map_err(S3RequestError::construction_failure)?;
222 }
223 if let Some(key_id) = ssekms_key_id {
224 message
225 .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
226 .map_err(S3RequestError::construction_failure)?;
227 }
228
229 Ok(message)
230 }
231}
232
233type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send;
234
235#[derive(Clone, Default)]
239struct ReviewCallbackBox {
240 callback: Arc<Mutex<Option<Box<ReviewCallback>>>>,
241}
242
243impl std::fmt::Debug for ReviewCallbackBox {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 f.debug_struct("ReviewCallbackBox").finish()
246 }
247}
248
249impl ReviewCallbackBox {
250 fn set(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) {
251 let previous = self.callback.lock().unwrap().replace(Box::new(callback));
252 assert!(previous.is_none(), "review callback set twice");
253 }
254
255 fn invoke(self, review: UploadReview) -> bool {
256 let mut callback = self.callback.lock().unwrap();
257 let Some(callback) = callback.take() else {
258 error!("review callback was either never set or invoked twice");
259 return false;
260 };
261
262 (callback)(review)
263 }
264}
265
266#[derive(Debug)]
271pub struct S3PutObjectRequest {
272 request: S3MetaRequest<(), PutObjectError>,
273 review_callback: ReviewCallbackBox,
274 start_time: Instant,
275 total_bytes: u64,
276 response_headers: Receiver<Headers>,
279 state: S3PutObjectRequestState,
280}
281
282#[derive(Debug)]
284enum S3PutObjectRequestState {
285 PendingWrite,
287 Idle,
289}
290
291#[derive(Debug, Error)]
293enum S3PutObjectRequestError {
294 #[error("A previous write operation did not complete successfully")]
295 PreviousWriteFailed,
296 #[error("The CreateMultiPartUpload request did not succeed")]
297 CreateMultipartUploadFailed,
298}
299
300fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
301 Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
302}
303
304fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
305 match result.response_status {
306 400 => {
307 let body = result.error_response_body.as_ref()?;
308 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
309 let error_code = root.get_child("Code")?;
310 let error_str = error_code.get_text()?;
311 match error_str.deref() {
312 "InvalidWriteOffset" => Some(PutObjectError::InvalidWriteOffset),
313 "BadDigest" => Some(PutObjectError::BadChecksum),
314 "InvalidArgument" => {
315 parse_if_error_message_starts_with("Request body cannot be empty", &root, PutObjectError::EmptyBody)
316 }
317 "InvalidRequest" => parse_if_error_message_starts_with(
318 "Checksum Type mismatch",
319 &root,
320 PutObjectError::InvalidChecksumType,
321 ),
322 _ => None,
323 }
324 }
325 404 => {
326 let body = result.error_response_body.as_ref()?;
327 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
328 let error_code = root.get_child("Code")?;
329 let error_str = error_code.get_text()?;
330 match error_str.deref() {
331 "NoSuchKey" => Some(PutObjectError::NoSuchKey),
332 _ => None,
333 }
334 }
335 412 => Some(PutObjectError::PreconditionFailed),
336 501 => Some(PutObjectError::NotImplemented),
337 _ => None,
338 }
339}
340
341fn parse_if_error_message_starts_with<E: std::error::Error>(prefix: &str, element: &Element, error: E) -> Option<E> {
342 let error_message = element.get_child("Message")?.get_text();
343 if let Some(error_message) = error_message {
344 if error_message.starts_with(prefix) {
345 return Some(error);
346 }
347 }
348 None
349}
350
351fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
352 fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
353 Ok(PutObjectResult {
354 etag: get_etag(&response_headers)?,
355 sse_type: response_headers.get_as_optional_string(SSE_TYPE_HEADER_NAME)?,
356 sse_kms_key_id: response_headers.get_as_optional_string(SSE_KEY_ID_HEADER_NAME)?,
357 })
358 }
359 extract_result_headers_err(response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))
360}
361
362fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
364 let (response_headers_sender, response_headers) = oneshot::channel();
365 let mut response_headers_sender = Some(response_headers_sender);
370 let on_headers = move |headers: &Headers, _: i32| {
371 if let Some(sender) = response_headers_sender.take() {
372 let _ = sender.send(headers.clone());
373 }
374 };
375 (on_headers, response_headers)
376}
377
378#[cfg_attr(not(docsrs), async_trait)]
379impl PutObjectRequest for S3PutObjectRequest {
380 type ClientError = S3RequestError;
381
382 async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
383 match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
386 S3PutObjectRequestState::PendingWrite => {
387 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
389 }
390 S3PutObjectRequestState::Idle => {}
391 }
392
393 let meta_request = &mut self.request.meta_request;
394 let mut slice = slice;
395 while !slice.is_empty() {
396 let remaining = meta_request
398 .write(slice, false)
399 .await
400 .map_err(S3RequestError::CrtError)?;
401 self.total_bytes += (slice.len() - remaining.len()) as u64;
402 slice = remaining;
403 }
404 self.state = S3PutObjectRequestState::Idle;
406 Ok(())
407 }
408
409 async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
410 self.review_and_complete(|_| true).await
411 }
412
413 async fn review_and_complete(
414 mut self,
415 review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
416 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
417 if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
418 return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
420 }
421
422 self.review_callback.set(review_callback);
423
424 _ = self
426 .request
427 .meta_request
428 .write(&[], true)
429 .await
430 .map_err(S3RequestError::CrtError)?;
431
432 self.request.await?;
434
435 let elapsed = self.start_time.elapsed();
436 emit_throughput_metric(self.total_bytes, elapsed, "put_object");
437
438 Ok(extract_result(self.response_headers.await.expect(
439 "headers should be available since the request completed successfully",
440 ))?)
441 }
442}
443
444impl S3PutObjectRequest {
445 pub fn bytes_written(&self) -> u64 {
448 self.total_bytes
449 }
450}