Skip to main content

mountpoint_s3_client/
s3_crt_client.rs

1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31    BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32    MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory, MemoryPoolFactoryWrapper};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48    ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49    S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Create the `tracing` span that wraps a single S3 request and log its creation.
///
/// Each span gets a fresh `id` from `$self.next_request_counter()`, plus any extra span fields
/// supplied after the method name. The span is returned un-entered; callers decide when to
/// enter it.
macro_rules! request_span {
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let request_id = $self.next_request_counter();
        // The span is created at WARN deliberately: it must exist whenever an event at WARN or
        // any less severe level (INFO, DEBUG, TRACE) is emitted inside it, so its own level has
        // to be WARN as well.
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = request_id, $($field)*);
        {
            let _entered = span.enter();
            tracing::debug!("new request");
        }
        span
    }};
    ($self:expr, $method:expr) => {
        request_span!($self, $method,)
    };
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// Emit a `tracing` event whose level is chosen at runtime.
///
/// `tracing`'s own macros require the level to be a compile-time constant, but we want to pick
/// the level for request logs based on the response status. This expands to a `match` with one
/// statically-leveled `tracing::event!` per arm. https://github.com/tokio-rs/tracing/issues/372
macro_rules! event {
    ($level:expr, $($rest:tt)*) => {
        match $level {
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($rest)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($rest)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($rest)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($rest)*),
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($rest)*),
        }
    };
}
97
/// Configurations for the CRT-based S3 client
///
/// Start from [`S3ClientConfig::new`] (or `Default::default()`), adjust it with the chained
/// builder methods, and pass it to [`S3CrtClient::new`].
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How requests are authenticated; see [`S3ClientAuthConfig`].
    auth_config: S3ClientAuthConfig,
    /// Target throughput in Gbps, forwarded to the CRT client configuration.
    throughput_target_gbps: f64,
    /// Memory limit in bytes, forwarded to the CRT client configuration.
    memory_limit_in_bytes: u64,
    /// Part size for multi-part GETs; validated when the client is built.
    read_part_size: usize,
    /// Part size for multi-part PUTs; validated when the client is built.
    write_part_size: usize,
    /// Region and optional explicit endpoint. If no endpoint is set, `AWS_ENDPOINT_URL` is
    /// consulted when the client is built.
    endpoint_config: EndpointConfig,
    /// Builder for the `User-Agent` header; `UserAgent::new(None)` is used when unset.
    user_agent: Option<UserAgent>,
    /// Value sent in the `x-amz-request-payer` header, if any.
    request_payer: Option<String>,
    /// Value sent in the `x-amz-expected-bucket-owner` header, if any.
    bucket_owner: Option<String>,
    /// Maximum attempts per request (initial try included). A parseable `AWS_MAX_ATTEMPTS`
    /// environment variable takes precedence; the fallback when both are absent is 3.
    max_attempts: Option<NonZeroUsize>,
    /// Whether read backpressure is enabled in the CRT client.
    read_backpressure: bool,
    /// Initial backpressure read window size, in bytes.
    initial_read_window: usize,
    /// Network interfaces to spread requests over; only forwarded to the CRT when non-empty.
    network_interface_names: Vec<String>,
    /// Optional user callback invoked with per-request telemetry metrics.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Thread count override passed to the CRT event loop group constructor.
    event_loop_threads: Option<u16>,
    /// Optional custom memory pool factory, wrapped for the CRT when the client is built.
    buffer_pool_factory: Option<MemoryPoolFactoryWrapper>,
}
118
119impl Default for S3ClientConfig {
120    fn default() -> Self {
121        const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122        Self {
123            auth_config: Default::default(),
124            throughput_target_gbps: 10.0,
125            memory_limit_in_bytes: 0,
126            read_part_size: DEFAULT_PART_SIZE,
127            write_part_size: DEFAULT_PART_SIZE,
128            endpoint_config: EndpointConfig::new("us-east-1"),
129            user_agent: None,
130            request_payer: None,
131            bucket_owner: None,
132            max_attempts: None,
133            read_backpressure: false,
134            initial_read_window: DEFAULT_PART_SIZE,
135            network_interface_names: vec![],
136            telemetry_callback: None,
137            event_loop_threads: None,
138            buffer_pool_factory: None,
139        }
140    }
141}
142
143impl S3ClientConfig {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Set the configuration for authenticating to S3
149    #[must_use = "S3ClientConfig follows a builder pattern"]
150    pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151        self.auth_config = auth_config;
152        self
153    }
154
155    /// Set the part size for multi-part operations to S3 (both PUT and GET)
156    #[must_use = "S3ClientConfig follows a builder pattern"]
157    pub fn part_size(mut self, part_size: usize) -> Self {
158        self.read_part_size = part_size;
159        self.write_part_size = part_size;
160        self
161    }
162
163    /// Set the part size for multi-part-get operations to S3.
164    #[must_use = "S3ClientConfig follows a builder pattern"]
165    pub fn read_part_size(mut self, size: usize) -> Self {
166        self.read_part_size = size;
167        self
168    }
169
170    /// Set the part size for multi-part-put operations to S3.
171    #[must_use = "S3ClientConfig follows a builder pattern"]
172    pub fn write_part_size(mut self, size: usize) -> Self {
173        self.write_part_size = size;
174        self
175    }
176
177    /// Set the target throughput in Gbps for the S3 client
178    #[must_use = "S3ClientConfig follows a builder pattern"]
179    pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180        self.throughput_target_gbps = throughput_target_gbps;
181        self
182    }
183
184    /// Set the memory limit in bytes for the S3 client
185    #[must_use = "S3ClientConfig follows a builder pattern"]
186    pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187        self.memory_limit_in_bytes = memory_limit_in_bytes;
188        self
189    }
190
191    /// Set the endpoint configuration for endpoint resolution
192    #[must_use = "S3ClientConfig follows a builder pattern"]
193    pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194        self.endpoint_config = endpoint_config;
195        self
196    }
197
198    /// Set a constructor for the HTTP User-agent header for S3 requests
199    #[must_use = "S3ClientConfig follows a builder pattern"]
200    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201        self.user_agent = Some(user_agent);
202        self
203    }
204
205    /// Set a value for the request payer HTTP header for S3 requests
206    #[must_use = "S3ClientConfig follows a builder pattern"]
207    pub fn request_payer(mut self, request_payer: &str) -> Self {
208        self.request_payer = Some(request_payer.to_owned());
209        self
210    }
211
212    /// Set an expected bucket owner value
213    #[must_use = "S3ClientConfig follows a builder pattern"]
214    pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215        self.bucket_owner = Some(bucket_owner.to_owned());
216        self
217    }
218
219    /// Set a maximum number of attempts for S3 requests. Will be overridden by the
220    /// `AWS_MAX_ATTEMPTS` environment variable if set.
221    #[must_use = "S3ClientConfig follows a builder pattern"]
222    pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223        self.max_attempts = Some(max_attempts);
224        self
225    }
226
227    /// Set the flag for backpressure read
228    #[must_use = "S3ClientConfig follows a builder pattern"]
229    pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230        self.read_backpressure = read_backpressure;
231        self
232    }
233
234    /// Set a value for initial backpressure read window size
235    #[must_use = "S3ClientConfig follows a builder pattern"]
236    pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237        self.initial_read_window = initial_read_window;
238        self
239    }
240
241    /// Set a list of network interfaces to distribute S3 requests over
242    #[must_use = "S3ClientConfig follows a builder pattern"]
243    pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244        self.network_interface_names = network_interface_names;
245        self
246    }
247
248    /// Set a custom telemetry callback handler
249    #[must_use = "S3ClientConfig follows a builder pattern"]
250    pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251        self.telemetry_callback = Some(telemetry_callback);
252        self
253    }
254
255    /// Override the number of threads used by the CRTs AwsEventLoop
256    #[must_use = "S3ClientConfig follows a builder pattern"]
257    pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258        self.event_loop_threads = Some(event_loop_threads);
259        self
260    }
261
262    /// Set a custom memory pool
263    #[must_use = "S3ClientConfig follows a builder pattern"]
264    pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265        self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(move |_| pool.clone()));
266        self
267    }
268
269    /// Set a custom memory pool factory
270    #[must_use = "S3ClientConfig follows a builder pattern"]
271    pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272        self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(pool_factory));
273        self
274    }
275}
276
/// Authentication configuration for the CRT-based S3 client
///
/// When the client is built, each variant is turned into a CRT credentials provider as
/// described on the variants below.
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// The default AWS credentials resolution chain, similar to the AWS CLI
    /// (built with the CRT's default provider chain).
    #[default]
    Default,
    /// Do not sign requests at all (implemented with the CRT's anonymous credentials provider).
    NoSigning,
    /// Explicitly load the given profile name from the AWS CLI configuration file
    Profile(String),
    /// Use a custom credentials provider, passed through to the client unchanged.
    Provider(CredentialsProvider),
}
290
/// An S3 client that uses the [AWS Common Runtime (CRT)][crt] to make requests.
///
/// The AWS CRT is a C library that provides a common set of functionality for AWS SDKs. Its S3
/// client provides high throughput by implementing S3 performance best practices, including
/// automatic parallelization of GET and PUT requests.
///
/// To use this client, invoke the methods from the [`ObjectClient`] trait.
///
/// Cloning this type clones the inner [`Arc`], so all clones share the same underlying client.
///
/// [crt]: https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Shared client state, reference-counted so clones are cheap.
    inner: Arc<S3CrtClientInner>,
}
304
305impl S3CrtClient {
306    /// Construct a new S3 client with the given configuration.
307    pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308        Ok(Self {
309            inner: Arc::new(S3CrtClientInner::new(config)?),
310        })
311    }
312
313    /// Return a copy of the [EndpointConfig] for this client
314    pub fn endpoint_config(&self) -> EndpointConfig {
315        self.inner.endpoint_config.clone()
316    }
317
318    #[doc(hidden)]
319    pub fn event_loop_group(&self) -> EventLoopGroup {
320        self.inner.event_loop_group.clone()
321    }
322}
323
/// State shared by all clones of an [`S3CrtClient`].
#[derive(Debug)]
struct S3CrtClientInner {
    /// The underlying CRT S3 client.
    s3_client: Client,
    /// Event loop group the CRT resources were built on.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration, with `AWS_ENDPOINT_URL` already applied if no endpoint was set.
    endpoint_config: EndpointConfig,
    /// Allocator used to build request messages.
    allocator: Allocator,
    /// Monotonic counter for request ids. NOTE(review): presumably read through
    /// `next_request_counter()`, which is defined outside this view.
    next_request_counter: AtomicU64,
    /// user_agent_header will be passed into CRT, which adds additional information "CRTS3NativeClient/0.1.x".
    /// Here it will add the user agent prefix and s3 client information.
    user_agent_header: String,
    /// Value for the `x-amz-request-payer` header, if configured.
    request_payer: Option<String>,
    /// Validated part size for multi-part GETs.
    read_part_size: usize,
    /// Validated part size for multi-part PUTs.
    write_part_size: usize,
    /// Whether read backpressure was enabled in the configuration.
    enable_backpressure: bool,
    /// Initial backpressure read window size, in bytes.
    initial_read_window_size: usize,
    /// Value for the `x-amz-expected-bucket-owner` header, if configured.
    bucket_owner: Option<String>,
    /// Credentials used to build per-request signing configs. When `None`, request templates
    /// carry no per-request signing config.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver; also queried for host address counts reported as metrics.
    host_resolver: HostResolver,
    /// Optional user callback invoked with per-request telemetry.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
344
345impl S3CrtClientInner {
346    fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347        let allocator = Allocator::default();
348
349        let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351        let resolver_options = HostResolverDefaultOptions {
352            max_entries: 8,
353            event_loop_group: &mut event_loop_group,
354        };
355
356        let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358        let bootstrap_options = ClientBootstrapOptions {
359            event_loop_group: &mut event_loop_group,
360            host_resolver: &mut host_resolver,
361        };
362
363        let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365        let mut client_config = ClientConfig::new();
366
367        let retry_strategy = {
368            let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369            let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370                .ok()
371                .and_then(|s| s.parse::<usize>().ok())
372                .or_else(|| config.max_attempts.map(|m| m.get()))
373                .unwrap_or(3);
374            // Max *attempts* includes the initial attempt, the CRT's max *retries* does not, so
375            // decrement by one
376            retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377            retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378            retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379            RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380        };
381
382        trace!("constructing client with auth config {:?}", config.auth_config);
383        let credentials_provider = match config.auth_config {
384            S3ClientAuthConfig::Default => {
385                let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386                    bootstrap: &mut client_bootstrap,
387                };
388                CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389                    .map_err(NewClientError::ProviderFailure)?
390            }
391            S3ClientAuthConfig::NoSigning => {
392                CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393            }
394            S3ClientAuthConfig::Profile(profile_name) => {
395                let credentials_profile_options = CredentialsProviderProfileOptions {
396                    bootstrap: &mut client_bootstrap,
397                    profile_name_override: &profile_name,
398                };
399                CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400                    .map_err(NewClientError::ProviderFailure)?
401            }
402            S3ClientAuthConfig::Provider(provider) => provider,
403        };
404
405        let endpoint_config = config.endpoint_config;
406        client_config.region(endpoint_config.get_region());
407        let signing_config = init_signing_config(
408            endpoint_config.get_region(),
409            credentials_provider.clone(),
410            None,
411            None,
412            None,
413        );
414
415        let endpoint_config = match endpoint_config.get_endpoint() {
416            None => {
417                // No explicit endpoint was configured, let's try the environment variable
418                if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419                    debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420                    let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421                        .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422                    endpoint_config.endpoint(env_uri)
423                } else {
424                    endpoint_config
425                }
426            }
427            Some(_) => endpoint_config,
428        };
429
430        client_config.express_support(true);
431        client_config.read_backpressure(config.read_backpressure);
432        client_config.initial_read_window(config.initial_read_window);
433        client_config.signing_config(signing_config);
434
435        client_config
436            .client_bootstrap(client_bootstrap)
437            .retry_strategy(retry_strategy);
438
439        client_config.throughput_target_gbps(config.throughput_target_gbps);
440        client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442        if let Some(wrapper) = config.buffer_pool_factory {
443            let pool_factory = CrtBufferPoolFactory::new(wrapper, event_loop_group.clone());
444            client_config.buffer_pool_factory(pool_factory);
445        }
446
447        // max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
448        let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
449        // TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
450        for part_size in [config.read_part_size, config.write_part_size] {
451            if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
452                return Err(NewClientError::InvalidConfiguration(format!(
453                    "part size must be at between 5MiB and {}GiB",
454                    max_part_size / 1024 / 1024 / 1024
455                )));
456            }
457        }
458
459        if !config.network_interface_names.is_empty() {
460            client_config.network_interface_names(config.network_interface_names);
461        }
462
463        let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
464        let user_agent_header = user_agent.build();
465
466        let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
467
468        Ok(Self {
469            allocator,
470            s3_client,
471            event_loop_group,
472            endpoint_config,
473            next_request_counter: AtomicU64::new(0),
474            user_agent_header,
475            request_payer: config.request_payer,
476            read_part_size: config.read_part_size,
477            write_part_size: config.write_part_size,
478            enable_backpressure: config.read_backpressure,
479            initial_read_window_size: config.initial_read_window,
480            bucket_owner: config.bucket_owner,
481            credentials_provider: Some(credentials_provider),
482            host_resolver,
483            telemetry_callback: config.telemetry_callback,
484        })
485    }
486
487    /// Create a new HTTP request template for the given HTTP method and S3 bucket name.
488    /// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
489    /// response should be XML; this header should be overwritten for requests like GET that return
490    /// object data.
491    fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
492        let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
493        let uri = endpoint.uri()?;
494        trace!(?uri, "resolved endpoint");
495
496        let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
497            let auth_scheme = match endpoint.auth_scheme() {
498                Ok(auth_scheme) => auth_scheme,
499                Err(e) => {
500                    error!(error=?e, "no auth scheme for endpoint");
501                    return Err(e.into());
502                }
503            };
504            trace!(?auth_scheme, "resolved auth scheme");
505            let algorithm = Some(auth_scheme.scheme_name());
506            let service = Some(auth_scheme.signing_name());
507            let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
508            Some(init_signing_config(
509                auth_scheme.signing_region(),
510                credentials_provider.clone(),
511                algorithm,
512                service,
513                use_double_uri_encode,
514            ))
515        } else {
516            None
517        };
518
519        let hostname = uri.host_name().to_str().unwrap();
520        let path_prefix = uri.path().to_os_string().into_string().unwrap();
521        let port = uri.host_port();
522        let hostname_header = if port > 0 {
523            format!("{hostname}:{port}")
524        } else {
525            hostname.to_string()
526        };
527
528        let mut message = Message::new_request(&self.allocator)?;
529        message.set_request_method(method)?;
530        message.add_header(&Header::new("Host", hostname_header))?;
531        message.add_header(&Header::new("accept", "application/xml"))?;
532        message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
533
534        if let Some(ref payer) = self.request_payer {
535            message.add_header(&Header::new("x-amz-request-payer", payer))?;
536        }
537
538        if let Some(ref owner) = self.bucket_owner {
539            message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
540        }
541
542        Ok(S3Message {
543            inner: message,
544            uri,
545            path_prefix,
546            checksum_config: None,
547            signing_config,
548        })
549    }
550
551    /// Make a meta-request using this S3 client that invokes the given callbacks as the request
552    /// makes progress.
553    ///
554    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
555    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
556    /// parsing in this case (e.g. for permissions errors).
557    #[allow(clippy::too_many_arguments)]
558    fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
559        &self,
560        mut options: MetaRequestOptions,
561        request_span: Span,
562        on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
563        mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
564        mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
565        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
566        on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
567    ) -> Result<CancellingMetaRequest, S3RequestError> {
568        let span_telemetry = request_span.clone();
569        let span_body = request_span.clone();
570        let span_finish = request_span;
571
572        let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
573        let hostname = endpoint.host_name().to_str().unwrap().to_owned();
574        let host_resolver = self.host_resolver.clone();
575        let telemetry_callback = self.telemetry_callback.clone();
576
577        let start_time = Instant::now();
578        let first_body_part = Arc::new(AtomicBool::new(true));
579        let first_body_part_clone = Arc::clone(&first_body_part);
580        let total_bytes = Arc::new(AtomicU64::new(0));
581        let total_bytes_clone = Arc::clone(&total_bytes);
582
583        options
584            .on_telemetry(move |metrics| {
585                let _guard = span_telemetry.enter();
586
587                on_request_finish(metrics);
588
589                let http_status = metrics.status_code();
590                let request_canceled = metrics.is_canceled();
591                let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
592                let crt_error = Some(metrics.error()).filter(|e| e.is_err());
593                let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
594                let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
595                let duration = metrics.total_duration();
596                let ttfb = metrics.time_to_first_byte();
597                let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
598
599                let message = if request_failure {
600                    "S3 request failed"
601                } else if request_canceled {
602                    "S3 request canceled"
603                } else {
604                    "S3 request finished"
605                };
606                debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
607                trace!(detailed_metrics=?metrics, "S3 request completed");
608
609                if let Some(ttfb) = ttfb {
610                    metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
611                }
612                metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
613                metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
614                if request_failure {
615                    metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
616                } else if request_canceled {
617                    metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
618                }
619
620                if let Some(telemetry_callback) = &telemetry_callback {
621                    telemetry_callback.on_telemetry(metrics);
622                }
623            })
624            .on_headers(move |headers, response_status| {
625                (on_headers)(headers, response_status);
626            })
627            .on_body(move |range_start, data| {
628                let _guard = span_body.enter();
629
630                if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
631                    let latency = start_time.elapsed().as_micros() as f64;
632                    let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
633                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
634                }
635                total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
636
637                trace!(start = range_start, length = data.len(), "body part received");
638
639                (on_body)(range_start, data);
640            })
641            .on_finish(move |request_result| {
642                let _guard = span_finish.enter();
643
644                let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
645                let duration = start_time.elapsed();
646
647                metrics::counter!("s3.meta_requests", "op" => op).increment(1);
648                metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
649                // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
650                if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true)  {
651                    let latency = duration.as_micros() as f64;
652                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
653                }
654                let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
655                // We only log throughput of object data. PUT needs to be measured in its stream
656                // implementation rather than these callbacks, so we can only do GET here.
657                if op == "get_object" {
658                    emit_throughput_metric(total_bytes, duration, op);
659                }
660                let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
661                if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
662                    metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
663                }
664
665                let status_code = request_result.response_status;
666                let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
667                    tracing::Level::DEBUG
668                } else {
669                    tracing::Level::WARN
670                };
671
672                let result =
673                    if !request_result.is_err() {
674                        event!(log_level, ?duration, "meta request finished");
675                        Ok(())
676                    } else {
677                        // The `parse_meta_request_error` callback has a choice of whether to give us an error or not.
678                        // If not, fall back to generic error parsing (e.g. for permissions errors), or just no error if that fails too.
679                        let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
680                        let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
681
682                        // Try to parse request header out of the failure. We can't just use the
683                        // telemetry callback because there might be multiple requests per meta
684                        // request, but these headers are known to be from the failed request.
685                        let request_id = match &request_result.error_response_headers {
686                            Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
687                            None => None,
688                        };
689                        let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
690
691                        let message = if request_result.is_canceled() {
692                            "meta request canceled"
693                        } else {
694                            "meta request failed"
695                        };
696                        if let Some(error) = &maybe_err {
697                            event!(log_level, ?duration, %request_id, ?error, message);
698                            debug!("meta request result: {:?}", request_result);
699                        } else {
700                            event!(log_level, ?duration, %request_id, ?request_result, message);
701                        }
702
703                        if request_result.is_canceled() {
704                            metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
705                        } else {
706                            // If it's not a real HTTP status, encode the CRT error in the metric instead
707                            let error_status = if request_result.response_status >= 100 {
708                                request_result.response_status
709                            } else {
710                                -request_result.crt_error.raw_error()
711                            };
712                            metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
713                        }
714
715                        // Fill in a generic error if we weren't able to parse one
716                        Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
717                    };
718
719                on_meta_request_result(result);
720            });
721
722        // Issue the HTTP request using the CRT's S3 meta request API
723        let meta_request = self.s3_client.make_meta_request(options)?;
724        Self::poll_client_metrics(&self.s3_client);
725        Ok(CancellingMetaRequest::wrap(meta_request))
726    }
727
728    /// Make a meta-request using this S3 client that returns the response body on success or
729    /// invokes the given callback on failure.
730    ///
731    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
732    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
733    /// parsing in this case (e.g. for permissions errors).
734    fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
735        &self,
736        options: MetaRequestOptions,
737        request_span: Span,
738        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
739    ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
740        let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
741
742        // Accumulate the body of the response into this Vec<u8>
743        let body: Arc<Mutex<Vec<u8>>> = Default::default();
744        let body_clone = Arc::clone(&body);
745
746        let meta_request = self.meta_request_with_callbacks(
747            options,
748            request_span,
749            |_| {},
750            |_, _| {},
751            move |offset, data| {
752                let mut body = body_clone.lock().unwrap();
753                assert_eq!(offset as usize, body.len());
754                body.extend_from_slice(data);
755            },
756            parse_meta_request_error,
757            move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
758        )?;
759        Ok(S3MetaRequest {
760            receiver: rx.fuse(),
761            meta_request,
762        })
763    }
764
765    /// Make a meta-request using this S3 client that returns the response headers on success or
766    /// invokes the given callback on failure.
767    ///
768    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
769    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
770    /// parsing in this case (e.g. for permissions errors).
771    fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
772        &self,
773        options: MetaRequestOptions,
774        request_span: Span,
775        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
776    ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
777        let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
778
779        // On success, stash the headers in this lock during the on_headers callback,
780        // and pull them out during the on_meta_request_result callback.
781        let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
782        let on_result = on_headers.clone();
783
784        let meta_request = self.meta_request_with_callbacks(
785            options,
786            request_span,
787            |_| {},
788            move |headers, status| {
789                // Only store headers if we have a 2xx status code. If we only get other status codes,
790                // then on_meta_request_result will send an error.
791                if (200..300).contains(&status) {
792                    *on_headers.lock().unwrap() = Some(headers.clone());
793                }
794            },
795            |_, _| {},
796            parse_meta_request_error,
797            move |result| {
798                // Return the headers on success, otherwise propagate the error.
799                let headers =
800                    result.and_then(|_| {
801                        on_result.lock().unwrap().take().ok_or_else(|| {
802                            S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
803                        })
804                    });
805                _ = tx.send(headers);
806            },
807        )?;
808        Ok(S3MetaRequest {
809            receiver: rx.fuse(),
810            meta_request,
811        })
812    }
813
814    /// Make a meta-request using this S3 client that invokes the given callback on failure.
815    ///
816    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
817    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
818    /// parsing in this case (e.g. for permissions errors).
819    fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
820        &self,
821        options: MetaRequestOptions,
822        request_span: Span,
823        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
824    ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
825        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
826
827        let meta_request = self.meta_request_with_callbacks(
828            options,
829            request_span,
830            |_| {},
831            |_, _| {},
832            |_, _| {},
833            parse_meta_request_error,
834            move |result| _ = tx.send(result),
835        )?;
836        Ok(S3MetaRequest {
837            receiver: rx.fuse(),
838            meta_request,
839        })
840    }
841
842    fn poll_client_metrics(s3_client: &Client) {
843        let metrics = s3_client.poll_client_metrics();
844        metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
845        metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
846        metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
847        metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
848        metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
849        metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
850        metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
851            .set(metrics.num_auto_ranged_copy_network_io as f64);
852        metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
853        metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
854            .set(metrics.num_requests_stream_queued_waiting as f64);
855        metrics::gauge!("s3.client.num_requests_streaming_response")
856            .set(metrics.num_requests_streaming_response as f64);
857
858        // Buffer pool metrics
859        let start = Instant::now();
860        if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
861            metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
862                .record(start.elapsed().as_micros() as f64);
863            metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
864            metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
865            metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
866            metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
867            metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
868            metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
869                .set(buffer_pool_stats.primary_num_blocks as f64);
870            metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
871                .set(buffer_pool_stats.secondary_reserved as f64);
872            metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
873            metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
874        }
875    }
876
877    fn next_request_counter(&self) -> u64 {
878        self.next_request_counter.fetch_add(1, Ordering::SeqCst)
879    }
880}
881
/// Failure retrieving headers
///
/// Used (wrapped as an internal error) by the headers-payload meta request when the request
/// succeeded but no headers from a 2xx response were captured.
#[derive(Debug, Error)]
enum ResponseHeadersError {
    /// The meta request completed successfully, but no 2xx response headers were stashed.
    #[error("response headers are missing")]
    MissingHeaders,
}
888
/// S3 operation supported by this client.
///
/// Each variant maps to a CRT [MetaRequestType] via [S3Operation::meta_request_type] and, for
/// operations sent as `MetaRequestType::Default`, to an S3 API operation name via
/// [S3Operation::operation_name].
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    DeleteObject,
    GetObject,
    GetObjectAttributes,
    HeadBucket,
    HeadObject,
    /// Sent with the operation name `ListObjectsV2`.
    ListObjects,
    /// Uses the CRT's dedicated `MetaRequestType::PutObject`.
    PutObject,
    CopyObject,
    /// A `PutObject` sent as a `MetaRequestType::Default` request with operation name
    /// `PutObject`, rather than through the dedicated `PutObject` meta request type.
    PutObjectSingle,
}
902
903impl S3Operation {
904    /// The [MetaRequestType] to use for this operation.
905    fn meta_request_type(&self) -> MetaRequestType {
906        match self {
907            S3Operation::GetObject => MetaRequestType::GetObject,
908            S3Operation::PutObject => MetaRequestType::PutObject,
909            S3Operation::CopyObject => MetaRequestType::CopyObject,
910            _ => MetaRequestType::Default,
911        }
912    }
913
914    /// The operation name to set when configuring a request. Required for operations that
915    /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise.
916    fn operation_name(&self) -> Option<&'static str> {
917        match self {
918            S3Operation::DeleteObject => Some("DeleteObject"),
919            S3Operation::GetObject => None,
920            S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
921            S3Operation::HeadBucket => Some("HeadBucket"),
922            S3Operation::HeadObject => Some("HeadObject"),
923            S3Operation::ListObjects => Some("ListObjectsV2"),
924            S3Operation::PutObject => None,
925            S3Operation::CopyObject => None,
926            S3Operation::PutObjectSingle => Some("PutObject"),
927        }
928    }
929}
930
/// A HTTP message to be sent to S3. This is a wrapper around a plain HTTP message, except that it
/// helps us correctly configure the endpoint and "Host" header to handle both path-style and
/// virtual-hosted-style addresses. The `path_prefix` is prepended to every request path, and
/// need not be terminated with a `/`.
#[derive(Debug)]
struct S3Message<'a> {
    /// The underlying HTTP message.
    inner: Message<'a>,
    /// Endpoint passed to the meta request options when the message is converted.
    uri: Uri,
    /// Percent-encoded and written in front of every path set via `set_request_path_and_query`.
    path_prefix: String,
    /// Checksum configuration, applied to the meta request options only if set.
    checksum_config: Option<ChecksumConfig>,
    /// Signing configuration, applied to the meta request options only if set.
    signing_config: Option<SigningConfig>,
}
943
944// This is RFC 3986 but with '/' also considered a safe character for path fragments.
945const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
946const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
947
948fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
949    let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
950    s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
951}
952
/// The query part of a request path: nothing, a list of key-value parameters, or a single bare
/// action token.
#[derive(Debug, Default)]
enum QueryFragment<'a, P: AsRef<OsStr>> {
    /// No query string is written.
    #[default]
    Empty,
    /// Key-value pairs, percent-encoded and written as `?k1=v1&k2=v2`. An empty slice writes nothing.
    Query(&'a [(P, P)]),
    /// A single token written verbatim (not percent-encoded) as `?<action>`.
    Action(P),
}
960
961impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
962    // This estimate is exact if no characters need encoding, otherwise we'll end up
963    // reallocating a couple of times. The '?' for the query is counted in the first key-value
964    // pair.
965    fn size(&self) -> usize {
966        match self {
967            QueryFragment::Empty => 0,
968            QueryFragment::Query(query) => query
969                .iter()
970                .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
971                .sum::<usize>(),
972            QueryFragment::Action(action) => 1 + action.as_ref().len(),
973        }
974    }
975
976    // Write the fragment to the query string
977    fn write(&self, path: &mut OsString) {
978        match &self {
979            QueryFragment::Query(query) if !query.is_empty() => {
980                path.push("?");
981                for (i, (key, value)) in query.iter().enumerate() {
982                    if i != 0 {
983                        path.push("&");
984                    }
985                    write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
986                    path.push("=");
987                    write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
988                }
989            }
990            QueryFragment::Action(action) => {
991                path.push("?");
992                path.push(action.as_ref());
993            }
994            _ => {}
995        }
996    }
997}
998
999impl<'a> S3Message<'a> {
1000    /// Add a header to this message. The header is added if necessary and any existing values for
1001    /// this header are removed.
1002    fn set_header(
1003        &mut self,
1004        header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1005    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1006        self.inner.set_header(header)
1007    }
1008
1009    /// Set the request path and query for this message. The components should not be URL-encoded;
1010    /// this method will handle that.
1011    fn set_request_path_and_query<P: AsRef<OsStr>>(
1012        &mut self,
1013        path: impl AsRef<OsStr>,
1014        query_or_action: QueryFragment<P>,
1015    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1016        let space_needed = self.path_prefix.len() + query_or_action.size();
1017
1018        let mut full_path = OsString::with_capacity(space_needed);
1019
1020        write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1021        write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1022
1023        // Build the query string
1024        query_or_action.write(&mut full_path);
1025        self.inner.set_request_path(full_path)
1026    }
1027
1028    /// Set the request path for this message. The path should not be URL-encoded; this method will
1029    /// handle that.
1030    fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1031        self.set_request_path_and_query::<&str>(path, Default::default())
1032    }
1033
1034    /// Sets the checksum configuration for this message.
1035    fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1036        self.checksum_config = checksum_config;
1037    }
1038
1039    /// Sets the body input stream for this message, and returns any previously set input stream.
1040    /// If input_stream is None, unsets the body.
1041    fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1042        self.inner.set_body_stream(input_stream)
1043    }
1044
1045    /// Set the content length header.
1046    fn set_content_length_header(
1047        &mut self,
1048        content_length: usize,
1049    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1050        self.inner
1051            .set_header(&Header::new("Content-Length", content_length.to_string()))
1052    }
1053
1054    /// Set the checksum header.
1055    fn set_checksum_header(
1056        &mut self,
1057        checksum: &UploadChecksum,
1058    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1059        let header = match checksum {
1060            UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1061            UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1062            UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1063            UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1064            UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1065        };
1066        self.inner.set_header(&header)
1067    }
1068
1069    fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1070        let mut options = MetaRequestOptions::new();
1071        if let Some(checksum_config) = self.checksum_config {
1072            options.checksum_config(checksum_config);
1073        }
1074        if let Some(signing_config) = self.signing_config {
1075            options.signing_config(signing_config);
1076        }
1077        options
1078            .message(self.inner)
1079            .endpoint(self.uri)
1080            .request_type(operation.meta_request_type());
1081        if let Some(operation_name) = operation.operation_name() {
1082            options.operation_name(operation_name);
1083        }
1084        options
1085    }
1086}
1087
1088#[derive(Debug)]
1089#[pin_project]
1090#[must_use]
1091struct S3MetaRequest<T, E> {
1092    /// Receiver for the result of meta-request.
1093    #[pin]
1094    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1095    meta_request: CancellingMetaRequest,
1096}
1097
1098impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1099    type Output = ObjectClientResult<T, E, S3RequestError>;
1100
1101    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1102        let this = self.project();
1103        this.receiver
1104            .poll(cx)
1105            .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1106    }
1107}
1108
1109impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1110    fn is_terminated(&self) -> bool {
1111        self.receiver.is_terminated()
1112    }
1113}
1114
1115/// Wrapper for a [MetaRequest] that cancels it on drop.
1116///
1117/// Note that if the request has already completed, cancelling it has no effect.
1118#[derive(Debug)]
1119struct CancellingMetaRequest {
1120    inner: MetaRequest,
1121}
1122
1123impl CancellingMetaRequest {
1124    fn wrap(meta_request: MetaRequest) -> Self {
1125        Self { inner: meta_request }
1126    }
1127}
1128
1129impl Drop for CancellingMetaRequest {
1130    fn drop(&mut self) {
1131        self.inner.cancel();
1132    }
1133}
1134
1135impl Deref for CancellingMetaRequest {
1136    type Target = MetaRequest;
1137
1138    fn deref(&self) -> &Self::Target {
1139        &self.inner
1140    }
1141}
1142
1143impl DerefMut for CancellingMetaRequest {
1144    fn deref_mut(&mut self) -> &mut Self::Target {
1145        &mut self.inner
1146    }
1147}
1148
/// Failures to construct a new S3 client
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum NewClientError {
    /// Invalid S3 endpoint
    #[error("invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
    /// Invalid AWS credentials; wraps the underlying CRT error.
    #[error("invalid AWS credentials")]
    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
    /// Invalid configuration, with a human-readable description of the problem
    #[error("invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// An internal error from within the AWS Common Runtime
    #[error("Unknown CRT error")]
    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
}
1166
/// Errors returned by the CRT-based S3 client
#[derive(Error, Debug)]
pub enum S3RequestError {
    /// An internal error from within the S3 client. The request may have been sent.
    #[error("Internal S3 client error")]
    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// An internal error from within the AWS Common Runtime. The request may have been sent.
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// An error during construction of a request. The request was not sent.
    #[error("Failed to construct request")]
    ConstructionFailure(#[from] ConstructionError),

    /// The request was sent but an unknown or unhandled failure occurred while processing it.
    /// Carries the raw [MetaRequestResult] for diagnostics.
    #[error("Unknown response error: {0:?}")]
    ResponseError(MetaRequestResult),

    /// The request was made to the wrong region
    ///
    /// The `String` is the region the request should have gone to; generic error parsing fills it
    /// from the `x-amz-bucket-region` response header.
    #[error("Wrong region (expecting {0})")]
    IncorrectRegion(String, ClientErrorMetadata),

    /// Forbidden
    ///
    /// The `String` is a human-readable message; generic error parsing uses the response's
    /// `Message` element, falling back to the error code, or `"<no message>"` if there was no body.
    #[error("Forbidden: {0}")]
    Forbidden(String, ClientErrorMetadata),

    /// The request was attempted but could not be signed due to no available credentials
    #[error("No signing credentials available, see CRT debug logs")]
    NoSigningCredentials,

    /// The client was unable to get S3 Express session credentials.
    #[error("Failed to create S3 Express session, see CRT debug logs")]
    CreateSessionError,

    /// The request was canceled
    #[error("Request canceled")]
    RequestCanceled,

    /// The request was throttled by S3
    #[error("Request throttled")]
    Throttled,

    /// Cannot fetch more data because current read window is exhausted. The read window must
    /// be advanced using [GetObjectRequest::increment_read_window] to continue fetching
    /// new data.
    #[error("Polled for data with empty read window")]
    EmptyReadWindow,
}
1216
1217impl S3RequestError {
1218    fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1219        S3RequestError::ConstructionFailure(inner.into())
1220    }
1221
1222    fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1223        S3RequestError::InternalError(Box::new(inner))
1224    }
1225}
1226
1227impl ProvideErrorMetadata for S3RequestError {
1228    fn meta(&self) -> ClientErrorMetadata {
1229        match self {
1230            Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1231            Self::Forbidden(_, metadata) => metadata.clone(),
1232            Self::Throttled => ClientErrorMetadata {
1233                http_code: Some(503),
1234                error_code: Some("SlowDown".to_string()),
1235                error_message: Some("Please reduce your request rate.".to_string()),
1236            },
1237            Self::IncorrectRegion(_, metadata) => metadata.clone(),
1238            _ => Default::default(),
1239        }
1240    }
1241}
1242
/// Errors that can occur while building a request, before it is sent (surfaced to callers as
/// [S3RequestError::ConstructionFailure]).
#[derive(Error, Debug)]
pub enum ConstructionError {
    /// CRT error while constructing the request
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The S3 endpoint was invalid
    #[error("Invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
}
1253
/// Return a `&'static str` for a given non-static `&str`, as required by [metrics] crate.
///
/// `None` and unrecognised names map to `"Unknown"`. An unrecognised name also trips a
/// `debug_assert!`, so the table below gets updated when a new operation shows up.
fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
    const UNKNOWN_METRIC_STR: &str = "Unknown";
    // Single table of known operation names, so each name is spelled exactly once.
    const KNOWN_OPERATION_NAMES: &[&str] = &[
        "GetObject",
        "HeadObject",
        "ListParts",
        "CreateMultipartUpload",
        "UploadPart",
        "AbortMultipartUpload",
        "CompleteMultipartUpload",
        "UploadPartCopy",
        "CopyObject",
        "PutObject",
        "ListObjectsV2",
        "DeleteObject",
        "GetObjectAttributes",
        "HeadBucket",
        "RenameObject",
    ];

    match operation_name {
        None => UNKNOWN_METRIC_STR,
        Some(operation_name) => {
            let static_str = KNOWN_OPERATION_NAMES
                .iter()
                .copied()
                .find(|&known| known == operation_name);
            debug_assert!(
                static_str.is_some(),
                "input set as {operation_name:?} but no matcher, update required",
            );
            static_str.unwrap_or(UNKNOWN_METRIC_STR)
        }
    }
}
1300
1301/// Extract the byte range from the Content-Range header if present and valid
1302fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1303    let header = headers.get("Content-Range").ok()?;
1304    let value = header.value().to_str()?;
1305
1306    // Content-Range: <unit> <range-start>-<range-end>/<size>
1307
1308    if !value.starts_with("bytes ") {
1309        return None;
1310    }
1311    let (_, value) = value.split_at("bytes ".len());
1312    let (range, _) = value.split_once('/')?;
1313    let (start, end) = range.split_once('-')?;
1314    let start = start.parse::<u64>().ok()?;
1315    let end = end.parse::<u64>().ok()?;
1316
1317    // Rust ranges are exclusive at the end, but Content-Range is inclusive
1318    Some(start..end + 1)
1319}
1320
1321/// Extract the [Checksum] information from headers
1322fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1323    let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1324    let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1325    let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1326    let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1327    let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1328
1329    Ok(Checksum {
1330        checksum_crc64nvme,
1331        checksum_crc32,
1332        checksum_crc32c,
1333        checksum_sha1,
1334        checksum_sha256,
1335    })
1336}
1337
1338/// Try to parse a modeled error out of a failing meta request
1339fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1340    /// Look for a redirect header pointing to a different region for the bucket
1341    fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1342        let headers = request_result.error_response_headers.as_ref()?;
1343        let region_header = headers.get("x-amz-bucket-region").ok()?;
1344        let region = region_header.value().to_owned().into_string().ok()?;
1345        Some(S3RequestError::IncorrectRegion(
1346            region,
1347            ClientErrorMetadata::from_meta_request_result(request_result),
1348        ))
1349    }
1350
1351    /// Look for access-related errors
1352    fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1353        let Some(body) = request_result.error_response_body.as_ref() else {
1354            // Header-only requests like HeadObject and HeadBucket can't give us a more detailed
1355            // error, so just trust the response code
1356            return Some(S3RequestError::Forbidden(
1357                "<no message>".to_owned(),
1358                ClientErrorMetadata {
1359                    http_code: Some(request_result.response_status),
1360                    ..Default::default()
1361                },
1362            ));
1363        };
1364        let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1365        let error_code = error_elem.get_child("Code")?;
1366        let error_code_str = error_code.get_text()?;
1367        // Always translate 403 to Forbidden, but otherwise first check the error code, since other
1368        // response statuses are overloaded and not always access-related errors.
1369        if request_result.response_status == 403
1370            || matches!(
1371                error_code_str.deref(),
1372                "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1373            )
1374        {
1375            let message = error_elem
1376                .get_child("Message")
1377                .and_then(|e| e.get_text())
1378                .unwrap_or(error_code_str.clone());
1379            Some(S3RequestError::Forbidden(
1380                message.to_string(),
1381                ClientErrorMetadata {
1382                    http_code: Some(request_result.response_status),
1383                    error_code: Some(error_code_str.to_string()),
1384                    error_message: Some(message.into_owned()),
1385                },
1386            ))
1387        } else {
1388            None
1389        }
1390    }
1391
1392    /// Try to look for error related to failing to have credentials to make S3 requests, return generic error otherwise
1393    fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1394        let crt_error_code = request_result.crt_error.raw_error();
1395        if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1396            S3RequestError::NoSigningCredentials
1397        } else if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED as i32 {
1398            S3RequestError::CreateSessionError
1399        } else {
1400            S3RequestError::CrtError(crt_error_code.into())
1401        }
1402    }
1403
1404    fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1405        let crt_error_code = request_result.crt_error.raw_error();
1406        if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1407            Some(S3RequestError::Throttled)
1408        } else {
1409            None
1410        }
1411    }
1412
1413    /// Handle canceled requests
1414    fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1415        request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1416    }
1417
1418    match request_result.response_status {
1419        301 => try_parse_redirect(request_result),
1420        // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
1421        // redirect
1422        400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1423        403 => try_parse_forbidden(request_result),
1424        // if the http response status is not set, we look into crt_error_code to identify the error
1425        0 => try_parse_throttled(request_result)
1426            .or_else(|| try_parse_canceled_request(request_result))
1427            .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1428        _ => None,
1429    }
1430}
1431
1432/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
1433/// PUT bytes don't transit those callbacks.
1434fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1435    let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1436    // Semi-arbitrary choices here to avoid averaging out large and small requests
1437    const MEGABYTE: u64 = 1024 * 1024;
1438    let bucket = if bytes < MEGABYTE {
1439        "<1MiB"
1440    } else if bytes <= 16 * MEGABYTE {
1441        "1-16MiB"
1442    } else {
1443        ">16MiB"
1444    };
1445    metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1446}
1447
1448#[cfg_attr(not(docsrs), async_trait)]
1449impl ObjectClient for S3CrtClient {
1450    type GetObjectResponse = S3GetObjectResponse;
1451    type PutObjectRequest = S3PutObjectRequest;
1452    type ClientError = S3RequestError;
1453
1454    fn read_part_size(&self) -> usize {
1455        self.inner.read_part_size
1456    }
1457
1458    fn write_part_size(&self) -> usize {
1459        // TODO: the CRT does some clamping to a max size rather than just swallowing the part size
1460        // we configured it with, so this might be wrong. Right now the only clamping is to the max
1461        // S3 part size (5GiB), so this shouldn't affect the result.
1462        // https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
1463        self.inner.write_part_size
1464    }
1465
1466    fn initial_read_window_size(&self) -> Option<usize> {
1467        if self.inner.enable_backpressure {
1468            Some(self.inner.initial_read_window_size)
1469        } else {
1470            None
1471        }
1472    }
1473
1474    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1475        let start = Instant::now();
1476        self.inner
1477            .s3_client
1478            .poll_default_buffer_pool_usage_stats()
1479            .inspect(|_| {
1480                metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1481                    .record(start.elapsed().as_micros() as f64);
1482            })
1483    }
1484
1485    async fn delete_object(
1486        &self,
1487        bucket: &str,
1488        key: &str,
1489    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1490        self.delete_object(bucket, key).await
1491    }
1492
1493    async fn copy_object(
1494        &self,
1495        source_bucket: &str,
1496        source_key: &str,
1497        destination_bucket: &str,
1498        destination_key: &str,
1499        params: &CopyObjectParams,
1500    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1501        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1502            .await
1503    }
1504
1505    async fn get_object(
1506        &self,
1507        bucket: &str,
1508        key: &str,
1509        params: &GetObjectParams,
1510    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1511        self.get_object(bucket, key, params).await
1512    }
1513
1514    async fn list_objects(
1515        &self,
1516        bucket: &str,
1517        continuation_token: Option<&str>,
1518        delimiter: &str,
1519        max_keys: usize,
1520        prefix: &str,
1521    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1522        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1523            .await
1524    }
1525
1526    async fn head_object(
1527        &self,
1528        bucket: &str,
1529        key: &str,
1530        params: &HeadObjectParams,
1531    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1532        self.head_object(bucket, key, params).await
1533    }
1534
1535    async fn put_object(
1536        &self,
1537        bucket: &str,
1538        key: &str,
1539        params: &PutObjectParams,
1540    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1541        self.put_object(bucket, key, params).await
1542    }
1543
1544    async fn put_object_single<'a>(
1545        &self,
1546        bucket: &str,
1547        key: &str,
1548        params: &PutObjectSingleParams,
1549        contents: impl AsRef<[u8]> + Send + 'a,
1550    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1551        self.put_object_single(bucket, key, params, contents).await
1552    }
1553
1554    async fn get_object_attributes(
1555        &self,
1556        bucket: &str,
1557        key: &str,
1558        max_parts: Option<usize>,
1559        part_number_marker: Option<usize>,
1560        object_attributes: &[ObjectAttribute],
1561    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1562        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1563            .await
1564    }
1565
1566    async fn rename_object(
1567        &self,
1568        bucket: &str,
1569        src_key: &str,
1570        dst_key: &str,
1571        params: &RenameObjectParams,
1572    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1573        self.rename_object(bucket, src_key, dst_key, params).await
1574    }
1575}
1576
1577/// Custom handling of telemetry events
1578pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1579    fn on_telemetry(&self, request_metrics: &RequestMetrics);
1580}
1581
#[cfg(test)]
mod tests {
    use mountpoint_s3_crt::common::error::Error;
    use rusty_fork::rusty_fork_test;
    use std::assert_eq;

    use super::*;
    use test_case::test_case;

    /// Test explicit part-size validation in [`S3CrtClient::new`]: an out-of-range part size must
    /// be rejected with a configuration error describing the allowed range.
    fn client_new_fails_with_invalid_part_size(part_size: usize) {
        let config = S3ClientConfig::default().part_size(part_size);
        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
        let message = if cfg!(target_pointer_width = "64") {
            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
        } else {
            format!(
                "invalid configuration: part size must be at between 5MiB and {}GiB",
                usize::MAX / 1024 / 1024 / 1024
            )
        };
        assert_eq!(e.to_string(), message);
    }

    /// On 32-bit platforms the part size is limited by `usize::MAX`,
    /// so it is impossible to provide a greater value.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn client_new_fails_with_greater_part_size() {
        let part_size = 6 * 1024 * 1024 * 1024; // greater than 5GiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    #[test]
    fn client_new_fails_with_smaller_part_size() {
        let part_size = 4 * 1024 * 1024; // less than 5MiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// Test if the prefix is added correctly to the User-Agent header
    #[test]
    fn test_user_agent_with_prefix() {
        let user_agent_prefix = String::from("someprefix");
        let expected_user_agent = "someprefix mountpoint-s3-client/";

        let config = S3ClientConfig {
            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// Build a client from `endpoint_config` and assert that its request template carries
    /// `expected_host` in the `Host` header.
    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
        let config = S3ClientConfig {
            endpoint_config,
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");

        let host_header = headers.get("Host").expect("Host header expected");
        let host_header_value = host_header.value();

        assert_eq!(host_header_value.to_string_lossy(), expected_host);
    }

    // run with rusty_fork to avoid issues with other tests and their env variables.
    rusty_fork_test! {
        #[test]
        fn test_endpoint_favors_parameter_over_env_variable() {
            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }

            // even though we set the environment variable, the parameter takes precedence
            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_favors_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }

            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_with_invalid_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }

            let config = S3ClientConfig {
                endpoint_config,
                ..Default::default()
            };

            let client = S3CrtClient::new(config);
            match client {
                Ok(_) => panic!("expected an error"),
                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
            }
        }

    }

    /// Simple test to ensure the user agent header is correct even when prefix is not added
    #[test]
    fn test_user_agent_without_prefix() {
        let expected_user_agent = "mountpoint-s3-client/";

        let config: S3ClientConfig = Default::default();

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers.get("User-Agent").expect("User Agent Header expected");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
    #[test_case("bytes 200-1000/*" => Some(200..1001))]
    #[test_case("bytes 200-1000" => None)]
    #[test_case("bytes */67589" => None)]
    #[test_case("octets 200-1000]" => None)]
    fn parse_content_range(range: &str) -> Option<Range<u64>> {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let header = Header::new("Content-Range", range);
        headers.add_header(&header).unwrap();
        extract_range_header(&headers)
    }

    /// Simple test to ensure the expected bucket owner can be set
    #[test]
    fn test_expected_bucket_owner() {
        let expected_bucket_owner = "111122223333";

        let config: S3ClientConfig = S3ClientConfig::new().bucket_owner(expected_bucket_owner);

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let expected_bucket_owner_header = headers
            .get("x-amz-expected-bucket-owner")
            .expect("the headers should contain x-amz-expected-bucket-owner");
        let expected_bucket_owner_value = expected_bucket_owner_header.value();

        assert!(
            expected_bucket_owner_value
                .to_string_lossy()
                .starts_with(expected_bucket_owner)
        );
    }

    /// Build a failed [`MetaRequestResult`] carrying an HTTP status, an error body and,
    /// optionally, an `x-amz-bucket-region` response header.
    fn make_result(
        response_status: i32,
        body: impl Into<OsString>,
        bucket_region_header: Option<&str>,
    ) -> MetaRequestResult {
        let error_response_headers = bucket_region_header.map(|h| {
            let mut headers = Headers::new(&Allocator::default()).unwrap();
            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
            headers
        });
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_301_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_access_denied() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "Access Denied");
    }

    #[test]
    fn parse_400_invalid_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
    }

    #[test]
    fn parse_400_expired_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token has expired.");
    }

    #[test]
    fn parse_400_redirect() {
        // From an s3-accelerate endpoint with the wrong region set for signing
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_signature_does_not_match() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        );
    }

    #[test]
    fn parse_403_made_up_error() {
        // A made up error to check that we map all 403s even if we don't recognize them
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "This error is made up.");
    }

    #[test]
    fn parse_unhandled_status_is_none() {
        // Statuses without dedicated handling are left for the caller to map.
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message></Error>"#;
        let result = make_result(500, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        assert!(result.is_none(), "wrong result, got: {result:?}");
    }

    /// Build a failed [`MetaRequestResult`] with no response headers or body, as happens when
    /// the failure is reported by the CRT rather than by an S3 response.
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }

    #[test]
    fn parse_no_signing_credential_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::NoSigningCredentials) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_slow_down_throttled() {
        let error_code = mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Throttled) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_create_session_error() {
        let error_code = mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CreateSessionError) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_test_other_crt_error() {
        // A signing error that isn't "no signing credentials"
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CrtError(error)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(error, error_code.into());
    }

    #[test]
    fn test_checksum_sha256() {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
        headers.add_header(&header).unwrap();

        let checksum = parse_checksum(&headers).expect("failed to parse headers");
        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
        assert_eq!(
            checksum.checksum_sha256,
            Some(value.to_owned()),
            "sha256 header should match"
        );
    }

    #[test]
    fn test_operation_name_to_static_metrics_string() {
        assert_eq!(operation_name_to_static_metrics_string(Some("GetObject")), "GetObject");
        assert_eq!(
            operation_name_to_static_metrics_string(Some("RenameObject")),
            "RenameObject",
        );
        assert_eq!(operation_name_to_static_metrics_string(None), "Unknown");
    }
}