Skip to main content

mountpoint_s3_client/
s3_crt_client.rs

1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31    BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32    MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory, MemoryPoolFactoryWrapper};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48    ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49    S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Create the `tracing` span for a new S3 request and log a "new request" event inside it.
///
/// The span is named `$method`, uses the `mountpoint_s3_client::s3_crt_client::request` target,
/// and records an `id` field obtained from `$self.next_request_counter()`, followed by any extra
/// span fields supplied by the caller. The span is returned so the caller can attach it to the
/// request.
macro_rules! request_span {
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let request_number = $self.next_request_counter();
        // The span level is easy to get wrong: the span must exist whenever an event at WARN or
        // any less severe level (INFO, DEBUG, TRACE) is emitted inside it, so the span itself is
        // created at WARN.
        let request_span = tracing::warn_span!(
            target: "mountpoint_s3_client::s3_crt_client::request",
            $method,
            id = request_number,
            $($field)*
        );
        request_span.in_scope(|| tracing::debug!("new request"));
        request_span
    }};
    ($self:expr, $method:expr) => {
        request_span!($self, $method,)
    };
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// Emit a `tracing` event at a level that is only known at runtime.
///
/// `tracing` does not allow dynamic levels, but request logging wants to pick its level from the
/// response status (https://github.com/tokio-rs/tracing/issues/372). This macro matches on the
/// runtime [`tracing::Level`] value and forwards to `tracing::event!` with the matching constant
/// level, passing the remaining arguments through unchanged.
macro_rules! event {
    ($lvl:expr, $($rest:tt)*) => {
        match $lvl {
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($rest)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($rest)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($rest)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($rest)*),
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($rest)*),
        }
    };
}
97
/// Configurations for the CRT-based S3 client
///
/// Start from [S3ClientConfig::new] (or `S3ClientConfig::default()`), adjust it with the
/// builder-style setters, and pass it to [S3CrtClient::new]. Most options are forwarded to the
/// CRT S3 client when the client is constructed.
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How requests are authenticated (default: the AWS default credentials chain).
    auth_config: S3ClientAuthConfig,
    /// Throughput target in Gbps forwarded to the CRT client (default 10.0).
    throughput_target_gbps: f64,
    /// Memory limit in bytes forwarded to the CRT client (default 0).
    /// NOTE(review): presumably 0 lets the CRT choose its own limit — confirm against the CRT.
    memory_limit_in_bytes: u64,
    /// Part size in bytes for multi-part GETs (default 8 MiB). Client construction fails unless
    /// it is between 5 MiB and the maximum part size (5 GiB, or less if `usize` cannot hold it).
    read_part_size: usize,
    /// Part size in bytes for multi-part PUTs (default 8 MiB), subject to the same limits as
    /// `read_part_size`.
    write_part_size: usize,
    /// Region and optional explicit endpoint (default region `us-east-1`). Without an explicit
    /// endpoint, the `AWS_ENDPOINT_URL` environment variable is consulted at construction.
    endpoint_config: EndpointConfig,
    /// Builder for the `User-Agent` header; `UserAgent::new(None)` is used when unset.
    user_agent: Option<UserAgent>,
    /// When set, sent as the `x-amz-request-payer` header on every request.
    request_payer: Option<String>,
    /// When set, sent as the `x-amz-expected-bucket-owner` header on every request.
    bucket_owner: Option<String>,
    /// Maximum attempts per request, including the first one. `AWS_MAX_ATTEMPTS` takes
    /// precedence when it is set to a valid integer; 3 is used when neither is provided.
    max_attempts: Option<NonZeroUsize>,
    /// Whether CRT read backpressure is enabled (default false).
    read_backpressure: bool,
    /// Initial backpressure read window in bytes (default 8 MiB).
    initial_read_window: usize,
    /// Network interfaces to spread requests over; only forwarded to the CRT when non-empty.
    network_interface_names: Vec<String>,
    /// Optional handler invoked with the metrics of each S3 request.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Override for the number of CRT event loop threads; with `None` the choice is left to the
    /// CRT (presumably its default — confirm).
    event_loop_threads: Option<u16>,
    /// Optional custom memory pool factory, wrapped in a CRT buffer pool factory at construction.
    buffer_pool_factory: Option<MemoryPoolFactoryWrapper>,
}
118
119impl Default for S3ClientConfig {
120    fn default() -> Self {
121        const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122        Self {
123            auth_config: Default::default(),
124            throughput_target_gbps: 10.0,
125            memory_limit_in_bytes: 0,
126            read_part_size: DEFAULT_PART_SIZE,
127            write_part_size: DEFAULT_PART_SIZE,
128            endpoint_config: EndpointConfig::new("us-east-1"),
129            user_agent: None,
130            request_payer: None,
131            bucket_owner: None,
132            max_attempts: None,
133            read_backpressure: false,
134            initial_read_window: DEFAULT_PART_SIZE,
135            network_interface_names: vec![],
136            telemetry_callback: None,
137            event_loop_threads: None,
138            buffer_pool_factory: None,
139        }
140    }
141}
142
143impl S3ClientConfig {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Set the configuration for authenticating to S3
149    #[must_use = "S3ClientConfig follows a builder pattern"]
150    pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151        self.auth_config = auth_config;
152        self
153    }
154
155    /// Set the part size for multi-part operations to S3 (both PUT and GET)
156    #[must_use = "S3ClientConfig follows a builder pattern"]
157    pub fn part_size(mut self, part_size: usize) -> Self {
158        self.read_part_size = part_size;
159        self.write_part_size = part_size;
160        self
161    }
162
163    /// Set the part size for multi-part-get operations to S3.
164    #[must_use = "S3ClientConfig follows a builder pattern"]
165    pub fn read_part_size(mut self, size: usize) -> Self {
166        self.read_part_size = size;
167        self
168    }
169
170    /// Set the part size for multi-part-put operations to S3.
171    #[must_use = "S3ClientConfig follows a builder pattern"]
172    pub fn write_part_size(mut self, size: usize) -> Self {
173        self.write_part_size = size;
174        self
175    }
176
177    /// Set the target throughput in Gbps for the S3 client
178    #[must_use = "S3ClientConfig follows a builder pattern"]
179    pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180        self.throughput_target_gbps = throughput_target_gbps;
181        self
182    }
183
184    /// Set the memory limit in bytes for the S3 client
185    #[must_use = "S3ClientConfig follows a builder pattern"]
186    pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187        self.memory_limit_in_bytes = memory_limit_in_bytes;
188        self
189    }
190
191    /// Set the endpoint configuration for endpoint resolution
192    #[must_use = "S3ClientConfig follows a builder pattern"]
193    pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194        self.endpoint_config = endpoint_config;
195        self
196    }
197
198    /// Set a constructor for the HTTP User-agent header for S3 requests
199    #[must_use = "S3ClientConfig follows a builder pattern"]
200    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201        self.user_agent = Some(user_agent);
202        self
203    }
204
205    /// Set a value for the request payer HTTP header for S3 requests
206    #[must_use = "S3ClientConfig follows a builder pattern"]
207    pub fn request_payer(mut self, request_payer: &str) -> Self {
208        self.request_payer = Some(request_payer.to_owned());
209        self
210    }
211
212    /// Set an expected bucket owner value
213    #[must_use = "S3ClientConfig follows a builder pattern"]
214    pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215        self.bucket_owner = Some(bucket_owner.to_owned());
216        self
217    }
218
219    /// Set a maximum number of attempts for S3 requests. Will be overridden by the
220    /// `AWS_MAX_ATTEMPTS` environment variable if set.
221    #[must_use = "S3ClientConfig follows a builder pattern"]
222    pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223        self.max_attempts = Some(max_attempts);
224        self
225    }
226
227    /// Set the flag for backpressure read
228    #[must_use = "S3ClientConfig follows a builder pattern"]
229    pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230        self.read_backpressure = read_backpressure;
231        self
232    }
233
234    /// Set a value for initial backpressure read window size
235    #[must_use = "S3ClientConfig follows a builder pattern"]
236    pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237        self.initial_read_window = initial_read_window;
238        self
239    }
240
241    /// Set a list of network interfaces to distribute S3 requests over
242    #[must_use = "S3ClientConfig follows a builder pattern"]
243    pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244        self.network_interface_names = network_interface_names;
245        self
246    }
247
248    /// Set a custom telemetry callback handler
249    #[must_use = "S3ClientConfig follows a builder pattern"]
250    pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251        self.telemetry_callback = Some(telemetry_callback);
252        self
253    }
254
255    /// Override the number of threads used by the CRTs AwsEventLoop
256    #[must_use = "S3ClientConfig follows a builder pattern"]
257    pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258        self.event_loop_threads = Some(event_loop_threads);
259        self
260    }
261
262    /// Set a custom memory pool
263    #[must_use = "S3ClientConfig follows a builder pattern"]
264    pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265        self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(move |_| pool.clone()));
266        self
267    }
268
269    /// Set a custom memory pool factory
270    #[must_use = "S3ClientConfig follows a builder pattern"]
271    pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272        self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(pool_factory));
273        self
274    }
275}
276
/// Authentication configuration for the CRT-based S3 client
///
/// Each variant is turned into a CRT [CredentialsProvider] when the client is constructed.
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// The default AWS credentials resolution chain, similar to the AWS CLI
    ///
    /// Built with the CRT's default provider chain. This is the variant returned by
    /// `S3ClientAuthConfig::default()`.
    #[default]
    Default,
    /// Do not sign requests at all
    ///
    /// Implemented with the CRT's anonymous credentials provider.
    NoSigning,
    /// Explicitly load the given profile name from the AWS CLI configuration file
    Profile(String),
    /// Use a custom credentials provider
    ///
    /// The provider is used as-is, without being wrapped in any other provider.
    Provider(CredentialsProvider),
}
290
/// An S3 client that uses the [AWS Common Runtime (CRT)][crt] to make requests.
///
/// The AWS CRT is a C library that provides a common set of functionality for AWS SDKs. Its S3
/// client provides high throughput by implementing S3 performance best practices, including
/// automatic parallelization of GET and PUT requests.
///
/// To use this client, invoke the methods from the [`ObjectClient`] trait.
///
/// Cloning an `S3CrtClient` is cheap: all clones share the same inner state through an [`Arc`],
/// so they use the same CRT client, event loop group and configuration.
///
/// [crt]: https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Shared state; every clone of this client points at the same value.
    inner: Arc<S3CrtClientInner>,
}
304
305impl S3CrtClient {
306    /// Construct a new S3 client with the given configuration.
307    pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308        Ok(Self {
309            inner: Arc::new(S3CrtClientInner::new(config)?),
310        })
311    }
312
313    /// Return a copy of the [EndpointConfig] for this client
314    pub fn endpoint_config(&self) -> EndpointConfig {
315        self.inner.endpoint_config.clone()
316    }
317
318    #[doc(hidden)]
319    pub fn event_loop_group(&self) -> EventLoopGroup {
320        self.inner.event_loop_group.clone()
321    }
322}
323
/// Shared state behind an [S3CrtClient]: the CRT client, the CRT resources it was built from, and
/// the per-request settings copied out of [S3ClientConfig] at construction time.
#[derive(Debug)]
struct S3CrtClientInner {
    /// The underlying CRT S3 client.
    s3_client: Client,
    /// Event loop group the CRT client was created with.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration used to resolve the endpoint for each bucket.
    endpoint_config: EndpointConfig,
    /// Allocator used when building request messages.
    allocator: Allocator,
    /// Request counter; presumably read through `next_request_counter()` so that each request
    /// span created by `request_span!` gets a distinct `id` — confirm.
    next_request_counter: AtomicU64,
    /// Value of the `User-Agent` header added to every request, built from the configured
    /// [UserAgent] (user agent prefix plus S3 client information). The CRT appends further
    /// information of its own (e.g. "CRTS3NativeClient/0.1.x") when it sends the request.
    user_agent_header: String,
    /// When set, sent as the `x-amz-request-payer` header on every request.
    request_payer: Option<String>,
    /// Part size for multi-part GETs, copied from the configuration.
    read_part_size: usize,
    /// Part size for multi-part PUTs, copied from the configuration.
    write_part_size: usize,
    /// Copy of the configured `read_backpressure` flag.
    enable_backpressure: bool,
    /// Copy of the configured `initial_read_window`.
    initial_read_window_size: usize,
    /// When set, sent as the `x-amz-expected-bucket-owner` header on every request.
    bucket_owner: Option<String>,
    /// Credentials provider used to derive a per-request signing config. Always `Some` when built
    /// by `new`; when `None`, request templates carry no signing config.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver shared with the client bootstrap. It is also queried for the number of
    /// resolved addresses per host when reporting metrics.
    host_resolver: HostResolver,
    /// Optional user callback that receives the metrics of every S3 request.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
344
345impl S3CrtClientInner {
346    fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347        let allocator = Allocator::default();
348
349        let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351        let resolver_options = HostResolverDefaultOptions {
352            max_entries: 8,
353            event_loop_group: &mut event_loop_group,
354        };
355
356        let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358        let bootstrap_options = ClientBootstrapOptions {
359            event_loop_group: &mut event_loop_group,
360            host_resolver: &mut host_resolver,
361        };
362
363        let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365        let mut client_config = ClientConfig::new();
366
367        let retry_strategy = {
368            let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369            let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370                .ok()
371                .and_then(|s| s.parse::<usize>().ok())
372                .or_else(|| config.max_attempts.map(|m| m.get()))
373                .unwrap_or(3);
374            // Max *attempts* includes the initial attempt, the CRT's max *retries* does not, so
375            // decrement by one
376            retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377            retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378            retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379            RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380        };
381
382        trace!("constructing client with auth config {:?}", config.auth_config);
383        let credentials_provider = match config.auth_config {
384            S3ClientAuthConfig::Default => {
385                let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386                    bootstrap: &mut client_bootstrap,
387                };
388                CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389                    .map_err(NewClientError::ProviderFailure)?
390            }
391            S3ClientAuthConfig::NoSigning => {
392                CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393            }
394            S3ClientAuthConfig::Profile(profile_name) => {
395                let credentials_profile_options = CredentialsProviderProfileOptions {
396                    bootstrap: &mut client_bootstrap,
397                    profile_name_override: &profile_name,
398                };
399                CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400                    .map_err(NewClientError::ProviderFailure)?
401            }
402            S3ClientAuthConfig::Provider(provider) => provider,
403        };
404
405        let endpoint_config = config.endpoint_config;
406        client_config.region(endpoint_config.get_region());
407        let signing_config = init_signing_config(
408            endpoint_config.get_region(),
409            credentials_provider.clone(),
410            None,
411            None,
412            None,
413        );
414
415        let endpoint_config = match endpoint_config.get_endpoint() {
416            None => {
417                // No explicit endpoint was configured, let's try the environment variable
418                if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419                    debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420                    let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421                        .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422                    endpoint_config.endpoint(env_uri)
423                } else {
424                    endpoint_config
425                }
426            }
427            Some(_) => endpoint_config,
428        };
429
430        client_config.express_support(true);
431        client_config.read_backpressure(config.read_backpressure);
432        client_config.initial_read_window(config.initial_read_window);
433        client_config.signing_config(signing_config);
434
435        client_config
436            .client_bootstrap(client_bootstrap)
437            .retry_strategy(retry_strategy);
438
439        client_config.throughput_target_gbps(config.throughput_target_gbps);
440        client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442        if let Some(wrapper) = config.buffer_pool_factory {
443            let pool_factory = CrtBufferPoolFactory::new(wrapper, event_loop_group.clone());
444            client_config.buffer_pool_factory(pool_factory);
445        }
446
447        // max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
448        let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
449        // TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
450        for part_size in [config.read_part_size, config.write_part_size] {
451            if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
452                return Err(NewClientError::InvalidConfiguration(format!(
453                    "part size must be at between 5MiB and {}GiB",
454                    max_part_size / 1024 / 1024 / 1024
455                )));
456            }
457        }
458
459        if !config.network_interface_names.is_empty() {
460            client_config.network_interface_names(config.network_interface_names);
461        }
462
463        let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
464        let user_agent_header = user_agent.build();
465
466        let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
467
468        Ok(Self {
469            allocator,
470            s3_client,
471            event_loop_group,
472            endpoint_config,
473            next_request_counter: AtomicU64::new(0),
474            user_agent_header,
475            request_payer: config.request_payer,
476            read_part_size: config.read_part_size,
477            write_part_size: config.write_part_size,
478            enable_backpressure: config.read_backpressure,
479            initial_read_window_size: config.initial_read_window,
480            bucket_owner: config.bucket_owner,
481            credentials_provider: Some(credentials_provider),
482            host_resolver,
483            telemetry_callback: config.telemetry_callback,
484        })
485    }
486
487    /// Create a new HTTP request template for the given HTTP method and S3 bucket name.
488    /// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
489    /// response should be XML; this header should be overwritten for requests like GET that return
490    /// object data.
491    fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
492        let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
493        let uri = endpoint.uri()?;
494        trace!(?uri, "resolved endpoint");
495
496        let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
497            let auth_scheme = match endpoint.auth_scheme() {
498                Ok(auth_scheme) => auth_scheme,
499                Err(e) => {
500                    error!(error=?e, "no auth scheme for endpoint");
501                    return Err(e.into());
502                }
503            };
504            trace!(?auth_scheme, "resolved auth scheme");
505            let algorithm = Some(auth_scheme.scheme_name());
506            let service = Some(auth_scheme.signing_name());
507            let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
508            Some(init_signing_config(
509                auth_scheme.signing_region(),
510                credentials_provider.clone(),
511                algorithm,
512                service,
513                use_double_uri_encode,
514            ))
515        } else {
516            None
517        };
518
519        let hostname = uri.host_name().to_str().unwrap();
520        let path_prefix = uri.path().to_os_string().into_string().unwrap();
521        let port = uri.host_port();
522        let hostname_header = if port > 0 {
523            format!("{hostname}:{port}")
524        } else {
525            hostname.to_string()
526        };
527
528        let mut message = Message::new_request(&self.allocator)?;
529        message.set_request_method(method)?;
530        message.add_header(&Header::new("Host", hostname_header))?;
531        message.add_header(&Header::new("accept", "application/xml"))?;
532        message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
533
534        if let Some(ref payer) = self.request_payer {
535            message.add_header(&Header::new("x-amz-request-payer", payer))?;
536        }
537
538        if let Some(ref owner) = self.bucket_owner {
539            message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
540        }
541
542        Ok(S3Message {
543            inner: message,
544            uri,
545            path_prefix,
546            checksum_config: None,
547            signing_config,
548        })
549    }
550
551    /// Make a meta-request using this S3 client that invokes the given callbacks as the request
552    /// makes progress.
553    ///
554    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
555    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
556    /// parsing in this case (e.g. for permissions errors).
557    #[allow(clippy::too_many_arguments)]
558    fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
559        &self,
560        mut options: MetaRequestOptions,
561        request_span: Span,
562        on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
563        mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
564        mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
565        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
566        on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
567    ) -> Result<CancellingMetaRequest, S3RequestError> {
568        let span_telemetry = request_span.clone();
569        let span_body = request_span.clone();
570        let span_finish = request_span;
571
572        let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
573        let hostname = endpoint.host_name().to_str().unwrap().to_owned();
574        let host_resolver = self.host_resolver.clone();
575        let telemetry_callback = self.telemetry_callback.clone();
576
577        let start_time = Instant::now();
578        let first_body_part = Arc::new(AtomicBool::new(true));
579        let first_body_part_clone = Arc::clone(&first_body_part);
580        let total_bytes = Arc::new(AtomicU64::new(0));
581        let total_bytes_clone = Arc::clone(&total_bytes);
582
583        options
584            .on_telemetry(move |metrics| {
585                let _guard = span_telemetry.enter();
586
587                on_request_finish(metrics);
588
589                let http_status = metrics.status_code();
590                let request_canceled = metrics.is_canceled();
591                let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
592                let crt_error = Some(metrics.error()).filter(|e| e.is_err());
593                let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
594                let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
595                let duration = metrics.total_duration();
596                let ttfb = metrics.time_to_first_byte();
597                let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
598
599                let message = if request_failure {
600                    "S3 request failed"
601                } else if request_canceled {
602                    "S3 request canceled"
603                } else {
604                    "S3 request finished"
605                };
606                debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
607                trace!(detailed_metrics=?metrics, "S3 request completed");
608
609                if let Some(ttfb) = ttfb {
610                    metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
611                }
612                metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
613                metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
614                if request_failure {
615                    metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
616                } else if request_canceled {
617                    metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
618                }
619
620                if let Some(telemetry_callback) = &telemetry_callback {
621                    telemetry_callback.on_telemetry(metrics);
622                }
623            })
624            .on_headers(move |headers, response_status| {
625                (on_headers)(headers, response_status);
626            })
627            .on_body(move |range_start, data| {
628                let _guard = span_body.enter();
629
630                if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
631                    let latency = start_time.elapsed().as_micros() as f64;
632                    let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
633                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
634                }
635                total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
636
637                trace!(start = range_start, length = data.len(), "body part received");
638
639                (on_body)(range_start, data);
640            })
641            .on_finish(move |request_result| {
642                let _guard = span_finish.enter();
643
644                let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
645                let duration = start_time.elapsed();
646
647                metrics::counter!("s3.meta_requests", "op" => op).increment(1);
648                metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
649                // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
650                if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true)  {
651                    let latency = duration.as_micros() as f64;
652                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
653                }
654                let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
655                // We only log throughput of object data. PUT needs to be measured in its stream
656                // implementation rather than these callbacks, so we can only do GET here.
657                if op == "get_object" {
658                    emit_throughput_metric(total_bytes, duration, op);
659                }
660                let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
661                if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
662                    metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
663                }
664
665                let status_code = request_result.response_status;
666                let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
667                    tracing::Level::DEBUG
668                } else {
669                    tracing::Level::WARN
670                };
671
672                let result =
673                    if !request_result.is_err() {
674                        event!(log_level, ?duration, "meta request finished");
675                        Ok(())
676                    } else {
677                        // The `parse_meta_request_error` callback has a choice of whether to give us an error or not.
678                        // If not, fall back to generic error parsing (e.g. for permissions errors), or just no error if that fails too.
679                        let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
680                        let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
681
682                        // Try to parse request header out of the failure. We can't just use the
683                        // telemetry callback because there might be multiple requests per meta
684                        // request, but these headers are known to be from the failed request.
685                        let request_id = match &request_result.error_response_headers {
686                            Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
687                            None => None,
688                        };
689                        let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
690
691                        let message = if request_result.is_canceled() {
692                            "meta request canceled"
693                        } else {
694                            "meta request failed"
695                        };
696                        if let Some(error) = &maybe_err {
697                            event!(log_level, ?duration, %request_id, ?error, message);
698                            debug!("meta request result: {:?}", request_result);
699                        } else {
700                            event!(log_level, ?duration, %request_id, ?request_result, message);
701                        }
702
703                        if request_result.is_canceled() {
704                            metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
705                        } else {
706                            // If it's not a real HTTP status, encode the CRT error in the metric instead
707                            let error_status = if request_result.response_status >= 100 {
708                                request_result.response_status
709                            } else {
710                                -request_result.crt_error.raw_error()
711                            };
712                            metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
713                        }
714
715                        // Fill in a generic error if we weren't able to parse one
716                        Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
717                    };
718
719                on_meta_request_result(result);
720            });
721
722        // Issue the HTTP request using the CRT's S3 meta request API
723        let meta_request = self.s3_client.make_meta_request(options)?;
724        Self::poll_client_metrics(&self.s3_client);
725        Ok(CancellingMetaRequest::wrap(meta_request))
726    }
727
728    /// Make a meta-request using this S3 client that returns the response body on success or
729    /// invokes the given callback on failure.
730    ///
731    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
732    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
733    /// parsing in this case (e.g. for permissions errors).
734    fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
735        &self,
736        options: MetaRequestOptions,
737        request_span: Span,
738        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
739    ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
740        let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
741
742        // Accumulate the body of the response into this Vec<u8>
743        let body: Arc<Mutex<Vec<u8>>> = Default::default();
744        let body_clone = Arc::clone(&body);
745
746        let meta_request = self.meta_request_with_callbacks(
747            options,
748            request_span,
749            |_| {},
750            |_, _| {},
751            move |offset, data| {
752                let mut body = body_clone.lock().unwrap();
753                assert_eq!(offset as usize, body.len());
754                body.extend_from_slice(data);
755            },
756            parse_meta_request_error,
757            move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
758        )?;
759        Ok(S3MetaRequest {
760            receiver: rx.fuse(),
761            meta_request,
762        })
763    }
764
765    /// Make a meta-request using this S3 client that returns the response headers on success or
766    /// invokes the given callback on failure.
767    ///
768    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
769    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
770    /// parsing in this case (e.g. for permissions errors).
771    fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
772        &self,
773        options: MetaRequestOptions,
774        request_span: Span,
775        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
776    ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
777        let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
778
779        // On success, stash the headers in this lock during the on_headers callback,
780        // and pull them out during the on_meta_request_result callback.
781        let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
782        let on_result = on_headers.clone();
783
784        let meta_request = self.meta_request_with_callbacks(
785            options,
786            request_span,
787            |_| {},
788            move |headers, status| {
789                // Only store headers if we have a 2xx status code. If we only get other status codes,
790                // then on_meta_request_result will send an error.
791                if (200..300).contains(&status) {
792                    *on_headers.lock().unwrap() = Some(headers.clone());
793                }
794            },
795            |_, _| {},
796            parse_meta_request_error,
797            move |result| {
798                // Return the headers on success, otherwise propagate the error.
799                let headers =
800                    result.and_then(|_| {
801                        on_result.lock().unwrap().take().ok_or_else(|| {
802                            S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
803                        })
804                    });
805                _ = tx.send(headers);
806            },
807        )?;
808        Ok(S3MetaRequest {
809            receiver: rx.fuse(),
810            meta_request,
811        })
812    }
813
814    /// Make a meta-request using this S3 client that invokes the given callback on failure.
815    ///
816    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
817    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
818    /// parsing in this case (e.g. for permissions errors).
819    fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
820        &self,
821        options: MetaRequestOptions,
822        request_span: Span,
823        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
824    ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
825        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
826
827        let meta_request = self.meta_request_with_callbacks(
828            options,
829            request_span,
830            |_| {},
831            |_, _| {},
832            |_, _| {},
833            parse_meta_request_error,
834            move |result| _ = tx.send(result),
835        )?;
836        Ok(S3MetaRequest {
837            receiver: rx.fuse(),
838            meta_request,
839        })
840    }
841
842    fn poll_client_metrics(s3_client: &Client) {
843        let metrics = s3_client.poll_client_metrics();
844        metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
845        metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
846        metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
847        metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
848        metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
849        metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
850        metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
851            .set(metrics.num_auto_ranged_copy_network_io as f64);
852        metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
853        metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
854            .set(metrics.num_requests_stream_queued_waiting as f64);
855        metrics::gauge!("s3.client.num_requests_streaming_response")
856            .set(metrics.num_requests_streaming_response as f64);
857
858        // Buffer pool metrics
859        let start = Instant::now();
860        if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
861            metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
862                .record(start.elapsed().as_micros() as f64);
863            metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
864            metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
865            metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
866            metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
867            metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
868            metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
869                .set(buffer_pool_stats.primary_num_blocks as f64);
870            metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
871                .set(buffer_pool_stats.secondary_reserved as f64);
872            metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
873            metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
874        }
875    }
876
877    fn next_request_counter(&self) -> u64 {
878        self.next_request_counter.fetch_add(1, Ordering::SeqCst)
879    }
880}
881
/// Failure retrieving headers
///
/// Reported (wrapped as an internal failure) when a headers-returning meta-request completes
/// successfully but no 2xx response headers were captured.
#[derive(Debug, Error)]
enum ResponseHeadersError {
    /// The meta-request succeeded without any stored 2xx response headers.
    #[error("response headers are missing")]
    MissingHeaders,
}
888
889/// S3 operation supported by this client.
890#[derive(Debug, Clone, Copy)]
891enum S3Operation {
892    DeleteObject,
893    GetObject,
894    GetObjectAttributes,
895    HeadBucket,
896    HeadObject,
897    ListObjects,
898    PutObject,
899    CopyObject,
900    PutObjectSingle,
901}
902
903impl S3Operation {
904    /// The [MetaRequestType] to use for this operation.
905    fn meta_request_type(&self) -> MetaRequestType {
906        match self {
907            S3Operation::GetObject => MetaRequestType::GetObject,
908            S3Operation::PutObject => MetaRequestType::PutObject,
909            S3Operation::CopyObject => MetaRequestType::CopyObject,
910            _ => MetaRequestType::Default,
911        }
912    }
913
914    /// The operation name to set when configuring a request. Required for operations that
915    /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise.
916    fn operation_name(&self) -> Option<&'static str> {
917        match self {
918            S3Operation::DeleteObject => Some("DeleteObject"),
919            S3Operation::GetObject => None,
920            S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
921            S3Operation::HeadBucket => Some("HeadBucket"),
922            S3Operation::HeadObject => Some("HeadObject"),
923            S3Operation::ListObjects => Some("ListObjectsV2"),
924            S3Operation::PutObject => None,
925            S3Operation::CopyObject => None,
926            S3Operation::PutObjectSingle => Some("PutObject"),
927        }
928    }
929}
930
931/// A HTTP message to be sent to S3. This is a wrapper around a plain HTTP message, except that it
932/// helps us correctly configure the endpoint and "Host" header to handle both path-style and
933/// virtual-hosted-style addresses. The `path_prefix` is appended to the front of all paths, and
934/// need not be terminated with a `/`.
935#[derive(Debug)]
936struct S3Message<'a> {
937    inner: Message<'a>,
938    uri: Uri,
939    path_prefix: String,
940    checksum_config: Option<ChecksumConfig>,
941    signing_config: Option<SigningConfig>,
942}
943
944// This is RFC 3986 but with '/' also considered a safe character for path fragments.
945const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
946const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
947
948fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
949    let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
950    s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
951}
952
953#[derive(Debug, Default)]
954enum QueryFragment<'a, P: AsRef<OsStr>> {
955    #[default]
956    Empty,
957    Query(&'a [(P, P)]),
958    Action(P),
959}
960
961impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
962    // This estimate is exact if no characters need encoding, otherwise we'll end up
963    // reallocating a couple of times. The '?' for the query is counted in the first key-value
964    // pair.
965    fn size(&self) -> usize {
966        match self {
967            QueryFragment::Empty => 0,
968            QueryFragment::Query(query) => query
969                .iter()
970                .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
971                .sum::<usize>(),
972            QueryFragment::Action(action) => 1 + action.as_ref().len(),
973        }
974    }
975
976    // Write the fragment to the query string
977    fn write(&self, path: &mut OsString) {
978        match &self {
979            QueryFragment::Query(query) if !query.is_empty() => {
980                path.push("?");
981                for (i, (key, value)) in query.iter().enumerate() {
982                    if i != 0 {
983                        path.push("&");
984                    }
985                    write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
986                    path.push("=");
987                    write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
988                }
989            }
990            QueryFragment::Action(action) => {
991                path.push("?");
992                path.push(action.as_ref());
993            }
994            _ => {}
995        }
996    }
997}
998
999impl<'a> S3Message<'a> {
1000    /// Add a header to this message. The header is added if necessary and any existing values for
1001    /// this header are removed.
1002    fn set_header(
1003        &mut self,
1004        header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1005    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1006        self.inner.set_header(header)
1007    }
1008
1009    /// Set the request path and query for this message. The components should not be URL-encoded;
1010    /// this method will handle that.
1011    fn set_request_path_and_query<P: AsRef<OsStr>>(
1012        &mut self,
1013        path: impl AsRef<OsStr>,
1014        query_or_action: QueryFragment<P>,
1015    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1016        let space_needed = self.path_prefix.len() + query_or_action.size();
1017
1018        let mut full_path = OsString::with_capacity(space_needed);
1019
1020        write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1021        write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1022
1023        // Build the query string
1024        query_or_action.write(&mut full_path);
1025        self.inner.set_request_path(full_path)
1026    }
1027
1028    /// Set the request path for this message. The path should not be URL-encoded; this method will
1029    /// handle that.
1030    fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1031        self.set_request_path_and_query::<&str>(path, Default::default())
1032    }
1033
1034    /// Sets the checksum configuration for this message.
1035    fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1036        self.checksum_config = checksum_config;
1037    }
1038
1039    /// Sets the body input stream for this message, and returns any previously set input stream.
1040    /// If input_stream is None, unsets the body.
1041    fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1042        self.inner.set_body_stream(input_stream)
1043    }
1044
1045    /// Set the content length header.
1046    fn set_content_length_header(
1047        &mut self,
1048        content_length: usize,
1049    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1050        self.inner
1051            .set_header(&Header::new("Content-Length", content_length.to_string()))
1052    }
1053
1054    /// Set the checksum header.
1055    fn set_checksum_header(
1056        &mut self,
1057        checksum: &UploadChecksum,
1058    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1059        let header = match checksum {
1060            UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1061            UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1062            UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1063            UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1064            UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1065        };
1066        self.inner.set_header(&header)
1067    }
1068
1069    fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1070        let mut options = MetaRequestOptions::new();
1071        if let Some(checksum_config) = self.checksum_config {
1072            options.checksum_config(checksum_config);
1073        }
1074        if let Some(signing_config) = self.signing_config {
1075            options.signing_config(signing_config);
1076        }
1077        options
1078            .message(self.inner)
1079            .endpoint(self.uri)
1080            .request_type(operation.meta_request_type());
1081        if let Some(operation_name) = operation.operation_name() {
1082            options.operation_name(operation_name);
1083        }
1084        options
1085    }
1086}
1087
1088#[derive(Debug)]
1089#[pin_project]
1090#[must_use]
1091struct S3MetaRequest<T, E> {
1092    /// Receiver for the result of meta-request.
1093    #[pin]
1094    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1095    meta_request: CancellingMetaRequest,
1096}
1097
1098impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1099    type Output = ObjectClientResult<T, E, S3RequestError>;
1100
1101    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1102        let this = self.project();
1103        this.receiver
1104            .poll(cx)
1105            .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1106    }
1107}
1108
1109impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1110    fn is_terminated(&self) -> bool {
1111        self.receiver.is_terminated()
1112    }
1113}
1114
1115/// Wrapper for a [MetaRequest] that cancels it on drop.
1116///
1117/// Note that if the request has already completed, cancelling it has no effect.
1118#[derive(Debug)]
1119struct CancellingMetaRequest {
1120    inner: MetaRequest,
1121}
1122
1123impl CancellingMetaRequest {
1124    fn wrap(meta_request: MetaRequest) -> Self {
1125        Self { inner: meta_request }
1126    }
1127}
1128
1129impl Drop for CancellingMetaRequest {
1130    fn drop(&mut self) {
1131        self.inner.cancel();
1132    }
1133}
1134
1135impl Deref for CancellingMetaRequest {
1136    type Target = MetaRequest;
1137
1138    fn deref(&self) -> &Self::Target {
1139        &self.inner
1140    }
1141}
1142
1143impl DerefMut for CancellingMetaRequest {
1144    fn deref_mut(&mut self) -> &mut Self::Target {
1145        &mut self.inner
1146    }
1147}
1148
/// Failures to construct a new S3 client
///
/// Marked `#[non_exhaustive]`: downstream matches must include a wildcard arm.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum NewClientError {
    /// Invalid S3 endpoint
    #[error("invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
    /// Invalid AWS credentials
    ///
    /// NOTE(review): wraps a CRT error, presumably from setting up the credentials provider
    /// (per the variant name) — confirm against the client constructor.
    #[error("invalid AWS credentials")]
    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
    /// Invalid configuration, with a human-readable description
    #[error("invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// An internal error from within the AWS Common Runtime
    #[error("Unknown CRT error")]
    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
}
1166
1167/// Errors returned by the CRT-based S3 client
1168#[derive(Error, Debug)]
1169pub enum S3RequestError {
1170    /// An internal error from within the S3 client. The request may have been sent.
1171    #[error("Internal S3 client error")]
1172    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1173
1174    /// An internal error from within the AWS Common Runtime. The request may have been sent.
1175    #[error("Unknown CRT error")]
1176    CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1177
1178    /// An error during construction of a request. The request was not sent.
1179    #[error("Failed to construct request")]
1180    ConstructionFailure(#[from] ConstructionError),
1181
1182    /// The request was sent but an unknown or unhandled failure occurred while processing it.
1183    #[error("Unknown response error: {0:?}")]
1184    ResponseError(MetaRequestResult),
1185
1186    /// The request was made to the wrong region
1187    #[error("Wrong region (expecting {0})")]
1188    IncorrectRegion(String, ClientErrorMetadata),
1189
1190    /// Forbidden
1191    #[error("Forbidden: {0}")]
1192    Forbidden(String, ClientErrorMetadata),
1193
1194    /// The request was attempted but could not be signed due to no available credentials
1195    #[error("No signing credentials available, see CRT debug logs")]
1196    NoSigningCredentials,
1197
1198    /// The request was canceled
1199    #[error("Request canceled")]
1200    RequestCanceled,
1201
1202    /// The request was throttled by S3
1203    #[error("Request throttled")]
1204    Throttled,
1205
1206    /// Cannot fetch more data because current read window is exhausted. The read window must
1207    /// be advanced using [GetObjectRequest::increment_read_window(u64)] to continue fetching
1208    /// new data.
1209    #[error("Polled for data with empty read window")]
1210    EmptyReadWindow,
1211}
1212
1213impl S3RequestError {
1214    fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1215        S3RequestError::ConstructionFailure(inner.into())
1216    }
1217
1218    fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1219        S3RequestError::InternalError(Box::new(inner))
1220    }
1221}
1222
1223impl ProvideErrorMetadata for S3RequestError {
1224    fn meta(&self) -> ClientErrorMetadata {
1225        match self {
1226            Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1227            Self::Forbidden(_, metadata) => metadata.clone(),
1228            Self::Throttled => ClientErrorMetadata {
1229                http_code: Some(503),
1230                error_code: Some("SlowDown".to_string()),
1231                error_message: Some("Please reduce your request rate.".to_string()),
1232            },
1233            Self::IncorrectRegion(_, metadata) => metadata.clone(),
1234            _ => Default::default(),
1235        }
1236    }
1237}
1238
/// Errors raised while building a request, before anything is sent.
///
/// Surfaced to callers as [S3RequestError::ConstructionFailure].
#[derive(Error, Debug)]
pub enum ConstructionError {
    /// CRT error while constructing the request
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The S3 endpoint was invalid
    #[error("Invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
}
1249
/// Return a `&'static str` for a given non-static `&str`, as required by [metrics] crate.
///
/// A recognised operation name maps to the identical static literal. `None` maps to `"Unknown"`.
/// An unrecognised name trips a `debug_assert!` (so the table gets updated) and otherwise also
/// maps to `"Unknown"`.
fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
    const UNKNOWN_METRIC_STR: &str = "Unknown";

    // Operation names this function recognises, each returned as its own static literal.
    const KNOWN_OPERATION_NAMES: &[&str] = &[
        "GetObject",
        "HeadObject",
        "ListParts",
        "CreateMultipartUpload",
        "UploadPart",
        "AbortMultipartUpload",
        "CompleteMultipartUpload",
        "UploadPartCopy",
        "CopyObject",
        "PutObject",
        "ListObjectsV2",
        "DeleteObject",
        "GetObjectAttributes",
        "HeadBucket",
        "RenameObject",
    ];

    match operation_name {
        None => UNKNOWN_METRIC_STR,
        Some(operation_name) => {
            let static_str = KNOWN_OPERATION_NAMES
                .iter()
                .copied()
                .find(|&known| known == operation_name);
            debug_assert!(
                static_str.is_some(),
                "input set as {operation_name:?} but no matcher, update required",
            );
            static_str.unwrap_or(UNKNOWN_METRIC_STR)
        }
    }
}
1296
1297/// Extract the byte range from the Content-Range header if present and valid
1298fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1299    let header = headers.get("Content-Range").ok()?;
1300    let value = header.value().to_str()?;
1301
1302    // Content-Range: <unit> <range-start>-<range-end>/<size>
1303
1304    if !value.starts_with("bytes ") {
1305        return None;
1306    }
1307    let (_, value) = value.split_at("bytes ".len());
1308    let (range, _) = value.split_once('/')?;
1309    let (start, end) = range.split_once('-')?;
1310    let start = start.parse::<u64>().ok()?;
1311    let end = end.parse::<u64>().ok()?;
1312
1313    // Rust ranges are exclusive at the end, but Content-Range is inclusive
1314    Some(start..end + 1)
1315}
1316
1317/// Extract the [Checksum] information from headers
1318fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1319    let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1320    let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1321    let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1322    let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1323    let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1324
1325    Ok(Checksum {
1326        checksum_crc64nvme,
1327        checksum_crc32,
1328        checksum_crc32c,
1329        checksum_sha1,
1330        checksum_sha256,
1331    })
1332}
1333
1334/// Try to parse a modeled error out of a failing meta request
1335fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1336    /// Look for a redirect header pointing to a different region for the bucket
1337    fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1338        let headers = request_result.error_response_headers.as_ref()?;
1339        let region_header = headers.get("x-amz-bucket-region").ok()?;
1340        let region = region_header.value().to_owned().into_string().ok()?;
1341        Some(S3RequestError::IncorrectRegion(
1342            region,
1343            ClientErrorMetadata::from_meta_request_result(request_result),
1344        ))
1345    }
1346
1347    /// Look for access-related errors
1348    fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1349        let Some(body) = request_result.error_response_body.as_ref() else {
1350            // Header-only requests like HeadObject and HeadBucket can't give us a more detailed
1351            // error, so just trust the response code
1352            return Some(S3RequestError::Forbidden(
1353                "<no message>".to_owned(),
1354                ClientErrorMetadata {
1355                    http_code: Some(request_result.response_status),
1356                    ..Default::default()
1357                },
1358            ));
1359        };
1360        let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1361        let error_code = error_elem.get_child("Code")?;
1362        let error_code_str = error_code.get_text()?;
1363        // Always translate 403 to Forbidden, but otherwise first check the error code, since other
1364        // response statuses are overloaded and not always access-related errors.
1365        if request_result.response_status == 403
1366            || matches!(
1367                error_code_str.deref(),
1368                "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1369            )
1370        {
1371            let message = error_elem
1372                .get_child("Message")
1373                .and_then(|e| e.get_text())
1374                .unwrap_or(error_code_str.clone());
1375            Some(S3RequestError::Forbidden(
1376                message.to_string(),
1377                ClientErrorMetadata {
1378                    http_code: Some(request_result.response_status),
1379                    error_code: Some(error_code_str.to_string()),
1380                    error_message: Some(message.into_owned()),
1381                },
1382            ))
1383        } else {
1384            None
1385        }
1386    }
1387
1388    /// Try to look for error related to no signing credentials, returns generic error otherwise
1389    fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1390        let crt_error_code = request_result.crt_error.raw_error();
1391        if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1392            S3RequestError::NoSigningCredentials
1393        } else {
1394            S3RequestError::CrtError(crt_error_code.into())
1395        }
1396    }
1397
1398    fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1399        let crt_error_code = request_result.crt_error.raw_error();
1400        if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1401            Some(S3RequestError::Throttled)
1402        } else {
1403            None
1404        }
1405    }
1406
1407    /// Handle canceled requests
1408    fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1409        request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1410    }
1411
1412    match request_result.response_status {
1413        301 => try_parse_redirect(request_result),
1414        // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
1415        // redirect
1416        400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1417        403 => try_parse_forbidden(request_result),
1418        // if the http response status is not set, we look into crt_error_code to identify the error
1419        0 => try_parse_throttled(request_result)
1420            .or_else(|| try_parse_canceled_request(request_result))
1421            .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1422        _ => None,
1423    }
1424}
1425
1426/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
1427/// PUT bytes don't transit those callbacks.
1428fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1429    let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1430    // Semi-arbitrary choices here to avoid averaging out large and small requests
1431    const MEGABYTE: u64 = 1024 * 1024;
1432    let bucket = if bytes < MEGABYTE {
1433        "<1MiB"
1434    } else if bytes <= 16 * MEGABYTE {
1435        "1-16MiB"
1436    } else {
1437        ">16MiB"
1438    };
1439    metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1440}
1441
1442#[cfg_attr(not(docsrs), async_trait)]
1443impl ObjectClient for S3CrtClient {
1444    type GetObjectResponse = S3GetObjectResponse;
1445    type PutObjectRequest = S3PutObjectRequest;
1446    type ClientError = S3RequestError;
1447
1448    fn read_part_size(&self) -> usize {
1449        self.inner.read_part_size
1450    }
1451
1452    fn write_part_size(&self) -> usize {
1453        // TODO: the CRT does some clamping to a max size rather than just swallowing the part size
1454        // we configured it with, so this might be wrong. Right now the only clamping is to the max
1455        // S3 part size (5GiB), so this shouldn't affect the result.
1456        // https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
1457        self.inner.write_part_size
1458    }
1459
1460    fn initial_read_window_size(&self) -> Option<usize> {
1461        if self.inner.enable_backpressure {
1462            Some(self.inner.initial_read_window_size)
1463        } else {
1464            None
1465        }
1466    }
1467
1468    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1469        let start = Instant::now();
1470        self.inner
1471            .s3_client
1472            .poll_default_buffer_pool_usage_stats()
1473            .inspect(|_| {
1474                metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1475                    .record(start.elapsed().as_micros() as f64);
1476            })
1477    }
1478
1479    async fn delete_object(
1480        &self,
1481        bucket: &str,
1482        key: &str,
1483    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1484        self.delete_object(bucket, key).await
1485    }
1486
1487    async fn copy_object(
1488        &self,
1489        source_bucket: &str,
1490        source_key: &str,
1491        destination_bucket: &str,
1492        destination_key: &str,
1493        params: &CopyObjectParams,
1494    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1495        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1496            .await
1497    }
1498
1499    async fn get_object(
1500        &self,
1501        bucket: &str,
1502        key: &str,
1503        params: &GetObjectParams,
1504    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1505        self.get_object(bucket, key, params).await
1506    }
1507
1508    async fn list_objects(
1509        &self,
1510        bucket: &str,
1511        continuation_token: Option<&str>,
1512        delimiter: &str,
1513        max_keys: usize,
1514        prefix: &str,
1515    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1516        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1517            .await
1518    }
1519
1520    async fn head_object(
1521        &self,
1522        bucket: &str,
1523        key: &str,
1524        params: &HeadObjectParams,
1525    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1526        self.head_object(bucket, key, params).await
1527    }
1528
1529    async fn put_object(
1530        &self,
1531        bucket: &str,
1532        key: &str,
1533        params: &PutObjectParams,
1534    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1535        self.put_object(bucket, key, params).await
1536    }
1537
1538    async fn put_object_single<'a>(
1539        &self,
1540        bucket: &str,
1541        key: &str,
1542        params: &PutObjectSingleParams,
1543        contents: impl AsRef<[u8]> + Send + 'a,
1544    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1545        self.put_object_single(bucket, key, params, contents).await
1546    }
1547
1548    async fn get_object_attributes(
1549        &self,
1550        bucket: &str,
1551        key: &str,
1552        max_parts: Option<usize>,
1553        part_number_marker: Option<usize>,
1554        object_attributes: &[ObjectAttribute],
1555    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1556        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1557            .await
1558    }
1559
1560    async fn rename_object(
1561        &self,
1562        bucket: &str,
1563        src_key: &str,
1564        dst_key: &str,
1565        params: &RenameObjectParams,
1566    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1567        self.rename_object(bucket, src_key, dst_key, params).await
1568    }
1569}
1570
/// Custom handling of telemetry events.
///
/// Implementors are handed a [`RequestMetrics`] through [`OnTelemetry::on_telemetry`].
/// The `Send + Sync` supertraits allow one implementation to be shared across threads;
/// `Debug` is presumably required so that types holding an implementation can still
/// derive `Debug` (NOTE(review): confirm).
pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
    /// Handle the metrics for a request.
    ///
    /// NOTE(review): presumably invoked from the client's callback path, so implementations
    /// should likely be cheap and non-blocking — confirm against the call site.
    fn on_telemetry(&self, request_metrics: &RequestMetrics);
}
1575
#[cfg(test)]
mod tests {
    use mountpoint_s3_crt::common::error::Error;
    use rusty_fork::rusty_fork_test;
    use std::assert_eq;

    use super::*;
    use test_case::test_case;

    /// Assert that [S3CrtClient::new] rejects a configuration with the given out-of-range part size.
    fn client_new_fails_with_invalid_part_size(part_size: usize) {
        let config = S3ClientConfig::default().part_size(part_size);
        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
        let message = if cfg!(target_pointer_width = "64") {
            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
        } else {
            format!(
                "invalid configuration: part size must be at between 5MiB and {}GiB",
                usize::MAX / 1024 / 1024 / 1024
            )
        };
        assert_eq!(e.to_string(), message);
    }

    /// On 32-bit platforms the part size is capped at `usize::MAX`,
    /// so a value greater than 5GiB cannot even be expressed there.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn client_new_fails_with_greater_part_size() {
        let part_size = 6 * 1024 * 1024 * 1024; // greater than 5GiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    #[test]
    fn client_new_fails_with_smaller_part_size() {
        let part_size = 4 * 1024 * 1024; // less than 5MiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// Test if the prefix is added correctly to the User-Agent header
    #[test]
    fn test_user_agent_with_prefix() {
        let user_agent_prefix = String::from("someprefix");
        let expected_user_agent = "someprefix mountpoint-s3-client/";

        let config = S3ClientConfig {
            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// Build a client from `endpoint_config` and assert that its request template carries
    /// exactly `expected_host` in the `Host` header.
    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
        let config = S3ClientConfig {
            endpoint_config,
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");

        let host_header = headers.get("Host").expect("Host header expected");
        let host_header_value = host_header.value();

        assert_eq!(host_header_value.to_string_lossy(), expected_host);
    }

    // run with rusty_fork to avoid issues with other tests and their env variables.
    rusty_fork_test! {
        #[test]
        fn test_endpoint_favors_parameter_over_env_variable() {
            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }

            // even though we set the environment variable, the parameter takes precedence
            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
        }

        // With no explicit endpoint configured, AWS_ENDPOINT_URL determines the host.
        #[test]
        fn test_endpoint_favors_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }

            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
        }

        // A malformed AWS_ENDPOINT_URL must make client construction fail.
        #[test]
        fn test_endpoint_with_invalid_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }

            let config = S3ClientConfig {
                endpoint_config,
                ..Default::default()
            };

            let e = S3CrtClient::new(config).expect_err("expected an error");
            assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint");
        }

    }

    /// Simple test to ensure the user agent header is correct even when prefix is not added
    #[test]
    fn test_user_agent_without_prefix() {
        let expected_user_agent = "mountpoint-s3-client/";

        let config: S3ClientConfig = Default::default();

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers.get("User-Agent").expect("User Agent Header expected");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
    #[test_case("bytes 200-1000/*" => Some(200..1001))]
    #[test_case("bytes 200-1000" => None)]
    #[test_case("bytes */67589" => None)]
    #[test_case("octets 200-1000]" => None)]
    fn parse_content_range(range: &str) -> Option<Range<u64>> {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let header = Header::new("Content-Range", range);
        headers.add_header(&header).unwrap();
        extract_range_header(&headers)
    }

    /// Simple test to ensure the expected bucket owner can be set
    #[test]
    fn test_expected_bucket_owner() {
        let expected_bucket_owner = "111122223333";

        let config = S3ClientConfig::new().bucket_owner(expected_bucket_owner);

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let expected_bucket_owner_header = headers
            .get("x-amz-expected-bucket-owner")
            .expect("the headers should contain x-amz-expected-bucket-owner");
        let expected_bucket_owner_value = expected_bucket_owner_header.value();

        // The header carries exactly the configured owner, not just a value starting with it.
        assert_eq!(expected_bucket_owner_value.to_string_lossy(), expected_bucket_owner);
    }

    /// Build a [MetaRequestResult] with the given HTTP status and error body, using a placeholder
    /// CRT error code (1). When `bucket_region_header` is set, the error response headers contain
    /// an `x-amz-bucket-region` header with that value.
    fn make_result(
        response_status: i32,
        body: impl Into<OsString>,
        bucket_region_header: Option<&str>,
    ) -> MetaRequestResult {
        let error_response_headers = bucket_region_header.map(|h| {
            let mut headers = Headers::new(&Allocator::default()).unwrap();
            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
            headers
        });
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_301_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_access_denied() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "Access Denied");
    }

    #[test]
    fn parse_400_invalid_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
    }

    #[test]
    fn parse_400_expired_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token has expired.");
    }

    #[test]
    fn parse_400_redirect() {
        // From an s3-accelerate endpoint with the wrong region set for signing
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_signature_does_not_match() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        );
    }

    #[test]
    fn parse_403_made_up_error() {
        // A made up error to check that we map all 403s even if we don't recognize them
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "This error is made up.");
    }

    /// Build a [MetaRequestResult] that carries only an HTTP status and a CRT error, with no
    /// error response headers or body.
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }

    #[test]
    fn parse_no_signing_credential_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::NoSigningCredentials) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_test_other_crt_error() {
        // A signing error that isn't "no signing credentials"
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CrtError(error)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(error, error_code.into());
    }

    #[test]
    fn test_checksum_sha256() {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
        headers.add_header(&header).unwrap();

        let checksum = parse_checksum(&headers).expect("failed to parse headers");
        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
        assert_eq!(
            checksum.checksum_sha256,
            Some(value.to_owned()),
            "sha256 header should match"
        );
    }

    #[test]
    fn test_operation_name_to_static_metrics_string() {
        assert_eq!(operation_name_to_static_metrics_string(Some("GetObject")), "GetObject");
        assert_eq!(
            operation_name_to_static_metrics_string(Some("RenameObject")),
            "RenameObject",
        );
        assert_eq!(operation_name_to_static_metrics_string(None), "Unknown");
    }
}