Skip to main content

mountpoint_s3_crt/s3/
client.rs

1//! A client for high-throughput access to Amazon S3
2
3use crate::auth::credentials::CredentialsProvider;
4use crate::auth::signing_config::{SigningAlgorithm, SigningConfig, SigningConfigInner};
5use crate::common::allocator::Allocator;
6use crate::common::error::Error;
7use crate::common::thread::ThreadId;
8use crate::common::uri::Uri;
9use crate::http::request_response::{Headers, Message};
10use crate::io::channel_bootstrap::ClientBootstrap;
11use crate::io::retry_strategy::RetryStrategy;
12use crate::{CrtError, ToAwsByteCursor, aws_byte_cursor_as_slice};
13use futures::Future;
14use mountpoint_s3_crt_sys::*;
15
16use std::ffi::{OsStr, OsString};
17use std::fmt::{Debug, Display};
18use std::marker::PhantomPinned;
19use std::mem::MaybeUninit;
20use std::os::unix::prelude::OsStrExt;
21use std::pin::Pin;
22use std::ptr::NonNull;
23use std::sync::{Arc, Mutex};
24use std::task::Waker;
25use std::time::Duration;
26
27use super::buffer::Buffer;
28use super::pool::CrtBufferPoolFactory;
29use super::s3_library_init;
30
31/// A client for high-throughput access to Amazon S3
32#[derive(Debug)]
33pub struct Client {
34    /// A pointer to the underlying `aws_s3_client`
35    inner: NonNull<aws_s3_client>,
36
37    /// Hold on to an owned copy of the configuration so that it doesn't get dropped while the
38    /// client still exists. This is because the client config holds ownership of some strings
39    /// (like the region) that could still be used while the client exists.
40    config: ClientConfig,
41}
42
43// SAFETY: We assume that the CRT allows making requests using the same client from multiple threads.
44unsafe impl Send for Client {}
45// SAFETY: We assume that the CRT allows making requests using the same client from multiple threads.
46unsafe impl Sync for Client {}
47
48/// Options for creating a new [Client]. Follows the builder pattern.
49#[derive(Debug, Default)]
50pub struct ClientConfig {
51    /// The struct we can pass into the CRT's functions.
52    inner: aws_s3_client_config,
53
54    /// The [ClientBootstrap] to use to create connections to S3
55    client_bootstrap: Option<ClientBootstrap>,
56
57    /// The [RetryStrategy] to use to reschedule failed requests to S3. This is reference counted,
58    /// so we only need to hold onto it until this [ClientConfig] is consumed, at which point the
59    /// client will take ownership.
60    retry_strategy: Option<RetryStrategy>,
61
62    /// The default signing config for the CRT client.
63    signing_config: Option<SigningConfig>,
64
65    /// The region
66    region: Option<String>,
67
68    /// List of network interfaces to be used by the client.
69    ///
70    /// The client will determine the logic for balancing requests over the network interfaces
71    /// according to its own logic.
72    network_interface_names: Option<NetworkInterfaceNames>,
73
74    /// Holds the custom pool implementation factory if set.
75    pool_factory: Option<CrtBufferPoolFactory>,
76}
77
78/// This struct bundles together the list of owned strings for the network interfaces, and the
79/// cursors pointing to them.
80///
81/// This allows [ClientConfig] to keep the strings backing the cursors allocated until at least
82/// the time of [Client::new] (where the content of the cursors is copied),
83/// and deallocate when [ClientConfig] is dropped.
84#[derive(Debug)]
85struct NetworkInterfaceNames {
86    /// The list of network interface names.
87    ///
88    /// We use a boxed array of strings, as we have no need for a mutable list like [Vec].
89    /// The [String] entries must never be mutated, the cursors point to their underlying memory.
90    _names: Box<[String]>,
91
92    /// List of owned cursors. This will be pointed at by [ClientConfig]'s inner [aws_s3_client_config].
93    cursors: Box<[aws_byte_cursor]>,
94}
95
96impl NetworkInterfaceNames {
97    /// Create a new [NetworkInterfaceNamesInner].
98    pub fn new(names: Vec<String>) -> Self {
99        // Turn this into a read-only representation of the vector of strings.
100        let names = names.into_boxed_slice();
101
102        let cursors = names
103            .iter()
104            .map(|name| {
105                // SAFETY: The names are stored alongside the cursors in NetworkInterfaceNamesInner.
106                //         The lifetime of NetworkInterfaceNamesInner will always meet the lifetime of the cursors.
107                //         We never mutate the String backing these byte cursors once created.
108                unsafe { name.as_aws_byte_cursor() }
109            })
110            .collect::<Vec<_>>()
111            .into_boxed_slice();
112
113        Self { _names: names, cursors }
114    }
115
116    /// Immutable slice of interface names as [aws_byte_cursor].
117    pub fn aws_byte_cursors(&self) -> &[aws_byte_cursor] {
118        self.cursors.as_ref()
119    }
120}
121
122impl ClientConfig {
123    /// Create a new [ClientConfig] with default options.
124    pub fn new() -> Self {
125        Self::default()
126    }
127
128    /// Client bootstrap used for common staples such as event loop group, host resolver, etc.
129    pub fn client_bootstrap(&mut self, client_bootstrap: ClientBootstrap) -> &mut Self {
130        self.inner.client_bootstrap = client_bootstrap.inner.as_ptr();
131        self.client_bootstrap = Some(client_bootstrap);
132        self
133    }
134
135    /// Region
136    pub fn region(&mut self, region: &str) -> &mut Self {
137        self.region = Some(region.to_owned());
138        // SAFETY: `self.inner.region` is not mutated further and lives as long as the `ClientConfig`, which outlives the client
139        self.inner.region = unsafe { self.region.as_ref().unwrap().as_aws_byte_cursor() };
140        self
141    }
142
143    /// Replace list of network interface names to be used by S3 clients created with this config.
144    pub fn network_interface_names(&mut self, network_interface_names: Vec<String>) -> &mut Self {
145        let network_interface_names = self
146            .network_interface_names
147            .insert(NetworkInterfaceNames::new(network_interface_names));
148
149        // The cursor array will always outlive the inner config.
150        self.inner.network_interface_names_array = network_interface_names.aws_byte_cursors().as_ptr();
151        self.inner.num_network_interface_names = network_interface_names.aws_byte_cursors().len();
152
153        self
154    }
155
156    /// Retry strategy used to reschedule failed requests
157    pub fn retry_strategy(&mut self, retry_strategy: RetryStrategy) -> &mut Self {
158        self.inner.retry_strategy = retry_strategy.inner.as_ptr();
159        self.retry_strategy = Some(retry_strategy);
160        self
161    }
162
163    /// Default signing config for the requests.
164    pub fn signing_config(&mut self, signing_config: SigningConfig) -> &mut Self {
165        self.inner.signing_config = signing_config.to_inner_ptr() as *mut aws_signing_config_aws;
166        self.signing_config = Some(signing_config);
167        self
168    }
169
170    /// Enable S3 Express One Zone
171    pub fn express_support(&mut self, express_support: bool) -> &mut Self {
172        self.inner.enable_s3express = express_support;
173        self
174    }
175
176    /// Enable backpressure read
177    pub fn read_backpressure(&mut self, read_backpressure: bool) -> &mut Self {
178        self.inner.enable_read_backpressure = read_backpressure;
179        self
180    }
181
182    /// Initial read window size for backpressure read feature
183    pub fn initial_read_window(&mut self, initial_read_window: usize) -> &mut Self {
184        self.inner.initial_read_window = initial_read_window;
185        self
186    }
187
188    /// Size in bytes of parts the files will be downloaded or uploaded in.
189    ///
190    /// The AWS CRT client may adjust this value per-request where possible
191    /// to address service limits (such as the max number of parts).
192    pub fn part_size(&mut self, part_size: usize) -> &mut Self {
193        self.inner.part_size = part_size as u64;
194        self
195    }
196
197    /// If the part size needs to be adjusted for service limits, this is the maximum size it will be adjusted to.
198    pub fn max_part_size(&mut self, max_part_size: usize) -> &mut Self {
199        self.inner.max_part_size = max_part_size as u64;
200        self
201    }
202
203    /// Throughput target in Gbps that we are trying to reach.
204    pub fn throughput_target_gbps(&mut self, throughput_target_gbps: f64) -> &mut Self {
205        self.inner.throughput_target_gbps = throughput_target_gbps;
206        self
207    }
208
209    /// Memory limit in bytes the client can use.
210    ///
211    /// When not set, the client will determine this value based on throughput_target_gbps.
212    pub fn memory_limit_in_bytes(&mut self, memory_limit_in_bytes: u64) -> &mut Self {
213        self.inner.memory_limit_in_bytes = memory_limit_in_bytes;
214        self
215    }
216
217    /// Custom buffer pool factory.
218    ///
219    /// When not set, the client will use the default pool with the configured memory limit.
220    pub fn buffer_pool_factory(&mut self, pool_factory: CrtBufferPoolFactory) -> &mut Self {
221        // SAFETY: `pool_factory` is stored in `self.pool_factory` below so that it
222        // remains alive until the client is initialized.
223        let (factory_fn, user_data) = unsafe { pool_factory.as_inner() };
224        self.inner.buffer_pool_factory_fn = factory_fn;
225        self.inner.buffer_pool_user_data = user_data;
226        self.pool_factory = Some(pool_factory);
227        self
228    }
229
230    /// When set, this will cap the number of active connections. Otherwise, the client will
231    /// determine this value based on throughput_target_gbps. (Recommended)
232    pub fn max_active_connections_override(&mut self, max_active_connections_override: u32) -> &mut Self {
233        self.inner.max_active_connections_override = max_active_connections_override;
234        self
235    }
236}
237
238/// Callback for telemetry received as part of a successful meta request.
239type TelemetryCallback = Box<dyn Fn(&RequestMetrics) + Send>;
240
241/// Callback for when headers are received as part of a successful HTTP request. Given (headers, response_status).
242type HeadersCallback = Box<dyn FnMut(&Headers, i32) + Send>;
243
244/// Callback for when part of the response body is received. Given (range_start, data).
245type BodyExCallback = Box<dyn FnMut(u64, &Buffer) + Send>;
246
247/// Callback for reviewing an upload before it completes.
248type UploadReviewCallback = Box<dyn FnOnce(UploadReview) -> bool + Send>;
249
250/// Callback for when the request is finished. Given (error_code, optional_error_body).
251type FinishCallback = Box<dyn FnOnce(MetaRequestResult) + Send>;
252
253/// Options for meta requests to S3. This is not a public interface, since clients should always
254/// be using the [MetaRequestOptions] wrapper, which pins this struct behind a pointer.
255struct MetaRequestOptionsInner<'a> {
256    /// Inner struct to pass to CRT functions.
257    inner: aws_s3_meta_request_options,
258
259    /// Owned copy of the message, if provided.
260    message: Option<Message<'a>>,
261
262    /// Owned copy of the endpoint URI, if provided
263    endpoint: Option<Uri>,
264
265    /// Owned signing config, if provided.
266    signing_config: Option<SigningConfig>,
267
268    /// Owned checksum config, if provided.
269    checksum_config: Option<ChecksumConfig>,
270
271    /// Owned source uri for copy request, if provided.
272    copy_source_uri: Option<String>,
273
274    /// Telemetry callback, if provided
275    on_telemetry: Option<TelemetryCallback>,
276
277    /// Headers callback, if provided.
278    on_headers: Option<HeadersCallback>,
279
280    /// Body callback, if provided.
281    on_body_ex: Option<BodyExCallback>,
282
283    /// Upload review callback, if provided (and not already called, since it's FnOnce).
284    on_upload_review: Option<UploadReviewCallback>,
285
286    /// Finish callback, if provided (and not already called, since it's FnOnce).
287    on_finish: Option<FinishCallback>,
288
289    /// Pin this struct because inner.user_data will be a pointer to this object.
290    _pinned: PhantomPinned,
291}
292
293impl<'a> MetaRequestOptionsInner<'a> {
294    /// Convert from user_data in a callback to a reference to this struct.
295    ///
296    /// ## Safety
297    ///
298    /// Don't use except in a MetaRequest callback. The lifetime 'a of the returned
299    /// [MetaRequestOptionsInner] is unconstrained, so the caller must make sure that the lifetime
300    /// of the returned reference does not outlive the [MetaRequestOptionsInner].
301    unsafe fn from_user_data_ptr(user_data: *mut libc::c_void) -> &'a mut Self {
302        // SAFETY: `user_data` is initialized in `MetaRequestOptions::new`.
303        unsafe { (user_data as *mut Self).as_mut().unwrap() }
304    }
305
306    /// Convert from user_data in a callback to an owned Box holding this struct, so it will be
307    /// freed when dropped.
308    ///
309    /// ## Safety
310    ///
311    /// Don't use except in the shutdown callback, once we are certain not to be called back again.
312    unsafe fn from_user_data_ptr_owned(user_data: *mut libc::c_void) -> Box<Self> {
313        // SAFETY: `user_data` is leaked in `MetaRequestOptions::new`.
314        unsafe { Box::from_raw(user_data as *mut Self) }
315    }
316}
317
318impl Debug for MetaRequestOptionsInner<'_> {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        f.debug_struct("MetaRequestOptionsInner")
321            .field("inner", &self.inner)
322            .field("message", &self.message)
323            .field("signing_config", &self.signing_config)
324            .finish_non_exhaustive()
325    }
326}
327
328/// Options for a meta request to S3.
329// Implementation details: this wraps the inner struct in a pinned box to enforce we don't move out of it.
330#[derive(Debug)]
331pub struct MetaRequestOptions<'a>(Pin<Box<MetaRequestOptionsInner<'a>>>);
332
333impl<'a> MetaRequestOptions<'a> {
334    /// Create a new default options struct. It follows the builder pattern so clients can use
335    /// methods to set various options.
336    pub fn new() -> Self {
337        // Create the default options, binding the callbacks to our predefined callback shims.
338        // Set user_data to null first, since we need to create the Box first to find out what
339        // the address of the inner struct is.
340        let options = Box::new(MetaRequestOptionsInner {
341            inner: aws_s3_meta_request_options {
342                telemetry_callback: Some(meta_request_telemetry_callback),
343                headers_callback: Some(meta_request_headers_callback),
344                body_callback_ex: Some(meta_request_receive_body_callback_ex),
345                finish_callback: Some(meta_request_finish_callback),
346                shutdown_callback: Some(meta_request_shutdown_callback),
347                upload_review_callback: Some(meta_request_upload_review_callback),
348                user_data: std::ptr::null_mut(), // Set to null until the Box is made.
349                ..Default::default()
350            },
351            message: None,
352            endpoint: None,
353            signing_config: None,
354            checksum_config: None,
355            copy_source_uri: None,
356            on_telemetry: None,
357            on_headers: None,
358            on_body_ex: None,
359            on_upload_review: None,
360            on_finish: None,
361            _pinned: Default::default(),
362        });
363
364        // Pin the options in-place. This is because it's about to become self-referential.
365        let mut options = Box::into_pin(options);
366
367        // Now set the user_data to a self-referential pointer to the options struct.
368        // SAFETY: We're setting up the struct to be self-referential, and we're not moving out
369        // of the struct, so the unchecked deref of the pinned pointer is okay.
370        unsafe {
371            let options = Pin::get_unchecked_mut(Pin::as_mut(&mut options));
372            options.inner.user_data = options as *mut MetaRequestOptionsInner as *mut libc::c_void;
373        }
374
375        Self(options)
376    }
377
378    /// Set the S3 operation name of the request.
379    pub fn operation_name(&mut self, operation_name: &'static str) -> &mut Self {
380        // SAFETY: we aren't moving out of the struct.
381        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
382        // SAFETY: `operation_name` has a static lifetime.
383        options.inner.operation_name = unsafe { operation_name.as_aws_byte_cursor() };
384        self
385    }
386
387    /// Set the message of the request.
388    pub fn message(&mut self, message: Message<'a>) -> &mut Self {
389        // SAFETY: we aren't moving out of the struct.
390        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
391        options.message = Some(message);
392        options.inner.message = options.message.as_mut().unwrap().inner.as_ptr();
393        self
394    }
395
396    /// Set the endpoint of the request. If set, the host portion of the endpoint URI must match the
397    /// "Host" header in the `message`.
398    pub fn endpoint(&mut self, endpoint: Uri) -> &mut Self {
399        // SAFETY: we aren't moving out of the struct.
400        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
401        options.endpoint = Some(endpoint);
402        options.inner.endpoint = options.endpoint.as_mut().unwrap().to_inner_ptr() as *mut aws_uri;
403        self
404    }
405
406    /// Get the endpoint of the request
407    pub fn get_endpoint(&self) -> Option<Uri> {
408        self.0.as_ref().endpoint.clone()
409    }
410
411    /// Set the checksum config used for this message.
412    pub fn checksum_config(&mut self, checksum_config: ChecksumConfig) -> &mut Self {
413        // SAFETY: we aren't moving out of the struct.
414        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
415        options.checksum_config = Some(checksum_config);
416        options.inner.checksum_config =
417            options.checksum_config.as_mut().unwrap().to_inner_ptr() as *mut aws_s3_checksum_config;
418        self
419    }
420
421    /// Set the signing config used for this message. Not public because we copy it from the client
422    /// when making a request.
423    pub fn signing_config(&mut self, signing_config: SigningConfig) -> &mut Self {
424        // SAFETY: we aren't moving out of the struct.
425        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
426        options.signing_config = Some(signing_config);
427        options.inner.signing_config =
428            options.signing_config.as_mut().unwrap().to_inner_ptr() as *mut aws_signing_config_aws;
429        self
430    }
431
432    /// Provide a callback to run when telemetry for individual requests made by this meta request
433    /// arrives. The callback is invoked once for each request made, after the request completes
434    /// (including failures).
435    pub fn on_telemetry(&mut self, callback: impl Fn(&RequestMetrics) + Send + 'static) -> &mut Self {
436        // SAFETY: we aren't moving out of the struct.
437        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
438        options.on_telemetry = Some(Box::new(callback));
439        self
440    }
441
442    /// Provide a callback to run when the request's headers arrive. Given (headers, response_status)
443    pub fn on_headers(&mut self, callback: impl FnMut(&Headers, i32) + Send + 'static) -> &mut Self {
444        // SAFETY: we aren't moving out of the struct.
445        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
446        options.on_headers = Some(Box::new(callback));
447        self
448    }
449
450    /// Provide a callback to run when the request's body arrives.
451    pub fn on_body(&mut self, callback: impl FnMut(u64, &Buffer) + Send + 'static) -> &mut Self {
452        // SAFETY: we aren't moving out of the struct.
453        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
454        options.on_body_ex = Some(Box::new(callback));
455        self
456    }
457
458    /// Provide a callback to run when the upload request is ready to complete.
459    pub fn on_upload_review(&mut self, callback: impl FnOnce(UploadReview) -> bool + Send + 'static) -> &mut Self {
460        // SAFETY: we aren't moving out of the struct.
461        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
462        options.on_upload_review = Some(Box::new(callback));
463        self
464    }
465
466    /// Provide a callback to run when the meta request completes.
467    pub fn on_finish(&mut self, callback: impl FnOnce(MetaRequestResult) + Send + 'static) -> &mut Self {
468        // SAFETY: we aren't moving out of the struct.
469        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
470        options.on_finish = Some(Box::new(callback));
471        self
472    }
473
474    /// Set the type of this request
475    pub fn request_type(&mut self, request_type: MetaRequestType) -> &mut Self {
476        // SAFETY: we aren't moving out of the struct.
477        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
478        options.inner.type_ = request_type.into();
479        self
480    }
481
482    /// Set the part size of this request
483    pub fn part_size(&mut self, part_size: u64) -> &mut Self {
484        // SAFETY: we aren't moving out of the struct.
485        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
486        options.inner.part_size = part_size;
487        self
488    }
489
490    /// Set this to send request body data using the async [MetaRequest::write] function.
491    /// This only works with [MetaRequestType::PutObject].
492    pub fn send_using_async_writes(&mut self, send_using_async_writes: bool) -> &mut Self {
493        // SAFETY: we aren't moving out of the struct.
494        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
495        options.inner.send_using_async_writes = send_using_async_writes;
496        self
497    }
498
499    /// Set the URI of source bucket/key for COPY request only
500    pub fn copy_source_uri(&mut self, source_uri: String) -> &mut Self {
501        // SAFETY: we aren't moving out of the struct.
502        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
503        options.copy_source_uri = Some(source_uri);
504        // SAFETY: We ensure that the cursor points to data that lives
505        // as long as the options struct
506        options.inner.copy_source_uri = unsafe { options.copy_source_uri.as_mut().unwrap().as_aws_byte_cursor() };
507        self
508    }
509}
510
511impl Default for MetaRequestOptions<'_> {
512    fn default() -> Self {
513        Self::new()
514    }
515}
516
517/// What transformation to apply to a single [MetaRequest] to transform it into a collection of
518/// requests to S3.
519#[derive(Debug)]
520pub enum MetaRequestType {
521    /// Send the request as-is (no transformation)
522    Default,
523    /// Split the GetObject request into a series of ranged requests executed in parallel
524    GetObject,
525    /// Split the PutObject request into multi-part uploads executed in parallel
526    PutObject,
527    /// Perform a multi-part copy using multiple UploadPartCopy requests executed in parallel
528    CopyObject,
529}
530
531impl From<MetaRequestType> for aws_s3_meta_request_type {
532    fn from(typ: MetaRequestType) -> Self {
533        match typ {
534            MetaRequestType::Default => aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_DEFAULT,
535            MetaRequestType::GetObject => aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
536            MetaRequestType::PutObject => aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
537            MetaRequestType::CopyObject => aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_COPY_OBJECT,
538        }
539    }
540}
541
542impl From<aws_s3_meta_request_type> for MetaRequestType {
543    fn from(value: aws_s3_meta_request_type) -> Self {
544        match value {
545            aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_DEFAULT => MetaRequestType::Default,
546            aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_COPY_OBJECT => MetaRequestType::CopyObject,
547            aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_GET_OBJECT => MetaRequestType::GetObject,
548            aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_PUT_OBJECT => MetaRequestType::PutObject,
549            _ => panic!("unknown meta request type {value:?}"),
550        }
551    }
552}
553
554/// SAFETY: Don't call this function directly, only called by the CRT as a callback.
555unsafe extern "C" fn meta_request_telemetry_callback(
556    _request: *mut aws_s3_meta_request,
557    metrics: *mut aws_s3_request_metrics,
558    user_data: *mut libc::c_void,
559) {
560    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
561    // in MetaRequestOptions::new.
562    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr(user_data) };
563
564    if let Some(callback) = user_data.on_telemetry.as_ref() {
565        let metrics = NonNull::new(metrics).expect("request metrics is never null");
566        let metrics = RequestMetrics { inner: metrics };
567        // The docs say "`metrics` is only valid for the duration of the callback", so we need to
568        // pass a reference here.
569        callback(&metrics);
570    }
571}
572
573/// SAFETY: Don't call this function directly, only called by the CRT as a callback.
574unsafe extern "C" fn meta_request_headers_callback(
575    _request: *mut aws_s3_meta_request,
576    headers: *const aws_http_headers,
577    response_status: i32,
578    user_data: *mut libc::c_void,
579) -> i32 {
580    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
581    // in MetaRequestOptions::new.
582    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr(user_data) };
583
584    if let Some(callback) = user_data.on_headers.as_mut() {
585        let headers = NonNull::new(headers as *mut aws_http_headers).expect("request headers is never null");
586        // SAFETY: `headers` is a valid `aws_http_headers` returned by the CRT.
587        let headers = unsafe { Headers::from_crt(headers) };
588        callback(&headers, response_status);
589    }
590
591    AWS_OP_SUCCESS
592}
593
594/// SAFETY: Don't call this function directly, only called by the CRT as a callback.
595unsafe extern "C" fn meta_request_receive_body_callback_ex(
596    _request: *mut aws_s3_meta_request,
597    body: *const aws_byte_cursor,
598    meta: aws_s3_meta_request_receive_body_extra_info,
599    user_data: *mut libc::c_void,
600) -> i32 {
601    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
602    // in MetaRequestOptions::new.
603    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr(user_data) };
604
605    if let Some(callback) = user_data.on_body_ex.as_mut() {
606        // SAFETY: `body` and `meta.ticket` outlive `buffer`.
607        let buffer = unsafe { Buffer::new_unchecked(&*body, &meta.ticket) };
608        callback(meta.range_start, &buffer);
609    }
610
611    AWS_OP_SUCCESS
612}
613
614/// SAFETY: Don't call this function directly, only called by the CRT as a callback.
615unsafe extern "C" fn meta_request_finish_callback(
616    _request: *mut aws_s3_meta_request,
617    result: *const aws_s3_meta_request_result,
618    user_data: *mut libc::c_void,
619) {
620    // SAFETY: `result` points to a valid `aws_s3_meta_request_result`.
621    let result = unsafe { result.as_ref().expect("result cannot be NULL") };
622
623    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
624    // in MetaRequestOptions::new.
625    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr(user_data) };
626
627    // take ownership of the callback, since it can only be called once.
628    if let Some(callback) = user_data.on_finish.take() {
629        // SAFETY: `result` is a valid `aws_s3_meta_request_result` returned by the CRT.
630        callback(unsafe { MetaRequestResult::from_crt_result(result) });
631    }
632}
633
634/// Safety: Don't call this function directly, only called by the CRT as a callback.
635unsafe extern "C" fn meta_request_shutdown_callback(user_data: *mut libc::c_void) {
636    // Take back ownership of the user data so it will be freed when dropped.
637    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
638    // in MetaRequestOptions::new.
639    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr_owned(user_data) };
640
641    // SAFETY: at this point, we shouldn't receieve any more callbacks for this request.
642    std::mem::drop(user_data);
643}
644
645/// Safety: Don't call this function directly, only called by the CRT as a callback.
646unsafe extern "C" fn meta_request_upload_review_callback(
647    _request: *mut aws_s3_meta_request,
648    upload_review: *const aws_s3_upload_review,
649    user_data: *mut libc::c_void,
650) -> i32 {
651    // SAFETY: user_data always will be a MetaRequestOptionsInner since that's what we set it to
652    // in MetaRequestOptions::new.
653    let user_data = unsafe { MetaRequestOptionsInner::from_user_data_ptr(user_data) };
654
655    let Some(callback) = user_data.on_upload_review.take() else {
656        return AWS_OP_SUCCESS;
657    };
658
659    // SAFETY: `upload_review` points to a valid `aws_s3_upload_review`.
660    let upload_review = unsafe {
661        upload_review
662            .as_ref()
663            .expect("CRT should provide a valid upload_review")
664    };
665    if callback(UploadReview::new(upload_review)) {
666        AWS_OP_SUCCESS
667    } else {
668        // SAFETY: we are returning from the CRT callback.
669        unsafe { aws_raise_error(aws_s3_errors::AWS_ERROR_S3_CANCELED as i32) }
670    }
671}
672
673/// An in-progress request to S3.
674///
675/// A dropped [MetaRequest] will still progress. See [MetaRequest::cancel()].
676#[derive(Debug)]
677pub struct MetaRequest {
678    inner: NonNull<aws_s3_meta_request>,
679}
680
681impl MetaRequest {
682    /// Cancel the meta request. Does nothing (but does not fail/panic) if the request has already
683    /// completed. If the request has not already completed, parts may still be delivered to the
684    /// `body_callback` after this method completes, and the `finish_callback` will still be
685    /// invoked, but with the `crt_error` field set to `AWS_ERROR_S3_CANCELED`.
686    pub fn cancel(&self) {
687        // SAFETY: `self.inner` is a valid `aws_s3_meta_request`, even if the request has otherwise
688        // finished, since we hold a ref count to it
689        unsafe {
690            aws_s3_meta_request_cancel(self.inner.as_ptr());
691        }
692    }
693
694    /// Write a chunk of data and indicate whether it is the last. Returns a [MetaRequestWrite]
695    /// future that starts writing when polled. May perform incomplete writes: in that case,
696    /// the future returns the suffix of the input slice that has not been written. The caller
697    /// is expected to invoke `write` again with the remaining data, until the empty slice is
698    /// returned.
699    /// Once an invocation with `eof == true` returns with the empty slice, subsequent invocations
700    /// will fail with an AWS_ERROR_INVALID_STATE error.
701    pub fn write<'r, 's>(&'r mut self, slice: &'s [u8], eof: bool) -> MetaRequestWrite<'r, 's> {
702        MetaRequestWrite::new(self, slice, eof)
703    }
704
705    /// Increment the end position of the flow-control window.
706    ///
707    /// The AWS CRT uses the flow-control window to determine how many requests to schedule in order to return data,
708    /// allowing consumers to limit the amount of data read ahead to protect memory or other resources.
709    /// The AWS CRT will download any part required to read up to the window size -
710    /// i.e. if you request 4MiB with an 8MiB part size, the 8MiB part will be downloaded to fulfil the requested window.
711    ///
712    /// See CRT's function [aws_s3_meta_request_increment_read_window] for more documentation.
713    pub fn increment_read_window(&mut self, bytes: u64) {
714        // SAFETY: `self.inner` is a valid `aws_s3_meta_request` since we hold a ref count to it.
715        unsafe { aws_s3_meta_request_increment_read_window(self.inner.as_ptr(), bytes) };
716    }
717}
718
719impl Drop for MetaRequest {
720    fn drop(&mut self) {
721        // SAFETY: we will no longer use the pointer after this MetaRequest is dropped, so it's safe
722        // to give up our refcount on it now.
723        unsafe {
724            aws_s3_meta_request_release(self.inner.as_ptr());
725        }
726    }
727}
728
729impl Clone for MetaRequest {
730    fn clone(&self) -> Self {
731        // SAFETY: self.inner is a valid aws_s3_meta_request and aws_s3_meta_request_acquire
732        // increments the reference count for it (and always returns a copy of the input, which is non-null).
733        let inner = unsafe { NonNull::new_unchecked(aws_s3_meta_request_acquire(self.inner.as_ptr())) };
734        Self { inner }
735    }
736}
737
738// SAFETY: `aws_s3_meta_request` is thread-safe
739unsafe impl Send for MetaRequest {}
740// SAFETY: `aws_s3_meta_request` is thread safe
741unsafe impl Sync for MetaRequest {}
742
743/// Future returned by `MetaRequest::write()`. Wraps `aws_s3_meta_request_poll_write`.
744#[derive(Debug)]
745pub struct MetaRequestWrite<'r, 's> {
746    /// The meta-request to write to.
747    request: &'r mut MetaRequest,
748    /// The slice to write
749    slice: &'s [u8],
750    /// Is end-of-file?
751    eof: bool,
752    /// Holds the waker from the current context. Passed to `poll_write_waker_callback`
753    /// in order to trigger another poll.
754    waker: Arc<Mutex<Option<Waker>>>,
755}
756
757impl<'r, 's> MetaRequestWrite<'r, 's> {
758    fn new(request: &'r mut MetaRequest, slice: &'s [u8], eof: bool) -> Self {
759        Self {
760            request,
761            slice,
762            eof,
763            waker: Default::default(),
764        }
765    }
766}
767
768impl<'s> Future for MetaRequestWrite<'_, 's> {
769    type Output = Result<&'s [u8], Error>;
770
771    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
772        let mut waker = self.waker.lock().unwrap();
773        if let Some(ref mut waker) = *waker {
774            // The previous `aws_s3_meta_request_poll_write` call returned `Pending` but has not
775            // invoked the callback yet. Do not call it again, but make sure to store the waker
776            // from the current context.
777            waker.clone_from(cx.waker());
778            return std::task::Poll::Pending;
779        }
780
781        // Store the waker.
782        *waker = Some(cx.waker().clone());
783
784        // `user_data` will be dropped in `poll_write_waker_callback` (or below).
785        let user_data = Arc::into_raw(self.waker.clone()) as *mut ::libc::c_void;
786
787        // SAFETY: `aws_s3_meta_request_poll_write` does not store `data`.
788        let data = unsafe { self.slice.as_aws_byte_cursor() };
789
790        // SAFETY: `self.request` wraps a valid `aws_s3_meta_request` pointer.
791        let result = unsafe {
792            aws_s3_meta_request_poll_write(
793                self.request.inner.as_ptr(),
794                data,
795                self.eof,
796                Some(poll_write_waker_callback),
797                user_data,
798            )
799        };
800        if result.is_pending {
801            return std::task::Poll::Pending;
802        }
803
804        // SAFETY: `aws_s3_meta_request_poll_write` completed. It will not invoke `poll_write_waker_callback`,
805        //         so we need to drop `user_data` here.
806        _ = unsafe { Arc::from_raw(user_data as *mut Mutex<Option<Waker>>) };
807
808        let error_result: crate::common::error::Error = result.error_code.into();
809        let result = if error_result.is_err() {
810            Err(error_result)
811        } else {
812            Ok(&self.slice[result.bytes_processed..])
813        };
814
815        std::task::Poll::Ready(result)
816    }
817}
818
819/// Safety: Don't call this function directly, only called by the CRT as a callback.
820unsafe extern "C" fn poll_write_waker_callback(user_data: *mut ::libc::c_void) {
821    // SAFETY: `user_data` was returned by `Arc::into_raw` in `MetaRequestWrite::poll`.
822    let waker = unsafe { Arc::from_raw(user_data as *mut Mutex<Option<Waker>>) };
823    // Notify the waker.
824    waker
825        .lock()
826        .unwrap()
827        .take()
828        .expect("user_data always contains a waker")
829        .wake();
830}
831
832/// Client metrics which represent current workload of a client.
833/// Overall, num_requests_tracked_requests shows total number of requests being processed by the client at a time.
834/// It can be broken down into these numbers by states of the client.
835///     (1) num_requests_being_prepared: this is the first state when CRT receives requests and begins preparing them.
836///     (2) request_queue_size: prepared requests are added into the request_queue, waiting to be assigned to connections.
837///     (3) num_total_network_io: requests are removed from the request_queue and sent over the network.
838///         We can also see number of requests by their meta request types.
839///         (3.1) num_auto_default_network_io
840///         (3.2) num_auto_ranged_get_network_io
841///         (3.3) num_auto_ranged_put_network_io
842///         (3.4) num_auto_ranged_copy_network_io
843///     (4) num_requests_stream_queued_waiting: responses from the server are added into meta request priority queue, waiting to be streamed.
844///     (5) num_requests_streaming_response: responses are removed from the queue and streamed back to the callers.
845#[derive(Debug, Default)]
846#[non_exhaustive]
847pub struct ClientMetrics {
848    /// Number of overall requests currently being processed by the client.
849    pub num_requests_tracked_requests: u32,
850
851    /// Number of requests currently being prepared.
852    pub num_requests_being_prepared: u32,
853
854    /// Number of requests in the request_queue linked_list.
855    pub request_queue_size: u32,
856
857    /// Number of requests being sent/received over network for meta request type DEFAULT.
858    pub num_auto_default_network_io: u32,
859
860    /// Number of requests being sent/received over network for meta request type GET.
861    pub num_auto_ranged_get_network_io: u32,
862
863    /// Number of requests being sent/received over network for meta request type PUT.
864    pub num_auto_ranged_put_network_io: u32,
865
866    /// Number of requests being sent/received over network for meta request type COPY.
867    pub num_auto_ranged_copy_network_io: u32,
868
869    /// Number of requests sitting in their meta request priority queue, waiting to be streamed.
870    pub num_requests_stream_queued_waiting: u32,
871
872    /// Number of requests currently scheduled to be streamed the response body or are actively being streamed.
873    pub num_requests_streaming_response: u32,
874}
875
876impl ClientMetrics {
877    /// Total number of requests being sent/received over network.
878    pub fn num_total_network_io(&self) -> u32 {
879        self.num_auto_default_network_io
880            + self.num_auto_ranged_get_network_io
881            + self.num_auto_ranged_put_network_io
882            + self.num_auto_ranged_copy_network_io
883    }
884}
885
886/// S3 buffer pool usage stats
887#[derive(Debug)]
888pub struct BufferPoolUsageStats {
889    /// Effective Max memory limit. Memory limit value provided during construction minus
890    /// buffer reserved for overhead of the pool
891    pub mem_limit: u64,
892
893    /// Max size of buffer to be allocated from primary.
894    pub primary_cutoff: u64,
895
896    /// Primary mem used, including memory used by blocks
897    /// that are waiting on all allocs to release before being put back in circulation.
898    pub primary_used: u64,
899
900    /// Overall memory allocated for blocks.
901    pub primary_allocated: u64,
902
903    /// Reserved memory for primary storage.
904    pub primary_reserved: u64,
905
906    /// Number of blocks allocated in primary.
907    pub primary_num_blocks: u64,
908
909    /// Secondary mem used. Accurate, maps directly to base allocator.
910    pub secondary_reserved: u64,
911
912    /// Secondary mem reserved. Accurate, maps directly to base allocator.
913    pub secondary_used: u64,
914
915    /// Bytes used in "forced" buffers (created even if they exceed memory limits).
916    pub forced_used: u64,
917}
918
919impl Client {
920    /// Create a new S3 [Client].
921    pub fn new(allocator: &Allocator, config: ClientConfig) -> Result<Self, Error> {
922        s3_library_init(allocator);
923
924        // SAFETY: `config` is moved into the [Client] on success, so `config.inner` (and the values
925        // inside) are guaranteed to live at least as long as this [Client] does. `allocator` is
926        // guaranteed to be a valid allocator because of the type-safe wrapper.
927        let inner = unsafe { aws_s3_client_new(allocator.inner.as_ptr(), &config.inner).ok_or_last_error()? };
928
929        Ok(Self { inner, config })
930    }
931
932    /// Make a meta request to S3 using this [Client]. A meta request is an HTTP request that
933    /// the CRT might internally split up into multiple requests for performance.
934    pub fn make_meta_request(&self, options: MetaRequestOptions) -> Result<MetaRequest, Error> {
935        // SAFETY: The inner struct pointed to by MetaRequestOptions will live as long as the
936        // request does, since we only drop it in the shutdown callback. That struct owns everything
937        // related to the request, like the message, signing config, etc.
938        unsafe {
939            // Unpin the options (we won't move out of it, nor will the callbacks).
940            let options = Pin::into_inner_unchecked(options.0);
941
942            // Leak the options since it's the user_data pointer in the callback. It will be dropped
943            // later on once the shutdown callback is invoked.
944            let options = Box::leak(options);
945
946            // Make the request on this client.
947            // TODO: do we need to clone the client into the options struct? Or will the CRT internally
948            // increment the refcount for us?
949            let inner = aws_s3_client_make_meta_request(self.inner.as_ptr(), &options.inner)
950                .ok_or_last_error()
951                // Drop the options Box if we failed to make the meta request.
952                // Assumption: CRT won't call shutdown callback if make_meta_request returns null.
953                .inspect_err(|_| std::mem::drop(Box::from_raw(options)))?;
954
955            Ok(MetaRequest { inner })
956        }
957    }
958
959    /// Poll [ClientMetrics] from underlying CRT client.
960    pub fn poll_client_metrics(&self) -> ClientMetrics {
961        // SAFETY: The `aws_s3_client` in `self.inner` is guaranteed to be initialized and
962        // dereferencable as long as Client lives. The `aws_atomic_load_int` calls are safe because
963        // they're always `int`s, whose lifetime belongs to the stats struct.
964        unsafe {
965            let client = self.inner.as_ref();
966            let stats = client.stats;
967
968            let num_requests_tracked_requests = aws_atomic_load_int(&stats.num_requests_in_flight) as u32;
969
970            let num_auto_ranged_get_network_io = Client::get_num_requests_network_io(
971                client,
972                aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
973            );
974
975            let num_auto_ranged_put_network_io = Client::get_num_requests_network_io(
976                client,
977                aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
978            );
979
980            let num_auto_default_network_io =
981                Client::get_num_requests_network_io(client, aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_DEFAULT);
982
983            let num_auto_ranged_copy_network_io = Client::get_num_requests_network_io(
984                client,
985                aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_COPY_OBJECT,
986            );
987
988            let num_requests_stream_queued_waiting =
989                aws_atomic_load_int(&stats.num_requests_stream_queued_waiting) as u32;
990
991            let num_requests_streaming_response = aws_atomic_load_int(&stats.num_requests_streaming_response) as u32;
992
993            // These are "threaded data" and so technically we don't know that it's safe to read them
994            // here, but it's just metrics data so we're not too concerned.
995            let num_requests_being_prepared = client.threaded_data.num_requests_being_prepared;
996            let request_queue_size = client.threaded_data.request_queue_size;
997
998            ClientMetrics {
999                num_requests_tracked_requests,
1000                num_requests_being_prepared,
1001                request_queue_size,
1002                num_auto_default_network_io,
1003                num_auto_ranged_get_network_io,
1004                num_auto_ranged_put_network_io,
1005                num_auto_ranged_copy_network_io,
1006                num_requests_stream_queued_waiting,
1007                num_requests_streaming_response,
1008            }
1009        }
1010    }
1011
1012    /// Poll [BufferPoolUsageStats] from underlying CRT client, if using the default memory pool.
1013    pub fn poll_default_buffer_pool_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1014        if self.config.pool_factory.is_some() {
1015            // We are using a custom pool implementation rather than the default pool.
1016            return None;
1017        }
1018
1019        // Retrieve usage stats from the default pool.
1020
1021        // SAFETY: The `aws_s3_client` in `self.inner` is guaranteed to be initialized and
1022        // dereferencable as long as Client lives.
1023        let inner_stats = unsafe {
1024            let client = self.inner.as_ref();
1025            aws_s3_default_buffer_pool_get_usage(client.buffer_pool)
1026        };
1027
1028        let mem_limit = inner_stats.mem_limit as u64;
1029        let primary_cutoff = inner_stats.primary_cutoff as u64;
1030        let primary_used = inner_stats.primary_used as u64;
1031        let primary_allocated = inner_stats.primary_allocated as u64;
1032        let primary_reserved = inner_stats.primary_reserved as u64;
1033        let primary_num_blocks = inner_stats.primary_num_blocks as u64;
1034        let secondary_reserved = inner_stats.secondary_reserved as u64;
1035        let secondary_used = inner_stats.secondary_used as u64;
1036        let forced_used = inner_stats.forced_used as u64;
1037
1038        Some(BufferPoolUsageStats {
1039            mem_limit,
1040            primary_cutoff,
1041            primary_used,
1042            primary_allocated,
1043            primary_reserved,
1044            primary_num_blocks,
1045            secondary_reserved,
1046            secondary_used,
1047            forced_used,
1048        })
1049    }
1050
1051    fn get_num_requests_network_io(client: &aws_s3_client, meta_request_type: aws_s3_meta_request_type) -> u32 {
1052        let mut num_requests_network_io: u32 = 0;
1053        if meta_request_type == aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_MAX {
1054            let max_req_type = aws_s3_meta_request_type::AWS_S3_META_REQUEST_TYPE_MAX as usize;
1055            for i in 0..max_req_type {
1056                // SAFETY: these atomics are known to be integers, and `client` is valid
1057                let n = unsafe { aws_atomic_load_int(&client.stats.num_requests_network_io[i]) } as u32;
1058                num_requests_network_io += n;
1059            }
1060        } else {
1061            let meta_request_type = meta_request_type as usize;
1062            // SAFETY: these atomics are known to be integers, and `client` is valid
1063            let n = unsafe { aws_atomic_load_int(&client.stats.num_requests_network_io[meta_request_type]) } as u32;
1064            num_requests_network_io = n;
1065        }
1066        num_requests_network_io
1067    }
1068}
1069
1070impl Drop for Client {
1071    fn drop(&mut self) {
1072        // SAFETY: `self.inner` points to a valid s3_client, and when this [Client] is dropped, it's
1073        // safe to decrement the reference counter by one.
1074        unsafe {
1075            aws_s3_client_release(self.inner.as_ptr());
1076        }
1077    }
1078}
1079
1080/// The result of a meta request using an S3 [Client].
1081#[derive(Debug)]
1082pub struct MetaRequestResult {
1083    /// Response status of the failed request or of the entire meta request.
1084    pub response_status: i32,
1085
1086    /// Final error code of the meta request.
1087    pub crt_error: Error,
1088
1089    /// Error HTTP body, if present.
1090    pub error_response_headers: Option<Headers>,
1091
1092    /// Error HTTP response, if present.
1093    pub error_response_body: Option<OsString>,
1094}
1095
1096impl MetaRequestResult {
1097    /// Returns whether this HTTP request result represents an error.
1098    pub fn is_err(&self) -> bool {
1099        self.crt_error.is_err()
1100    }
1101
1102    /// Return whether this request was canceled according to its error code.
1103    pub fn is_canceled(&self) -> bool {
1104        self.crt_error.raw_error() == mountpoint_s3_crt_sys::aws_s3_errors::AWS_ERROR_S3_CANCELED as i32
1105    }
1106
1107    /// Convert the CRT's meta request result struct into a safe, owned result.
1108    /// SAFETY: This copies from the raw pointer inside of the request result, so only call on
1109    /// results given to us from the CRT.
1110    unsafe fn from_crt_result(inner: &aws_s3_meta_request_result) -> Self {
1111        // SAFETY: `.error_response_headers` points to a valid `aws_http_headers`.
1112        let error_response_headers = unsafe {
1113            inner
1114                .error_response_headers
1115                .as_ref()
1116                .map(|headers| Headers::from_crt(NonNull::from(headers)))
1117        };
1118
1119        // SAFETY: `.error_response_body` points to a valid `aws_byte_buf`.
1120        let error_response_body: Option<OsString> = unsafe {
1121            inner.error_response_body.as_ref().map(|byte_buf| {
1122                assert!(!byte_buf.buffer.is_null(), "error_response_body.buffer is null");
1123                let slice: &[u8] = std::slice::from_raw_parts(byte_buf.buffer, byte_buf.len);
1124                OsStr::from_bytes(slice).to_owned()
1125            })
1126        };
1127
1128        Self {
1129            response_status: inner.response_status,
1130            crt_error: inner.error_code.into(),
1131            error_response_headers,
1132            error_response_body,
1133        }
1134    }
1135}
1136
1137/// Metrics for an individual request
1138pub struct RequestMetrics {
1139    inner: NonNull<aws_s3_request_metrics>,
1140}
1141
1142impl RequestMetrics {
1143    /// Return the type of this request
1144    pub fn request_type(&self) -> RequestType {
1145        let mut out: aws_s3_request_type = aws_s3_request_type::AWS_S3_REQUEST_TYPE_MAX;
1146        // SAFETY: `inner` is a valid aws_s3_request_metrics
1147        unsafe { aws_s3_request_metrics_get_request_type(self.inner.as_ptr(), &mut out) };
1148        out.into()
1149    }
1150
1151    /// Return the request ID for this request, or None if unavailable (e.g. the request failed
1152    /// before sending).
1153    pub fn request_id(&self) -> Option<String> {
1154        let mut out: *const aws_string = std::ptr::null();
1155        // SAFETY: `inner` is a valid aws_s3_request_metrics
1156        unsafe {
1157            aws_s3_request_metrics_get_request_id(self.inner.as_ptr(), &mut out)
1158                .ok_or_last_error()
1159                .ok()?
1160        };
1161        assert!(!out.is_null(), "request ID should be available if call succeeded");
1162        // SAFETY: `out` is now a valid pointer to an aws_string, and we'll copy the bytes
1163        // out of it so it won't live beyond this function call
1164        unsafe {
1165            let byte_cursor = aws_byte_cursor_from_string(out);
1166            let os_str = OsStr::from_bytes(aws_byte_cursor_as_slice(&byte_cursor));
1167            Some(os_str.to_string_lossy().into_owned())
1168        }
1169    }
1170
1171    /// Get the start time of the request in nanoseconds
1172    pub fn start_timestamp_ns(&self) -> u64 {
1173        let mut out: u64 = 0;
1174        // SAFETY: `inner` is a valid aws_s3_request_metrics
1175        unsafe {
1176            aws_s3_request_metrics_get_start_timestamp_ns(self.inner.as_ptr(), &mut out);
1177        }
1178        out
1179    }
1180
1181    /// Get the start time of the request in nanoseconds
1182    pub fn end_timestamp_ns(&self) -> u64 {
1183        let mut out: u64 = 0;
1184        // SAFETY: `inner` is a valid aws_s3_request_metrics
1185        unsafe {
1186            aws_s3_request_metrics_get_end_timestamp_ns(self.inner.as_ptr(), &mut out);
1187        }
1188        out
1189    }
1190
1191    /// Return the total duration for this request
1192    pub fn total_duration(&self) -> Duration {
1193        let mut out: u64 = 0;
1194        // SAFETY: `inner` is a valid aws_s3_request_metrics
1195        unsafe { aws_s3_request_metrics_get_total_duration_ns(self.inner.as_ptr(), &mut out) };
1196        Duration::from_nanos(out)
1197    }
1198
1199    /// Get the time when the request started to be encoded in nanoseconds
1200    pub fn send_start_timestamp_ns(&self) -> Option<u64> {
1201        let mut out: u64 = 0;
1202        // SAFETY: `inner` is a valid aws_s3_request_metrics
1203        unsafe {
1204            aws_s3_request_metrics_get_send_start_timestamp_ns(self.inner.as_ptr(), &mut out)
1205                .ok_or_last_error()
1206                .ok()?;
1207        }
1208        Some(out)
1209    }
1210
1211    /// Get the time when the request finished being encoded in nanoseconds
1212    pub fn send_end_timestamp_ns(&self) -> Option<u64> {
1213        let mut out: u64 = 0;
1214        // SAFETY: `inner` is a valid aws_s3_request_metrics
1215        unsafe {
1216            aws_s3_request_metrics_get_send_end_timestamp_ns(self.inner.as_ptr(), &mut out)
1217                .ok_or_last_error()
1218                .ok()?;
1219        }
1220        Some(out)
1221    }
1222
1223    /// Get the time when the response started to be received from the network in nanoseconds
1224    pub fn receive_start_timestamp_ns(&self) -> Option<u64> {
1225        let mut out: u64 = 0;
1226        // SAFETY: `inner` is a valid aws_s3_request_metrics
1227        unsafe {
1228            aws_s3_request_metrics_get_receive_start_timestamp_ns(self.inner.as_ptr(), &mut out)
1229                .ok_or_last_error()
1230                .ok()?;
1231        }
1232        Some(out)
1233    }
1234
1235    /// Get the time when the response finished being received from the network in nanoseconds
1236    pub fn receive_end_timestamp_ns(&self) -> Option<u64> {
1237        let mut out: u64 = 0;
1238        // SAFETY: `inner` is a valid aws_s3_request_metrics
1239        unsafe {
1240            aws_s3_request_metrics_get_receive_end_timestamp_ns(self.inner.as_ptr(), &mut out)
1241                .ok_or_last_error()
1242                .ok()?;
1243        }
1244        Some(out)
1245    }
1246
1247    /// Get the time when the request started being signed
1248    pub fn sign_start_timestamp_ns(&self) -> Option<u64> {
1249        let mut out: u64 = 0;
1250        // SAFETY: `inner` is a valid aws_s3_request_metrics
1251        unsafe {
1252            aws_s3_request_metrics_get_sign_start_timestamp_ns(self.inner.as_ptr(), &mut out)
1253                .ok_or_last_error()
1254                .ok()?;
1255        }
1256        Some(out)
1257    }
1258
1259    ///Get the time when the request finished being signed
1260    pub fn sign_end_timestamp_ns(&self) -> Option<u64> {
1261        let mut out: u64 = 0;
1262        // SAFETY: `inner` is a valid aws_s3_request_metrics
1263        unsafe {
1264            aws_s3_request_metrics_get_sign_end_timestamp_ns(self.inner.as_ptr(), &mut out)
1265                .ok_or_last_error()
1266                .ok()?;
1267        }
1268        Some(out)
1269    }
1270
1271    /// Get the time when the request started to acquire memory
1272    pub fn mem_acquire_start_timestamp_ns(&self) -> Option<u64> {
1273        let mut out: u64 = 0;
1274        // SAFETY: `inner` is a valid aws_s3_request_metrics
1275        unsafe {
1276            aws_s3_request_metrics_get_mem_acquire_start_timestamp_ns(self.inner.as_ptr(), &mut out)
1277                .ok_or_last_error()
1278                .ok()?;
1279        }
1280        Some(out)
1281    }
1282
1283    /// Get the time when the request finished acquiring memory
1284    pub fn mem_acquire_end_timestamp_ns(&self) -> Option<u64> {
1285        let mut out: u64 = 0;
1286        // SAFETY: `inner` is a valid aws_s3_request_metrics
1287        unsafe {
1288            aws_s3_request_metrics_get_mem_acquire_end_timestamp_ns(self.inner.as_ptr(), &mut out)
1289                .ok_or_last_error()
1290                .ok()?;
1291        }
1292        Some(out)
1293    }
1294
1295    /// Get the time when the request started to be delivered (i.e. on body callback is invoked)
1296    pub fn delivery_start_timestamp_ns(&self) -> Option<u64> {
1297        let mut out: u64 = 0;
1298        // SAFETY: `inner` is a valid aws_s3_request_metrics
1299        unsafe {
1300            aws_s3_request_metrics_get_delivery_start_timestamp_ns(self.inner.as_ptr(), &mut out)
1301                .ok_or_last_error()
1302                .ok()?;
1303        }
1304        Some(out)
1305    }
1306
1307    /// Get the time when the response finished being delivered
1308    pub fn delivery_end_timestamp_ns(&self) -> Option<u64> {
1309        let mut out: u64 = 0;
1310        // SAFETY: `inner` is a valid aws_s3_request_metrics
1311        unsafe {
1312            aws_s3_request_metrics_get_delivery_end_timestamp_ns(self.inner.as_ptr(), &mut out)
1313                .ok_or_last_error()
1314                .ok()?;
1315        }
1316        Some(out)
1317    }
1318
1319    /// Return the response status code for this request, or None if unavailable (e.g. the
1320    /// request failed before sending).
1321    pub fn status_code(&self) -> Option<i32> {
1322        let mut out: i32 = 0;
1323        // SAFETY: `inner` is a valid aws_s3_request_metrics
1324        unsafe {
1325            aws_s3_request_metrics_get_response_status_code(self.inner.as_ptr(), &mut out)
1326                .ok_or_last_error()
1327                .ok()?
1328        };
1329        Some(out)
1330    }
1331
1332    /// Return the response headers for this request, or None if unavailable (e.g. the request
1333    /// failed before sending).
1334    pub fn response_headers(&self) -> Option<Headers> {
1335        let mut out: *mut aws_http_headers = std::ptr::null_mut();
1336        // SAFETY: `inner` is a valid aws_s3_request_metrics
1337        unsafe {
1338            aws_s3_request_metrics_get_response_headers(self.inner.as_ptr(), &mut out)
1339                .ok_or_last_error()
1340                .ok()?
1341        };
1342        assert!(!out.is_null(), "headers should be available if call succeeded");
1343        // SAFETY: `out` is now a valid pointer to an aws_http_headers, and [Headers::from_crt]
1344        // will acquire a reference to keep it alive after this function call, so it's safe to
1345        // return the owned version here.
1346        unsafe { Some(Headers::from_crt(NonNull::new_unchecked(out))) }
1347    }
1348
1349    /// Get the path and query fragment of the request URL
1350    pub fn request_path_query(&self) -> Option<String> {
1351        let mut out: *const aws_string = std::ptr::null();
1352        // SAFETY: `inner` is a valid aws_s3_request_metrics
1353        unsafe {
1354            aws_s3_request_metrics_get_request_path_query(self.inner.as_ptr(), &mut out);
1355        };
1356        if out.is_null() {
1357            return None;
1358        }
1359        // SAFETY: `out` is now a valid pointer to an aws_string, and we'll copy the bytes
1360        // out of it so it won't live beyond this function call
1361        unsafe {
1362            let byte_cursor = aws_byte_cursor_from_string(out);
1363            let os_str = OsStr::from_bytes(aws_byte_cursor_as_slice(&byte_cursor));
1364            Some(os_str.to_string_lossy().into_owned())
1365        }
1366    }
1367
1368    /// Get the host address of the request
1369    pub fn host_address(&self) -> Option<String> {
1370        let mut out: *const aws_string = std::ptr::null();
1371        // SAFETY: `inner` is a valid aws_s3_request_metrics
1372        unsafe {
1373            aws_s3_request_metrics_get_host_address(self.inner.as_ptr(), &mut out);
1374        };
1375        if out.is_null() {
1376            return None;
1377        }
1378        // SAFETY: `out` is now a valid pointer to an aws_string, and we'll copy the bytes
1379        // out of it so it won't live beyond this function call
1380        unsafe {
1381            let byte_cursor = aws_byte_cursor_from_string(out);
1382            let os_str = OsStr::from_bytes(aws_byte_cursor_as_slice(&byte_cursor));
1383            Some(os_str.to_string_lossy().into_owned())
1384        }
1385    }
1386
1387    /// Get the IP address the request connected to
1388    pub fn ip_address(&self) -> Option<String> {
1389        let mut out: *const aws_string = std::ptr::null();
1390        // SAFETY: `inner` is a valid aws_s3_request_metrics
1391        unsafe {
1392            aws_s3_request_metrics_get_ip_address(self.inner.as_ptr(), &mut out)
1393                .ok_or_last_error()
1394                .ok()?
1395        };
1396        assert!(!out.is_null(), "IP address should be available if call succeeded");
1397        // SAFETY: `out` is now a valid pointer to an aws_string, and we'll copy the bytes
1398        // out of it so it won't live beyond this function call
1399        unsafe {
1400            let byte_cursor = aws_byte_cursor_from_string(out);
1401            let os_str = OsStr::from_bytes(aws_byte_cursor_as_slice(&byte_cursor));
1402            Some(os_str.to_string_lossy().into_owned())
1403        }
1404    }
1405
1406    /// Get the ID of the connection that request was made from
1407    pub fn connection_id(&self) -> Option<usize> {
1408        let mut out: usize = 0;
1409        // SAFETY: `inner` is a valid aws_s3_request_metrics
1410        unsafe {
1411            aws_s3_request_metrics_get_connection_id(self.inner.as_ptr(), &mut out)
1412                .ok_or_last_error()
1413                .ok()?
1414        };
1415        Some(out)
1416    }
1417
1418    /// Get the ID of the thread the request was made from
1419    pub fn thread_id(&self) -> Option<ThreadId> {
1420        let mut out: MaybeUninit<aws_thread_id_t> = MaybeUninit::uninit();
1421        // SAFETY: `inner` is a valid aws_s3_request_metrics, `out` will point to initialized memory if no error was set
1422        let thread_id = unsafe {
1423            aws_s3_request_metrics_get_thread_id(self.inner.as_ptr(), out.as_mut_ptr())
1424                .ok_or_last_error()
1425                .ok()?;
1426            out.assume_init()
1427        };
1428        Some(thread_id.into())
1429    }
1430
1431    /// Get the stream ID of the request
1432    pub fn request_stream_id(&self) -> Option<u32> {
1433        let mut out: u32 = 0;
1434        // SAFETY: `inner` is a valid aws_s3_request_metrics
1435        unsafe {
1436            aws_s3_request_metrics_get_request_stream_id(self.inner.as_ptr(), &mut out)
1437                .ok_or_last_error()
1438                .ok()?
1439        };
1440        Some(out)
1441    }
1442
1443    /// Get the AWS CRT error code of the request
1444    pub fn error(&self) -> Error {
1445        // SAFETY: `inner` is a valid aws_s3_request_metrics
1446        let err = unsafe { aws_s3_request_metrics_get_error_code(self.inner.as_ptr()) };
1447        err.into()
1448    }
1449
1450    /// Return the first-byte latency for this request (time first byte received - time last byte
1451    /// sent), or None if unavailable (e.g. the request failed before sending).
1452    pub fn time_to_first_byte(&self) -> Option<Duration> {
1453        let mut send_end: u64 = 0;
1454        let mut receive_start: u64 = 0;
1455        // SAFETY: `inner` is a valid aws_s3_request_metrics
1456        unsafe {
1457            aws_s3_request_metrics_get_send_end_timestamp_ns(self.inner.as_ptr(), &mut send_end)
1458                .ok_or_last_error()
1459                .ok()?;
1460            aws_s3_request_metrics_get_receive_start_timestamp_ns(self.inner.as_ptr(), &mut receive_start)
1461                .ok_or_last_error()
1462                .ok()?;
1463        };
1464        Some(Duration::from_nanos(receive_start.saturating_sub(send_end)))
1465    }
1466
1467    /// Get the S3 operation name of the request (e.g. "HeadObject").
1468    ///
1469    /// This is tied to the lifetime of the [RequestMetrics] struct.
1470    pub fn operation_name(&self) -> Option<&str> {
1471        let mut out: *const aws_string = std::ptr::null();
1472        // SAFETY: `inner` is a valid aws_s3_request_metrics
1473        unsafe {
1474            aws_s3_request_metrics_get_operation_name(self.inner.as_ptr(), &mut out)
1475                .ok_or_last_error()
1476                .ok()?
1477        };
1478        assert!(!out.is_null(), "operation name should be available if call succeeded");
1479        // SAFETY: `out` is now a valid pointer to an aws_string that lives as long as the RequestMetrics
1480        unsafe {
1481            let byte_cursor = aws_byte_cursor_from_string(out);
1482            let slice = aws_byte_cursor_as_slice(&byte_cursor);
1483            std::str::from_utf8(slice).ok()
1484        }
1485    }
1486
1487    /// Return whether the request was canceled according to its error code
1488    pub fn is_canceled(&self) -> bool {
1489        self.error().raw_error() == mountpoint_s3_crt_sys::aws_s3_errors::AWS_ERROR_S3_CANCELED as i32
1490    }
1491}
1492
1493impl Debug for RequestMetrics {
1494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1495        f.debug_struct("RequestMetrics")
1496            .field("request_id", &self.request_id())
1497            .field("start_timestamp_ns", &self.start_timestamp_ns())
1498            .field("end_timestamp_ns", &self.end_timestamp_ns())
1499            .field("send_start_timestamp_ns", &self.send_start_timestamp_ns())
1500            .field("send_end_timestamp_ns", &self.send_end_timestamp_ns())
1501            .field("receive_start_timestamp_ns", &self.receive_start_timestamp_ns())
1502            .field("receive_end_timestamp_ns", &self.receive_end_timestamp_ns())
1503            .field("sign_start_timestamp_ns", &self.sign_start_timestamp_ns())
1504            .field("sign_end_timestamp_ns", &self.sign_end_timestamp_ns())
1505            .field("mem_acquire_start_timestamp_ns", &self.mem_acquire_start_timestamp_ns())
1506            .field("mem_acquire_end_timestamp_ns", &self.mem_acquire_end_timestamp_ns())
1507            .field("delivery_start_timestamp_ns", &self.delivery_start_timestamp_ns())
1508            .field("delivery_end_timestamp_ns", &self.delivery_end_timestamp_ns())
1509            .field("response_status_code", &self.status_code())
1510            .field("response_headers", &self.response_headers())
1511            .field("request_path_query", &self.request_path_query())
1512            .field("host_address", &self.host_address())
1513            .field("ip_address", &self.ip_address())
1514            .field("connection_id", &self.connection_id())
1515            .field("thread_id", &self.thread_id())
1516            .field("request_stream_id", &self.request_stream_id())
1517            .field("request_type", &self.request_type())
1518            .field("operation_name", &self.operation_name())
1519            .field("error_code", &self.error())
1520            .finish()
1521    }
1522}
1523
1524/// The type of an S3 request reported by [RequestMetrics]. A single meta request might perform
1525/// multiple requests to various S3 APIs; this type can be used to distinguish them.
1526#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1527pub enum RequestType {
1528    /// When the request type is unknown to the CRT. Operation name may have been attached to non-meta CRT requests.
1529    Unknown,
1530    /// HeadObject: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html>
1531    HeadObject,
1532    /// GetObject: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
1533    GetObject,
1534    /// ListParts: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html>
1535    ListParts,
1536    /// CreateMultipartUpload: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html>
1537    CreateMultipartUpload,
1538    /// UploadPart: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html>
1539    UploadPart,
1540    /// AbortMultipartUpload: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html>
1541    AbortMultipartUpload,
1542    /// CompleteMultipartUpload: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html>
1543    CompleteMultipartUpload,
1544    /// UploadPartCopy: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html>
1545    UploadPartCopy,
1546    /// CopyObject: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
1547    CopyObject,
1548    /// PutObject: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
1549    PutObject,
1550}
1551
1552impl From<aws_s3_request_type> for RequestType {
1553    fn from(value: aws_s3_request_type) -> Self {
1554        match value {
1555            aws_s3_request_type::AWS_S3_REQUEST_TYPE_UNKNOWN => RequestType::Unknown,
1556            aws_s3_request_type::AWS_S3_REQUEST_TYPE_HEAD_OBJECT => RequestType::HeadObject,
1557            aws_s3_request_type::AWS_S3_REQUEST_TYPE_GET_OBJECT => RequestType::GetObject,
1558            aws_s3_request_type::AWS_S3_REQUEST_TYPE_LIST_PARTS => RequestType::ListParts,
1559            aws_s3_request_type::AWS_S3_REQUEST_TYPE_CREATE_MULTIPART_UPLOAD => RequestType::CreateMultipartUpload,
1560            aws_s3_request_type::AWS_S3_REQUEST_TYPE_UPLOAD_PART => RequestType::UploadPart,
1561            aws_s3_request_type::AWS_S3_REQUEST_TYPE_ABORT_MULTIPART_UPLOAD => RequestType::AbortMultipartUpload,
1562            aws_s3_request_type::AWS_S3_REQUEST_TYPE_COMPLETE_MULTIPART_UPLOAD => RequestType::CompleteMultipartUpload,
1563            aws_s3_request_type::AWS_S3_REQUEST_TYPE_UPLOAD_PART_COPY => RequestType::UploadPartCopy,
1564            aws_s3_request_type::AWS_S3_REQUEST_TYPE_COPY_OBJECT => RequestType::CopyObject,
1565            aws_s3_request_type::AWS_S3_REQUEST_TYPE_PUT_OBJECT => RequestType::PutObject,
1566            _ => panic!("unknown request type {value:?}"),
1567        }
1568    }
1569}
1570
1571/// Create a new [SigningConfig] with the given configuration for signing S3 requests to a region
1572/// using the given [CredentialsProvider]
1573pub fn init_signing_config(
1574    region: &str,
1575    credentials_provider: CredentialsProvider,
1576    algorithm: Option<SigningAlgorithm>,
1577    service: Option<&str>,
1578    use_double_uri_encode: Option<bool>,
1579) -> SigningConfig {
1580    let mut signing_config = Box::new(SigningConfigInner::new(region, credentials_provider));
1581
1582    if let Some(service) = service {
1583        signing_config.service(service);
1584    }
1585    if let Some(use_double_uri_encode) = use_double_uri_encode {
1586        signing_config.use_double_uri_encode(use_double_uri_encode);
1587    }
1588    if let Some(algorithm) = algorithm {
1589        signing_config.algorithm(algorithm);
1590    }
1591
1592    SigningConfig(Box::into_pin(signing_config))
1593}
1594
1595/// The checksum configuration.
1596#[derive(Debug, Clone, Default)]
1597pub struct ChecksumConfig {
1598    /// The struct we can pass into the CRT's functions.
1599    inner: aws_s3_checksum_config,
1600}
1601
1602impl ChecksumConfig {
1603    /// Create a [ChecksumConfig] enabling Crc32c trailing checksums in PUT requests.
1604    pub fn trailing_crc32c() -> Self {
1605        Self {
1606            inner: aws_s3_checksum_config {
1607                location: aws_s3_checksum_location::AWS_SCL_TRAILER,
1608                checksum_algorithm: aws_s3_checksum_algorithm::AWS_SCA_CRC32C,
1609                ..Default::default()
1610            },
1611        }
1612    }
1613
1614    /// Create a [ChecksumConfig] enabling Crc32c trailing checksums only for upload review.
1615    pub fn upload_review_crc32c() -> Self {
1616        Self {
1617            inner: aws_s3_checksum_config {
1618                location: aws_s3_checksum_location::AWS_SCL_NONE,
1619                checksum_algorithm: aws_s3_checksum_algorithm::AWS_SCA_CRC32C,
1620                ..Default::default()
1621            },
1622        }
1623    }
1624
1625    /// Get out the inner pointer to the checksum config
1626    pub(crate) fn to_inner_ptr(&self) -> *const aws_s3_checksum_config {
1627        &self.inner
1628    }
1629}
1630
1631/// Checksum algorithm.
1632#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1633#[non_exhaustive]
1634pub enum ChecksumAlgorithm {
1635    /// Crc64nvme checksum.
1636    Crc64nvme,
1637    /// Crc32c checksum.
1638    Crc32c,
1639    /// Crc32 checksum.
1640    Crc32,
1641    /// Sha1 checksum.
1642    Sha1,
1643    /// Sha256 checksum.
1644    Sha256,
1645    /// Checksum of a type unknown to this S3 client.
1646    ///
1647    /// This type will be used if Mountpoint ever encounters a checksum algorithm it doesn't recognize.
1648    /// This should allow Mountpoint to continue with most file operations which don't depend on the checksum algorithm.
1649    Unknown(String),
1650}
1651
1652impl ChecksumAlgorithm {
1653    fn from_aws_s3_checksum_algorithm(algorithm: aws_s3_checksum_algorithm) -> Option<Self> {
1654        match algorithm {
1655            aws_s3_checksum_algorithm::AWS_SCA_NONE => None,
1656            aws_s3_checksum_algorithm::AWS_SCA_CRC64NVME => Some(ChecksumAlgorithm::Crc64nvme),
1657            aws_s3_checksum_algorithm::AWS_SCA_CRC32C => Some(ChecksumAlgorithm::Crc32c),
1658            aws_s3_checksum_algorithm::AWS_SCA_CRC32 => Some(ChecksumAlgorithm::Crc32),
1659            aws_s3_checksum_algorithm::AWS_SCA_SHA1 => Some(ChecksumAlgorithm::Sha1),
1660            aws_s3_checksum_algorithm::AWS_SCA_SHA256 => Some(ChecksumAlgorithm::Sha256),
1661            _ => unreachable!("unknown aws_s3_checksum_algorithm"),
1662        }
1663    }
1664}
1665
1666impl Display for ChecksumAlgorithm {
1667    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1668        match self {
1669            ChecksumAlgorithm::Crc64nvme => f.write_str("CRC64NVME"),
1670            ChecksumAlgorithm::Crc32c => f.write_str("CRC32C"),
1671            ChecksumAlgorithm::Crc32 => f.write_str("CRC32"),
1672            ChecksumAlgorithm::Sha1 => f.write_str("SHA1"),
1673            ChecksumAlgorithm::Sha256 => f.write_str("SHA256"),
1674            ChecksumAlgorithm::Unknown(algorithm) => write!(f, "Unknown algorithm: {algorithm:?}"),
1675        }
1676    }
1677}
1678
1679/// Info for the caller to review before an upload completes.
1680#[derive(Debug)]
1681pub struct UploadReview {
1682    /// Info about each part uploaded.
1683    pub parts: Vec<UploadReviewPart>,
1684    /// The checksum algorithm used.
1685    pub checksum_algorithm: Option<ChecksumAlgorithm>,
1686}
1687
1688impl UploadReview {
1689    fn new(review: &aws_s3_upload_review) -> Self {
1690        let checksum_algorithm = ChecksumAlgorithm::from_aws_s3_checksum_algorithm(review.checksum_algorithm);
1691        let count = review.part_count;
1692        assert!(count == 0 || !review.part_array.is_null());
1693        let mut parts = Vec::new();
1694        for i in 0..count {
1695            // SAFETY: `part_array` is an array of length `count`.
1696            let part = unsafe { &*review.part_array.add(i) };
1697            parts.push(UploadReviewPart::new(part));
1698        }
1699        Self {
1700            parts,
1701            checksum_algorithm,
1702        }
1703    }
1704}
1705
1706/// Info about a single part, for the caller to review before the upload completes.
1707#[derive(Debug)]
1708pub struct UploadReviewPart {
1709    /// Size in bytes of this part.
1710    pub size: u64,
1711
1712    /// Checksum string (usually base64-encoded), if computed.
1713    pub checksum: Option<String>,
1714}
1715
1716impl UploadReviewPart {
1717    fn new(part: &aws_s3_upload_part_review) -> Self {
1718        // SAFETY: `part.checksum` is a valid aws_byte_cursor. The returned slice is only used in current scope.
1719        let slice = unsafe { aws_byte_cursor_as_slice(&part.checksum) };
1720        let checksum = if slice.is_empty() {
1721            None
1722        } else {
1723            let str = std::str::from_utf8(slice).expect("Checksum should be a valid UTF-8 string.");
1724            Some(str.to_owned())
1725        };
1726        let size = part.size;
1727        Self { size, checksum }
1728    }
1729}
1730
1731#[cfg(test)]
1732mod tests {
1733    use test_case::test_case;
1734
1735    use crate::aws_s3_request_type;
1736    use crate::s3::client::RequestType;
1737
1738    #[test_case(aws_s3_request_type::AWS_S3_REQUEST_TYPE_UNKNOWN, RequestType::Unknown)]
1739    #[test_case(aws_s3_request_type::AWS_S3_REQUEST_TYPE_HEAD_OBJECT, RequestType::HeadObject)]
1740    #[test_case(aws_s3_request_type::AWS_S3_REQUEST_TYPE_GET_OBJECT, RequestType::GetObject)]
1741    fn request_type_from_aws_s3_request_type(c_request_type: aws_s3_request_type, expected_request_type: RequestType) {
1742        // Simple, but was previously broken.
1743        assert_eq!(expected_request_type, RequestType::from(c_request_type));
1744    }
1745}