mountpoint_s3_client/
s3_crt_client.rs

1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31    BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32    MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48    ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49    S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Create the `tracing` span that wraps a single S3 request.
///
/// `$self` must expose a `next_request_counter()` method; the value it returns is recorded on the
/// span as the `id` field. `$method` is used as the span name and any remaining tokens are
/// forwarded verbatim to `tracing::warn_span!` as extra fields. A "new request" DEBUG event is
/// emitted inside the span before it is returned.
macro_rules! request_span {
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let counter = $self.next_request_counter();
        // Picking the level for a tracing span is easy to get wrong. We want this span to be
        // constructed whenever events at WARN or lower severity (INFO, DEBUG, TRACE) are emitted,
        // so the span itself is created at WARN severity too.
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
        span.in_scope(|| tracing::debug!("new request"));
        span
    }};
    // Form without extra fields: the trailing comma makes the call match the arm above with an
    // empty field list.
    ($self:expr, $method:expr) => { request_span!($self, $method,) };
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// Emit a `tracing` event whose level is only known at runtime.
///
/// `tracing` doesn't allow dynamic levels, but we want to dynamically choose the log level for
/// requests based on their response status (see <https://github.com/tokio-rs/tracing/issues/372>).
/// This macro works around that by matching on `$level` and expanding to one `tracing::event!`
/// call per level constant, forwarding the remaining arguments unchanged.
macro_rules! event {
    ($level:expr, $($args:tt)*) => {
        match $level {
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
        }
    }
}
97
/// Configurations for the CRT-based S3 client.
///
/// Values are set through the builder-style methods on this type; every field starts from the
/// value provided by the [`Default`] implementation.
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How the client authenticates to S3.
    auth_config: S3ClientAuthConfig,
    /// Target throughput for the S3 client, in Gbps.
    throughput_target_gbps: f64,
    /// Memory limit for the S3 client, in bytes.
    memory_limit_in_bytes: u64,
    /// Part size, in bytes, for multi-part GET operations.
    read_part_size: usize,
    /// Part size, in bytes, for multi-part PUT operations.
    write_part_size: usize,
    /// Configuration used for endpoint resolution.
    endpoint_config: EndpointConfig,
    /// Constructor for the HTTP User-agent header sent with S3 requests, if customized.
    user_agent: Option<UserAgent>,
    /// Value for the request payer HTTP header, if any.
    request_payer: Option<String>,
    /// Expected bucket owner, if any.
    bucket_owner: Option<String>,
    /// Maximum number of attempts for S3 requests, if set. The `AWS_MAX_ATTEMPTS` environment
    /// variable takes precedence over this value.
    max_attempts: Option<NonZeroUsize>,
    /// Whether backpressure read is enabled.
    read_backpressure: bool,
    /// Initial backpressure read window size.
    initial_read_window: usize,
    /// Network interfaces to distribute S3 requests over. Empty means none were configured.
    network_interface_names: Vec<String>,
    /// Custom telemetry callback handler, if any.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Override for the number of threads used by the CRT event loop, if any.
    event_loop_threads: Option<u16>,
    /// Custom memory pool factory, if any.
    buffer_pool_factory: Option<CrtBufferPoolFactory>,
}
118
119impl Default for S3ClientConfig {
120    fn default() -> Self {
121        const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122        Self {
123            auth_config: Default::default(),
124            throughput_target_gbps: 10.0,
125            memory_limit_in_bytes: 0,
126            read_part_size: DEFAULT_PART_SIZE,
127            write_part_size: DEFAULT_PART_SIZE,
128            endpoint_config: EndpointConfig::new("us-east-1"),
129            user_agent: None,
130            request_payer: None,
131            bucket_owner: None,
132            max_attempts: None,
133            read_backpressure: false,
134            initial_read_window: DEFAULT_PART_SIZE,
135            network_interface_names: vec![],
136            telemetry_callback: None,
137            event_loop_threads: None,
138            buffer_pool_factory: None,
139        }
140    }
141}
142
143impl S3ClientConfig {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Set the configuration for authenticating to S3
149    #[must_use = "S3ClientConfig follows a builder pattern"]
150    pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151        self.auth_config = auth_config;
152        self
153    }
154
155    /// Set the part size for multi-part operations to S3 (both PUT and GET)
156    #[must_use = "S3ClientConfig follows a builder pattern"]
157    pub fn part_size(mut self, part_size: usize) -> Self {
158        self.read_part_size = part_size;
159        self.write_part_size = part_size;
160        self
161    }
162
163    /// Set the part size for multi-part-get operations to S3.
164    #[must_use = "S3ClientConfig follows a builder pattern"]
165    pub fn read_part_size(mut self, size: usize) -> Self {
166        self.read_part_size = size;
167        self
168    }
169
170    /// Set the part size for multi-part-put operations to S3.
171    #[must_use = "S3ClientConfig follows a builder pattern"]
172    pub fn write_part_size(mut self, size: usize) -> Self {
173        self.write_part_size = size;
174        self
175    }
176
177    /// Set the target throughput in Gbps for the S3 client
178    #[must_use = "S3ClientConfig follows a builder pattern"]
179    pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180        self.throughput_target_gbps = throughput_target_gbps;
181        self
182    }
183
184    /// Set the memory limit in bytes for the S3 client
185    #[must_use = "S3ClientConfig follows a builder pattern"]
186    pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187        self.memory_limit_in_bytes = memory_limit_in_bytes;
188        self
189    }
190
191    /// Set the endpoint configuration for endpoint resolution
192    #[must_use = "S3ClientConfig follows a builder pattern"]
193    pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194        self.endpoint_config = endpoint_config;
195        self
196    }
197
198    /// Set a constructor for the HTTP User-agent header for S3 requests
199    #[must_use = "S3ClientConfig follows a builder pattern"]
200    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201        self.user_agent = Some(user_agent);
202        self
203    }
204
205    /// Set a value for the request payer HTTP header for S3 requests
206    #[must_use = "S3ClientConfig follows a builder pattern"]
207    pub fn request_payer(mut self, request_payer: &str) -> Self {
208        self.request_payer = Some(request_payer.to_owned());
209        self
210    }
211
212    /// Set an expected bucket owner value
213    #[must_use = "S3ClientConfig follows a builder pattern"]
214    pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215        self.bucket_owner = Some(bucket_owner.to_owned());
216        self
217    }
218
219    /// Set a maximum number of attempts for S3 requests. Will be overridden by the
220    /// `AWS_MAX_ATTEMPTS` environment variable if set.
221    #[must_use = "S3ClientConfig follows a builder pattern"]
222    pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223        self.max_attempts = Some(max_attempts);
224        self
225    }
226
227    /// Set the flag for backpressure read
228    #[must_use = "S3ClientConfig follows a builder pattern"]
229    pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230        self.read_backpressure = read_backpressure;
231        self
232    }
233
234    /// Set a value for initial backpressure read window size
235    #[must_use = "S3ClientConfig follows a builder pattern"]
236    pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237        self.initial_read_window = initial_read_window;
238        self
239    }
240
241    /// Set a list of network interfaces to distribute S3 requests over
242    #[must_use = "S3ClientConfig follows a builder pattern"]
243    pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244        self.network_interface_names = network_interface_names;
245        self
246    }
247
248    /// Set a custom telemetry callback handler
249    #[must_use = "S3ClientConfig follows a builder pattern"]
250    pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251        self.telemetry_callback = Some(telemetry_callback);
252        self
253    }
254
255    /// Override the number of threads used by the CRTs AwsEventLoop
256    #[must_use = "S3ClientConfig follows a builder pattern"]
257    pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258        self.event_loop_threads = Some(event_loop_threads);
259        self
260    }
261
262    /// Set a custom memory pool
263    #[must_use = "S3ClientConfig follows a builder pattern"]
264    pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265        self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(move |_| pool.clone()));
266        self
267    }
268
269    /// Set a custom memory pool factory
270    #[must_use = "S3ClientConfig follows a builder pattern"]
271    pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272        self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(pool_factory));
273        self
274    }
275}
276
/// Authentication configuration for the CRT-based S3 client.
///
/// Selects which credentials provider the client constructs (or reuses) when it is created.
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// The default AWS credentials resolution chain, similar to the AWS CLI
    #[default]
    Default,
    /// Do not sign requests at all
    NoSigning,
    /// Explicitly load the given profile name from the AWS CLI configuration file
    Profile(String),
    /// Use a custom credentials provider, supplied by the caller and used as-is
    Provider(CredentialsProvider),
}
290
/// An S3 client that uses the [AWS Common Runtime (CRT)][crt] to make requests.
///
/// The AWS CRT is a C library that provides a common set of functionality for AWS SDKs. Its S3
/// client provides high throughput by implementing S3 performance best practices, including
/// automatic parallelization of GET and PUT requests.
///
/// To use this client, invoke the methods from the [`ObjectClient`] trait.
///
/// [crt]: https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Client state behind an [`Arc`], so clones of this handle share the same state.
    inner: Arc<S3CrtClientInner>,
}
304
305impl S3CrtClient {
306    /// Construct a new S3 client with the given configuration.
307    pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308        Ok(Self {
309            inner: Arc::new(S3CrtClientInner::new(config)?),
310        })
311    }
312
313    /// Return a copy of the [EndpointConfig] for this client
314    pub fn endpoint_config(&self) -> EndpointConfig {
315        self.inner.endpoint_config.clone()
316    }
317
318    #[doc(hidden)]
319    pub fn event_loop_group(&self) -> EventLoopGroup {
320        self.inner.event_loop_group.clone()
321    }
322}
323
/// Client state held behind the `Arc` in [`S3CrtClient`].
#[derive(Debug)]
struct S3CrtClientInner {
    /// The underlying CRT S3 client.
    s3_client: Client,
    /// Event loop group created for this client; also handed to the host resolver, client
    /// bootstrap and retry strategy during construction.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration used to resolve the endpoint for each bucket.
    endpoint_config: EndpointConfig,
    /// CRT allocator used when creating request messages.
    allocator: Allocator,
    /// Request counter, starting at 0.
    // NOTE(review): presumably backs the `next_request_counter()` call made by `request_span!` — confirm.
    next_request_counter: AtomicU64,
    /// Value sent as the `User-Agent` header: the user agent prefix plus S3 client information.
    /// It is passed into the CRT, which adds additional information ("CRTS3NativeClient/0.1.x").
    user_agent_header: String,
    /// Value for the `x-amz-request-payer` header, added to request templates when set.
    request_payer: Option<String>,
    /// Part size for multi-part GET operations, taken from the client configuration.
    read_part_size: usize,
    /// Part size for multi-part PUT operations, taken from the client configuration.
    write_part_size: usize,
    /// Whether backpressure read was enabled in the client configuration.
    enable_backpressure: bool,
    /// Initial backpressure read window size from the client configuration.
    initial_read_window_size: usize,
    /// Value for the `x-amz-expected-bucket-owner` header, added to request templates when set.
    bucket_owner: Option<String>,
    /// Credentials provider used to build per-request signing configurations. When `None`, request
    /// templates carry no signing configuration.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver; queried for the host address count when a meta request finishes.
    host_resolver: HostResolver,
    /// Optional custom handler invoked with the metrics of each request.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
344
345impl S3CrtClientInner {
346    fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347        let allocator = Allocator::default();
348
349        let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351        let resolver_options = HostResolverDefaultOptions {
352            max_entries: 8,
353            event_loop_group: &mut event_loop_group,
354        };
355
356        let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358        let bootstrap_options = ClientBootstrapOptions {
359            event_loop_group: &mut event_loop_group,
360            host_resolver: &mut host_resolver,
361        };
362
363        let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365        let mut client_config = ClientConfig::new();
366
367        let retry_strategy = {
368            let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369            let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370                .ok()
371                .and_then(|s| s.parse::<usize>().ok())
372                .or_else(|| config.max_attempts.map(|m| m.get()))
373                .unwrap_or(3);
374            // Max *attempts* includes the initial attempt, the CRT's max *retries* does not, so
375            // decrement by one
376            retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377            retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378            retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379            RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380        };
381
382        trace!("constructing client with auth config {:?}", config.auth_config);
383        let credentials_provider = match config.auth_config {
384            S3ClientAuthConfig::Default => {
385                let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386                    bootstrap: &mut client_bootstrap,
387                };
388                CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389                    .map_err(NewClientError::ProviderFailure)?
390            }
391            S3ClientAuthConfig::NoSigning => {
392                CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393            }
394            S3ClientAuthConfig::Profile(profile_name) => {
395                let credentials_profile_options = CredentialsProviderProfileOptions {
396                    bootstrap: &mut client_bootstrap,
397                    profile_name_override: &profile_name,
398                };
399                CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400                    .map_err(NewClientError::ProviderFailure)?
401            }
402            S3ClientAuthConfig::Provider(provider) => provider,
403        };
404
405        let endpoint_config = config.endpoint_config;
406        client_config.region(endpoint_config.get_region());
407        let signing_config = init_signing_config(
408            endpoint_config.get_region(),
409            credentials_provider.clone(),
410            None,
411            None,
412            None,
413        );
414
415        let endpoint_config = match endpoint_config.get_endpoint() {
416            None => {
417                // No explicit endpoint was configured, let's try the environment variable
418                if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419                    debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420                    let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421                        .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422                    endpoint_config.endpoint(env_uri)
423                } else {
424                    endpoint_config
425                }
426            }
427            Some(_) => endpoint_config,
428        };
429
430        client_config.express_support(true);
431        client_config.read_backpressure(config.read_backpressure);
432        client_config.initial_read_window(config.initial_read_window);
433        client_config.signing_config(signing_config);
434
435        client_config
436            .client_bootstrap(client_bootstrap)
437            .retry_strategy(retry_strategy);
438
439        client_config.throughput_target_gbps(config.throughput_target_gbps);
440        client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442        if let Some(pool_factory) = &config.buffer_pool_factory {
443            client_config.buffer_pool_factory(pool_factory.clone());
444        }
445
446        // max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
447        let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
448        // TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
449        for part_size in [config.read_part_size, config.write_part_size] {
450            if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
451                return Err(NewClientError::InvalidConfiguration(format!(
452                    "part size must be at between 5MiB and {}GiB",
453                    max_part_size / 1024 / 1024 / 1024
454                )));
455            }
456        }
457
458        if !config.network_interface_names.is_empty() {
459            client_config.network_interface_names(config.network_interface_names);
460        }
461
462        let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
463        let user_agent_header = user_agent.build();
464
465        let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
466
467        Ok(Self {
468            allocator,
469            s3_client,
470            event_loop_group,
471            endpoint_config,
472            next_request_counter: AtomicU64::new(0),
473            user_agent_header,
474            request_payer: config.request_payer,
475            read_part_size: config.read_part_size,
476            write_part_size: config.write_part_size,
477            enable_backpressure: config.read_backpressure,
478            initial_read_window_size: config.initial_read_window,
479            bucket_owner: config.bucket_owner,
480            credentials_provider: Some(credentials_provider),
481            host_resolver,
482            telemetry_callback: config.telemetry_callback,
483        })
484    }
485
486    /// Create a new HTTP request template for the given HTTP method and S3 bucket name.
487    /// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
488    /// response should be XML; this header should be overwritten for requests like GET that return
489    /// object data.
490    fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
491        let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
492        let uri = endpoint.uri()?;
493        trace!(?uri, "resolved endpoint");
494
495        let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
496            let auth_scheme = match endpoint.auth_scheme() {
497                Ok(auth_scheme) => auth_scheme,
498                Err(e) => {
499                    error!(error=?e, "no auth scheme for endpoint");
500                    return Err(e.into());
501                }
502            };
503            trace!(?auth_scheme, "resolved auth scheme");
504            let algorithm = Some(auth_scheme.scheme_name());
505            let service = Some(auth_scheme.signing_name());
506            let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
507            Some(init_signing_config(
508                auth_scheme.signing_region(),
509                credentials_provider.clone(),
510                algorithm,
511                service,
512                use_double_uri_encode,
513            ))
514        } else {
515            None
516        };
517
518        let hostname = uri.host_name().to_str().unwrap();
519        let path_prefix = uri.path().to_os_string().into_string().unwrap();
520        let port = uri.host_port();
521        let hostname_header = if port > 0 {
522            format!("{hostname}:{port}")
523        } else {
524            hostname.to_string()
525        };
526
527        let mut message = Message::new_request(&self.allocator)?;
528        message.set_request_method(method)?;
529        message.add_header(&Header::new("Host", hostname_header))?;
530        message.add_header(&Header::new("accept", "application/xml"))?;
531        message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
532
533        if let Some(ref payer) = self.request_payer {
534            message.add_header(&Header::new("x-amz-request-payer", payer))?;
535        }
536
537        if let Some(ref owner) = self.bucket_owner {
538            message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
539        }
540
541        Ok(S3Message {
542            inner: message,
543            uri,
544            path_prefix,
545            checksum_config: None,
546            signing_config,
547        })
548    }
549
550    /// Make a meta-request using this S3 client that invokes the given callbacks as the request
551    /// makes progress.
552    ///
553    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
554    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
555    /// parsing in this case (e.g. for permissions errors).
556    #[allow(clippy::too_many_arguments)]
557    fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
558        &self,
559        mut options: MetaRequestOptions,
560        request_span: Span,
561        on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
562        mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
563        mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
564        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
565        on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
566    ) -> Result<CancellingMetaRequest, S3RequestError> {
567        let span_telemetry = request_span.clone();
568        let span_body = request_span.clone();
569        let span_finish = request_span;
570
571        let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
572        let hostname = endpoint.host_name().to_str().unwrap().to_owned();
573        let host_resolver = self.host_resolver.clone();
574        let telemetry_callback = self.telemetry_callback.clone();
575
576        let start_time = Instant::now();
577        let first_body_part = Arc::new(AtomicBool::new(true));
578        let first_body_part_clone = Arc::clone(&first_body_part);
579        let total_bytes = Arc::new(AtomicU64::new(0));
580        let total_bytes_clone = Arc::clone(&total_bytes);
581
582        options
583            .on_telemetry(move |metrics| {
584                let _guard = span_telemetry.enter();
585
586                on_request_finish(metrics);
587
588                let http_status = metrics.status_code();
589                let request_canceled = metrics.is_canceled();
590                let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
591                let crt_error = Some(metrics.error()).filter(|e| e.is_err());
592                let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
593                let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
594                let duration = metrics.total_duration();
595                let ttfb = metrics.time_to_first_byte();
596                let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
597
598                let message = if request_failure {
599                    "S3 request failed"
600                } else if request_canceled {
601                    "S3 request canceled"
602                } else {
603                    "S3 request finished"
604                };
605                debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
606                trace!(detailed_metrics=?metrics, "S3 request completed");
607
608                if let Some(ttfb) = ttfb {
609                    metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
610                }
611                metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
612                metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
613                if request_failure {
614                    metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
615                } else if request_canceled {
616                    metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
617                }
618
619                if let Some(telemetry_callback) = &telemetry_callback {
620                    telemetry_callback.on_telemetry(metrics);
621                }
622            })
623            .on_headers(move |headers, response_status| {
624                (on_headers)(headers, response_status);
625            })
626            .on_body(move |range_start, data| {
627                let _guard = span_body.enter();
628
629                if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
630                    let latency = start_time.elapsed().as_micros() as f64;
631                    let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
632                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
633                }
634                total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
635
636                trace!(start = range_start, length = data.len(), "body part received");
637
638                (on_body)(range_start, data);
639            })
640            .on_finish(move |request_result| {
641                let _guard = span_finish.enter();
642
643                let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
644                let duration = start_time.elapsed();
645
646                metrics::counter!("s3.meta_requests", "op" => op).increment(1);
647                metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
648                // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
649                if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true)  {
650                    let latency = duration.as_micros() as f64;
651                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
652                }
653                let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
654                // We only log throughput of object data. PUT needs to be measured in its stream
655                // implementation rather than these callbacks, so we can only do GET here.
656                if op == "get_object" {
657                    emit_throughput_metric(total_bytes, duration, op);
658                }
659                let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
660                if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
661                    metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
662                }
663
664                let status_code = request_result.response_status;
665                let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
666                    tracing::Level::DEBUG
667                } else {
668                    tracing::Level::WARN
669                };
670
671                let result =
672                    if !request_result.is_err() {
673                        event!(log_level, ?duration, "meta request finished");
674                        Ok(())
675                    } else {
676                        // The `parse_meta_request_error` callback has a choice of whether to give us an error or not.
677                        // If not, fall back to generic error parsing (e.g. for permissions errors), or just no error if that fails too.
678                        let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
679                        let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
680
681                        // Try to parse request header out of the failure. We can't just use the
682                        // telemetry callback because there might be multiple requests per meta
683                        // request, but these headers are known to be from the failed request.
684                        let request_id = match &request_result.error_response_headers {
685                            Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
686                            None => None,
687                        };
688                        let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
689
690                        let message = if request_result.is_canceled() {
691                            "meta request canceled"
692                        } else {
693                            "meta request failed"
694                        };
695                        if let Some(error) = &maybe_err {
696                            event!(log_level, ?duration, %request_id, ?error, message);
697                            debug!("meta request result: {:?}", request_result);
698                        } else {
699                            event!(log_level, ?duration, %request_id, ?request_result, message);
700                        }
701
702                        if request_result.is_canceled() {
703                            metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
704                        } else {
705                            // If it's not a real HTTP status, encode the CRT error in the metric instead
706                            let error_status = if request_result.response_status >= 100 {
707                                request_result.response_status
708                            } else {
709                                -request_result.crt_error.raw_error()
710                            };
711                            metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
712                        }
713
714                        // Fill in a generic error if we weren't able to parse one
715                        Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
716                    };
717
718                on_meta_request_result(result);
719            });
720
721        // Issue the HTTP request using the CRT's S3 meta request API
722        let meta_request = self.s3_client.make_meta_request(options)?;
723        Self::poll_client_metrics(&self.s3_client);
724        Ok(CancellingMetaRequest::wrap(meta_request))
725    }
726
727    /// Make a meta-request using this S3 client that returns the response body on success or
728    /// invokes the given callback on failure.
729    ///
730    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
731    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
732    /// parsing in this case (e.g. for permissions errors).
733    fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
734        &self,
735        options: MetaRequestOptions,
736        request_span: Span,
737        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738    ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
739        let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
740
741        // Accumulate the body of the response into this Vec<u8>
742        let body: Arc<Mutex<Vec<u8>>> = Default::default();
743        let body_clone = Arc::clone(&body);
744
745        let meta_request = self.meta_request_with_callbacks(
746            options,
747            request_span,
748            |_| {},
749            |_, _| {},
750            move |offset, data| {
751                let mut body = body_clone.lock().unwrap();
752                assert_eq!(offset as usize, body.len());
753                body.extend_from_slice(data);
754            },
755            parse_meta_request_error,
756            move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
757        )?;
758        Ok(S3MetaRequest {
759            receiver: rx.fuse(),
760            meta_request,
761        })
762    }
763
764    /// Make a meta-request using this S3 client that returns the response headers on success or
765    /// invokes the given callback on failure.
766    ///
767    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
768    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
769    /// parsing in this case (e.g. for permissions errors).
770    fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
771        &self,
772        options: MetaRequestOptions,
773        request_span: Span,
774        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
775    ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
776        let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
777
778        // On success, stash the headers in this lock during the on_headers callback,
779        // and pull them out during the on_meta_request_result callback.
780        let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
781        let on_result = on_headers.clone();
782
783        let meta_request = self.meta_request_with_callbacks(
784            options,
785            request_span,
786            |_| {},
787            move |headers, status| {
788                // Only store headers if we have a 2xx status code. If we only get other status codes,
789                // then on_meta_request_result will send an error.
790                if (200..300).contains(&status) {
791                    *on_headers.lock().unwrap() = Some(headers.clone());
792                }
793            },
794            |_, _| {},
795            parse_meta_request_error,
796            move |result| {
797                // Return the headers on success, otherwise propagate the error.
798                let headers =
799                    result.and_then(|_| {
800                        on_result.lock().unwrap().take().ok_or_else(|| {
801                            S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
802                        })
803                    });
804                _ = tx.send(headers);
805            },
806        )?;
807        Ok(S3MetaRequest {
808            receiver: rx.fuse(),
809            meta_request,
810        })
811    }
812
813    /// Make a meta-request using this S3 client that invokes the given callback on failure.
814    ///
815    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
816    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
817    /// parsing in this case (e.g. for permissions errors).
818    fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
819        &self,
820        options: MetaRequestOptions,
821        request_span: Span,
822        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
823    ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
824        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
825
826        let meta_request = self.meta_request_with_callbacks(
827            options,
828            request_span,
829            |_| {},
830            |_, _| {},
831            |_, _| {},
832            parse_meta_request_error,
833            move |result| _ = tx.send(result),
834        )?;
835        Ok(S3MetaRequest {
836            receiver: rx.fuse(),
837            meta_request,
838        })
839    }
840
841    fn poll_client_metrics(s3_client: &Client) {
842        let metrics = s3_client.poll_client_metrics();
843        metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
844        metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
845        metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
846        metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
847        metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
848        metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
849        metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
850            .set(metrics.num_auto_ranged_copy_network_io as f64);
851        metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
852        metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
853            .set(metrics.num_requests_stream_queued_waiting as f64);
854        metrics::gauge!("s3.client.num_requests_streaming_response")
855            .set(metrics.num_requests_streaming_response as f64);
856
857        // Buffer pool metrics
858        let start = Instant::now();
859        if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
860            metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
861                .record(start.elapsed().as_micros() as f64);
862            metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
863            metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
864            metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
865            metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
866            metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
867            metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
868                .set(buffer_pool_stats.primary_num_blocks as f64);
869            metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
870                .set(buffer_pool_stats.secondary_reserved as f64);
871            metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
872            metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
873        }
874    }
875
876    fn next_request_counter(&self) -> u64 {
877        self.next_request_counter.fetch_add(1, Ordering::SeqCst)
878    }
879}
880
/// Failure retrieving the response headers of a request.
#[derive(Debug, Error)]
enum ResponseHeadersError {
    /// The request was reported as successful but no response headers were available.
    #[error("response headers are missing")]
    MissingHeaders,
}
887
/// S3 operation supported by this client.
///
/// Each variant determines the meta request type and (where needed) the operation name used
/// when configuring a request.
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    /// Delete an object.
    DeleteObject,
    /// Download an object.
    GetObject,
    /// Retrieve the attributes of an object.
    GetObjectAttributes,
    /// Head a bucket.
    HeadBucket,
    /// Head an object.
    HeadObject,
    /// List the objects in a bucket.
    ListObjects,
    /// Upload an object.
    PutObject,
    /// Copy an object.
    CopyObject,
    /// Upload an object; distinct from [S3Operation::PutObject].
    // NOTE(review): presumably a single-request (non-multipart) upload — confirm against callers.
    PutObjectSingle,
}
901
902impl S3Operation {
903    /// The [MetaRequestType] to use for this operation.
904    fn meta_request_type(&self) -> MetaRequestType {
905        match self {
906            S3Operation::GetObject => MetaRequestType::GetObject,
907            S3Operation::PutObject => MetaRequestType::PutObject,
908            S3Operation::CopyObject => MetaRequestType::CopyObject,
909            _ => MetaRequestType::Default,
910        }
911    }
912
913    /// The operation name to set when configuring a request. Required for operations that
914    /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise.
915    fn operation_name(&self) -> Option<&'static str> {
916        match self {
917            S3Operation::DeleteObject => Some("DeleteObject"),
918            S3Operation::GetObject => None,
919            S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
920            S3Operation::HeadBucket => Some("HeadBucket"),
921            S3Operation::HeadObject => Some("HeadObject"),
922            S3Operation::ListObjects => Some("ListObjectsV2"),
923            S3Operation::PutObject => None,
924            S3Operation::CopyObject => None,
925            S3Operation::PutObjectSingle => Some("PutObject"),
926        }
927    }
928}
929
/// An HTTP message to be sent to S3. This is a wrapper around a plain HTTP message, except that it
/// helps us correctly configure the endpoint and "Host" header to handle both path-style and
/// virtual-hosted-style addresses. The `path_prefix` is prepended to all request paths, and
/// need not be terminated with a `/`.
#[derive(Debug)]
struct S3Message<'a> {
    /// The underlying HTTP message.
    inner: Message<'a>,
    /// The endpoint the request is sent to.
    uri: Uri,
    /// Prefix written in front of every request path set on this message.
    path_prefix: String,
    /// Checksum configuration applied to the meta request options, if set.
    checksum_config: Option<ChecksumConfig>,
    /// Signing configuration applied to the meta request options, if set.
    signing_config: Option<SigningConfig>,
}
942
// Query keys and values: percent-encode everything except the RFC 3986 "unreserved" characters
// (ASCII alphanumerics plus '-', '.', '_' and '~').
const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
// Path fragments: same as above, but with '/' also considered a safe character so that path
// separators are left unencoded.
const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
946
947fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
948    let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
949    s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
950}
951
/// The part of a request target that follows the path.
#[derive(Debug, Default)]
enum QueryFragment<'a, P: AsRef<OsStr>> {
    /// Nothing follows the path.
    #[default]
    Empty,
    /// A list of key-value pairs, written as `?key=value&key=value...`.
    Query(&'a [(P, P)]),
    /// A single bare token, written as `?action`.
    Action(P),
}
959
960impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
961    // This estimate is exact if no characters need encoding, otherwise we'll end up
962    // reallocating a couple of times. The '?' for the query is counted in the first key-value
963    // pair.
964    fn size(&self) -> usize {
965        match self {
966            QueryFragment::Empty => 0,
967            QueryFragment::Query(query) => query
968                .iter()
969                .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
970                .sum::<usize>(),
971            QueryFragment::Action(action) => 1 + action.as_ref().len(),
972        }
973    }
974
975    // Write the fragment to the query string
976    fn write(&self, path: &mut OsString) {
977        match &self {
978            QueryFragment::Query(query) if !query.is_empty() => {
979                path.push("?");
980                for (i, (key, value)) in query.iter().enumerate() {
981                    if i != 0 {
982                        path.push("&");
983                    }
984                    write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
985                    path.push("=");
986                    write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
987                }
988            }
989            QueryFragment::Action(action) => {
990                path.push("?");
991                path.push(action.as_ref());
992            }
993            _ => {}
994        }
995    }
996}
997
998impl<'a> S3Message<'a> {
999    /// Add a header to this message. The header is added if necessary and any existing values for
1000    /// this header are removed.
1001    fn set_header(
1002        &mut self,
1003        header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1004    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1005        self.inner.set_header(header)
1006    }
1007
1008    /// Set the request path and query for this message. The components should not be URL-encoded;
1009    /// this method will handle that.
1010    fn set_request_path_and_query<P: AsRef<OsStr>>(
1011        &mut self,
1012        path: impl AsRef<OsStr>,
1013        query_or_action: QueryFragment<P>,
1014    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1015        let space_needed = self.path_prefix.len() + query_or_action.size();
1016
1017        let mut full_path = OsString::with_capacity(space_needed);
1018
1019        write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1020        write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1021
1022        // Build the query string
1023        query_or_action.write(&mut full_path);
1024        self.inner.set_request_path(full_path)
1025    }
1026
1027    /// Set the request path for this message. The path should not be URL-encoded; this method will
1028    /// handle that.
1029    fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1030        self.set_request_path_and_query::<&str>(path, Default::default())
1031    }
1032
1033    /// Sets the checksum configuration for this message.
1034    fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1035        self.checksum_config = checksum_config;
1036    }
1037
1038    /// Sets the body input stream for this message, and returns any previously set input stream.
1039    /// If input_stream is None, unsets the body.
1040    fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1041        self.inner.set_body_stream(input_stream)
1042    }
1043
1044    /// Set the content length header.
1045    fn set_content_length_header(
1046        &mut self,
1047        content_length: usize,
1048    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1049        self.inner
1050            .set_header(&Header::new("Content-Length", content_length.to_string()))
1051    }
1052
1053    /// Set the checksum header.
1054    fn set_checksum_header(
1055        &mut self,
1056        checksum: &UploadChecksum,
1057    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1058        let header = match checksum {
1059            UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1060            UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1061            UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1062            UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1063            UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1064        };
1065        self.inner.set_header(&header)
1066    }
1067
1068    fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1069        let mut options = MetaRequestOptions::new();
1070        if let Some(checksum_config) = self.checksum_config {
1071            options.checksum_config(checksum_config);
1072        }
1073        if let Some(signing_config) = self.signing_config {
1074            options.signing_config(signing_config);
1075        }
1076        options
1077            .message(self.inner)
1078            .endpoint(self.uri)
1079            .request_type(operation.meta_request_type());
1080        if let Some(operation_name) = operation.operation_name() {
1081            options.operation_name(operation_name);
1082        }
1083        options
1084    }
1085}
1086
/// A future for an in-flight meta-request that resolves to the request's result.
///
/// The underlying meta-request is held in a [CancellingMetaRequest], so dropping this value
/// cancels the request.
#[derive(Debug)]
#[pin_project]
#[must_use]
struct S3MetaRequest<T, E> {
    /// Receiver for the result of meta-request.
    #[pin]
    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
    /// Handle to the underlying meta-request; cancels the request when dropped.
    meta_request: CancellingMetaRequest,
}
1096
1097impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1098    type Output = ObjectClientResult<T, E, S3RequestError>;
1099
1100    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1101        let this = self.project();
1102        this.receiver
1103            .poll(cx)
1104            .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1105    }
1106}
1107
1108impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1109    fn is_terminated(&self) -> bool {
1110        self.receiver.is_terminated()
1111    }
1112}
1113
/// Wrapper for a [MetaRequest] that cancels it on drop.
///
/// Note that if the request has already completed, cancelling it has no effect.
#[derive(Debug)]
struct CancellingMetaRequest {
    /// The wrapped meta-request, reachable through `Deref`/`DerefMut`.
    inner: MetaRequest,
}
1121
impl CancellingMetaRequest {
    /// Take ownership of `meta_request` so that it is cancelled when the wrapper is dropped.
    fn wrap(meta_request: MetaRequest) -> Self {
        Self { inner: meta_request }
    }
}
1127
impl Drop for CancellingMetaRequest {
    /// Cancel the wrapped meta-request when the wrapper goes out of scope.
    fn drop(&mut self) {
        self.inner.cancel();
    }
}
1133
impl Deref for CancellingMetaRequest {
    type Target = MetaRequest;

    /// Give shared access to the wrapped [MetaRequest].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
1141
impl DerefMut for CancellingMetaRequest {
    /// Give exclusive access to the wrapped [MetaRequest].
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
1147
/// Failures to construct a new S3 client
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum NewClientError {
    /// Invalid S3 endpoint
    #[error("invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
    /// Invalid AWS credentials; carries the underlying CRT error as its source
    #[error("invalid AWS credentials")]
    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
    /// Invalid configuration; the string describes what was wrong
    #[error("invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// An internal error from within the AWS Common Runtime
    #[error("Unknown CRT error")]
    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
}
1165
/// Errors returned by the CRT-based S3 client
#[derive(Error, Debug)]
pub enum S3RequestError {
    /// An internal error from within the S3 client. The request may have been sent.
    #[error("Internal S3 client error")]
    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// An internal error from within the AWS Common Runtime. The request may have been sent.
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// An error during construction of a request. The request was not sent.
    #[error("Failed to construct request")]
    ConstructionFailure(#[from] ConstructionError),

    /// The request was sent but an unknown or unhandled failure occurred while processing it.
    /// Carries the raw result of the failed meta-request.
    #[error("Unknown response error: {0:?}")]
    ResponseError(MetaRequestResult),

    /// The request was made to the wrong region. Carries the region the bucket is expected to
    /// be in, along with metadata about the failed request.
    #[error("Wrong region (expecting {0})")]
    IncorrectRegion(String, ClientErrorMetadata),

    /// Forbidden. Carries a message describing the failure, along with metadata about the
    /// failed request.
    #[error("Forbidden: {0}")]
    Forbidden(String, ClientErrorMetadata),

    /// The request was attempted but could not be signed due to no available credentials
    #[error("No signing credentials available, see CRT debug logs")]
    NoSigningCredentials,

    /// The request was canceled
    #[error("Request canceled")]
    RequestCanceled,

    /// The request was throttled by S3
    #[error("Request throttled")]
    Throttled,

    /// Cannot fetch more data because current read window is exhausted. The read window must
    /// be advanced using `GetObjectRequest::increment_read_window` to continue fetching
    /// new data.
    #[error("Polled for data with empty read window")]
    EmptyReadWindow,
}
1211
1212impl S3RequestError {
1213    fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1214        S3RequestError::ConstructionFailure(inner.into())
1215    }
1216
1217    fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1218        S3RequestError::InternalError(Box::new(inner))
1219    }
1220}
1221
1222impl ProvideErrorMetadata for S3RequestError {
1223    fn meta(&self) -> ClientErrorMetadata {
1224        match self {
1225            Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1226            Self::Forbidden(_, metadata) => metadata.clone(),
1227            Self::Throttled => ClientErrorMetadata {
1228                http_code: Some(503),
1229                error_code: Some("SlowDown".to_string()),
1230                error_message: Some("Please reduce your request rate.".to_string()),
1231            },
1232            Self::IncorrectRegion(_, metadata) => metadata.clone(),
1233            _ => Default::default(),
1234        }
1235    }
1236}
1237
/// Errors that can occur while constructing a request, before it is sent.
#[derive(Error, Debug)]
pub enum ConstructionError {
    /// CRT error while constructing the request
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The S3 endpoint was invalid
    #[error("Invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
}
1248
/// Map a borrowed operation name onto the equivalent `&'static str`, as required by the
/// [metrics] crate.
///
/// Returns `"Unknown"` when no name is given. A name that is not in the table of known
/// operations trips a debug assertion, and maps to `"Unknown"` when debug assertions are off.
fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
    const UNKNOWN_METRIC_STR: &str = "Unknown";

    // Every operation name we expect to see. Each literal appears exactly once, so the value
    // we match against and the value we return can never drift apart.
    const KNOWN_OPERATION_NAMES: &[&str] = &[
        "GetObject",
        "HeadObject",
        "ListParts",
        "CreateMultipartUpload",
        "UploadPart",
        "AbortMultipartUpload",
        "CompleteMultipartUpload",
        "UploadPartCopy",
        "CopyObject",
        "PutObject",
        "ListObjectsV2",
        "DeleteObject",
        "GetObjectAttributes",
        "HeadBucket",
        "RenameObject",
    ];

    let Some(operation_name) = operation_name else {
        return UNKNOWN_METRIC_STR;
    };

    let static_str = KNOWN_OPERATION_NAMES
        .iter()
        .copied()
        .find(|known| *known == operation_name);

    debug_assert!(
        static_str.is_some(),
        "input set as {operation_name:?} but no matcher, update required",
    );
    static_str.unwrap_or(UNKNOWN_METRIC_STR)
}
1295
1296/// Extract the byte range from the Content-Range header if present and valid
1297fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1298    let header = headers.get("Content-Range").ok()?;
1299    let value = header.value().to_str()?;
1300
1301    // Content-Range: <unit> <range-start>-<range-end>/<size>
1302
1303    if !value.starts_with("bytes ") {
1304        return None;
1305    }
1306    let (_, value) = value.split_at("bytes ".len());
1307    let (range, _) = value.split_once('/')?;
1308    let (start, end) = range.split_once('-')?;
1309    let start = start.parse::<u64>().ok()?;
1310    let end = end.parse::<u64>().ok()?;
1311
1312    // Rust ranges are exclusive at the end, but Content-Range is inclusive
1313    Some(start..end + 1)
1314}
1315
1316/// Extract the [Checksum] information from headers
1317fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1318    let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1319    let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1320    let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1321    let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1322    let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1323
1324    Ok(Checksum {
1325        checksum_crc64nvme,
1326        checksum_crc32,
1327        checksum_crc32c,
1328        checksum_sha1,
1329        checksum_sha256,
1330    })
1331}
1332
1333/// Try to parse a modeled error out of a failing meta request
1334fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1335    /// Look for a redirect header pointing to a different region for the bucket
1336    fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1337        let headers = request_result.error_response_headers.as_ref()?;
1338        let region_header = headers.get("x-amz-bucket-region").ok()?;
1339        let region = region_header.value().to_owned().into_string().ok()?;
1340        Some(S3RequestError::IncorrectRegion(
1341            region,
1342            ClientErrorMetadata::from_meta_request_result(request_result),
1343        ))
1344    }
1345
1346    /// Look for access-related errors
1347    fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1348        let Some(body) = request_result.error_response_body.as_ref() else {
1349            // Header-only requests like HeadObject and HeadBucket can't give us a more detailed
1350            // error, so just trust the response code
1351            return Some(S3RequestError::Forbidden(
1352                "<no message>".to_owned(),
1353                ClientErrorMetadata {
1354                    http_code: Some(request_result.response_status),
1355                    ..Default::default()
1356                },
1357            ));
1358        };
1359        let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1360        let error_code = error_elem.get_child("Code")?;
1361        let error_code_str = error_code.get_text()?;
1362        // Always translate 403 to Forbidden, but otherwise first check the error code, since other
1363        // response statuses are overloaded and not always access-related errors.
1364        if request_result.response_status == 403
1365            || matches!(
1366                error_code_str.deref(),
1367                "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1368            )
1369        {
1370            let message = error_elem
1371                .get_child("Message")
1372                .and_then(|e| e.get_text())
1373                .unwrap_or(error_code_str.clone());
1374            Some(S3RequestError::Forbidden(
1375                message.to_string(),
1376                ClientErrorMetadata {
1377                    http_code: Some(request_result.response_status),
1378                    error_code: Some(error_code_str.to_string()),
1379                    error_message: Some(message.into_owned()),
1380                },
1381            ))
1382        } else {
1383            None
1384        }
1385    }
1386
1387    /// Try to look for error related to no signing credentials, returns generic error otherwise
1388    fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1389        let crt_error_code = request_result.crt_error.raw_error();
1390        if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1391            S3RequestError::NoSigningCredentials
1392        } else {
1393            S3RequestError::CrtError(crt_error_code.into())
1394        }
1395    }
1396
1397    fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1398        let crt_error_code = request_result.crt_error.raw_error();
1399        if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1400            Some(S3RequestError::Throttled)
1401        } else {
1402            None
1403        }
1404    }
1405
1406    /// Handle canceled requests
1407    fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1408        request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1409    }
1410
1411    match request_result.response_status {
1412        301 => try_parse_redirect(request_result),
1413        // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
1414        // redirect
1415        400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1416        403 => try_parse_forbidden(request_result),
1417        // if the http response status is not set, we look into crt_error_code to identify the error
1418        0 => try_parse_throttled(request_result)
1419            .or_else(|| try_parse_canceled_request(request_result))
1420            .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1421        _ => None,
1422    }
1423}
1424
1425/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
1426/// PUT bytes don't transit those callbacks.
1427fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1428    let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1429    // Semi-arbitrary choices here to avoid averaging out large and small requests
1430    const MEGABYTE: u64 = 1024 * 1024;
1431    let bucket = if bytes < MEGABYTE {
1432        "<1MiB"
1433    } else if bytes <= 16 * MEGABYTE {
1434        "1-16MiB"
1435    } else {
1436        ">16MiB"
1437    };
1438    metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1439}
1440
1441#[cfg_attr(not(docsrs), async_trait)]
1442impl ObjectClient for S3CrtClient {
1443    type GetObjectResponse = S3GetObjectResponse;
1444    type PutObjectRequest = S3PutObjectRequest;
1445    type ClientError = S3RequestError;
1446
1447    fn read_part_size(&self) -> usize {
1448        self.inner.read_part_size
1449    }
1450
1451    fn write_part_size(&self) -> usize {
1452        // TODO: the CRT does some clamping to a max size rather than just swallowing the part size
1453        // we configured it with, so this might be wrong. Right now the only clamping is to the max
1454        // S3 part size (5GiB), so this shouldn't affect the result.
1455        // https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
1456        self.inner.write_part_size
1457    }
1458
1459    fn initial_read_window_size(&self) -> Option<usize> {
1460        if self.inner.enable_backpressure {
1461            Some(self.inner.initial_read_window_size)
1462        } else {
1463            None
1464        }
1465    }
1466
1467    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1468        let start = Instant::now();
1469        self.inner
1470            .s3_client
1471            .poll_default_buffer_pool_usage_stats()
1472            .inspect(|_| {
1473                metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1474                    .record(start.elapsed().as_micros() as f64);
1475            })
1476    }
1477
1478    async fn delete_object(
1479        &self,
1480        bucket: &str,
1481        key: &str,
1482    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1483        self.delete_object(bucket, key).await
1484    }
1485
1486    async fn copy_object(
1487        &self,
1488        source_bucket: &str,
1489        source_key: &str,
1490        destination_bucket: &str,
1491        destination_key: &str,
1492        params: &CopyObjectParams,
1493    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1494        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1495            .await
1496    }
1497
1498    async fn get_object(
1499        &self,
1500        bucket: &str,
1501        key: &str,
1502        params: &GetObjectParams,
1503    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1504        self.get_object(bucket, key, params).await
1505    }
1506
1507    async fn list_objects(
1508        &self,
1509        bucket: &str,
1510        continuation_token: Option<&str>,
1511        delimiter: &str,
1512        max_keys: usize,
1513        prefix: &str,
1514    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1515        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1516            .await
1517    }
1518
1519    async fn head_object(
1520        &self,
1521        bucket: &str,
1522        key: &str,
1523        params: &HeadObjectParams,
1524    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1525        self.head_object(bucket, key, params).await
1526    }
1527
1528    async fn put_object(
1529        &self,
1530        bucket: &str,
1531        key: &str,
1532        params: &PutObjectParams,
1533    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1534        self.put_object(bucket, key, params).await
1535    }
1536
1537    async fn put_object_single<'a>(
1538        &self,
1539        bucket: &str,
1540        key: &str,
1541        params: &PutObjectSingleParams,
1542        contents: impl AsRef<[u8]> + Send + 'a,
1543    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1544        self.put_object_single(bucket, key, params, contents).await
1545    }
1546
1547    async fn get_object_attributes(
1548        &self,
1549        bucket: &str,
1550        key: &str,
1551        max_parts: Option<usize>,
1552        part_number_marker: Option<usize>,
1553        object_attributes: &[ObjectAttribute],
1554    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1555        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1556            .await
1557    }
1558
1559    async fn rename_object(
1560        &self,
1561        bucket: &str,
1562        src_key: &str,
1563        dst_key: &str,
1564        params: &RenameObjectParams,
1565    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1566        self.rename_object(bucket, src_key, dst_key, params).await
1567    }
1568}
1569
1570/// Custom handling of telemetry events
1571pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1572    fn on_telemetry(&self, request_metrics: &RequestMetrics);
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577    use mountpoint_s3_crt::common::error::Error;
1578    use rusty_fork::rusty_fork_test;
1579    use std::assert_eq;
1580
1581    use super::*;
1582    use test_case::test_case;
1583
1584    /// Test explicit validation in [Client::new]
1585    fn client_new_fails_with_invalid_part_size(part_size: usize) {
1586        let config = S3ClientConfig::default().part_size(part_size);
1587        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
1588        let message = if cfg!(target_pointer_width = "64") {
1589            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
1590        } else {
1591            format!(
1592                "invalid configuration: part size must be at between 5MiB and {}GiB",
1593                usize::MAX / 1024 / 1024 / 1024
1594            )
1595        };
1596        assert_eq!(e.to_string(), message);
1597    }
1598
/// Only built for 64-bit targets: on 32-bit platforms the part size is limited by `usize::MAX`,
/// so a value above the limit cannot be provided.
#[cfg(target_pointer_width = "64")]
#[test]
fn client_new_fails_with_greater_part_size() {
    const GIB: usize = 1024 * 1024 * 1024;
    // 6GiB is greater than the 5GiB maximum.
    client_new_fails_with_invalid_part_size(6 * GIB);
}
1607
/// A part size below the 5MiB minimum must be rejected.
#[test]
fn client_new_fails_with_smaller_part_size() {
    const MIB: usize = 1024 * 1024;
    // 4MiB is less than the 5MiB minimum.
    client_new_fails_with_invalid_part_size(4 * MIB);
}
1613
/// The configured prefix must appear at the start of the User-Agent header, ahead of the
/// client's own product token.
#[test]
fn test_user_agent_with_prefix() {
    let config = S3ClientConfig {
        user_agent: Some(UserAgent::new(Some(String::from("someprefix")))),
        ..Default::default()
    };
    let client = S3CrtClient::new(config).expect("Create test client");

    let mut message = client
        .inner
        .new_request_template("GET", "amzn-s3-demo-bucket")
        .expect("new request template expected");
    let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

    // Keep the header bound to a local so the lossy string can borrow from it.
    let header = headers
        .get("User-Agent")
        .expect("User Agent Header expected with given prefix");
    let user_agent = header.value().to_string_lossy();

    assert!(user_agent.starts_with("someprefix mountpoint-s3-client/"));
}
1645
1646    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
1647        let config = S3ClientConfig {
1648            endpoint_config,
1649            ..Default::default()
1650        };
1651
1652        let client = S3CrtClient::new(config).expect("create test client");
1653
1654        let mut message = client
1655            .inner
1656            .new_request_template("GET", "")
1657            .expect("new request template expected");
1658
1659        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");
1660
1661        let host_header = headers.get("Host").expect("Host header expected");
1662        let host_header_value = host_header.value();
1663
1664        assert_eq!(host_header_value.to_string_lossy(), expected_host);
1665    }
1666
1667    // run with rusty_fork to avoid issues with other tests and their env variables.
1668    rusty_fork_test! {
1669        #[test]
1670        fn test_endpoint_favors_parameter_over_env_variable() {
1671            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
1672            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);
1673
1674            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
1675            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }
1676
1677            // even though we set the environment variable, the parameter takes precedence
1678            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
1679        }
1680
1681        #[test]
1682        fn test_endpoint_favors_env_variable() {
1683            let endpoint_config = EndpointConfig::new("us-east-1");
1684
1685            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
1686            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }
1687
1688            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
1689        }
1690
1691        #[test]
1692        fn test_endpoint_with_invalid_env_variable() {
1693            let endpoint_config = EndpointConfig::new("us-east-1");
1694
1695            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
1696            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }
1697
1698            let config = S3ClientConfig {
1699                endpoint_config,
1700                ..Default::default()
1701            };
1702
1703            let client = S3CrtClient::new(config);
1704            match client {
1705                Ok(_) => panic!("expected an error"),
1706                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
1707            }
1708        }
1709
1710    }
1711
/// Simple test to ensure the user agent header is correct even when prefix is not added:
/// with a default configuration the header must start with the client's own product token.
#[test]
fn test_user_agent_without_prefix() {
    let expected_user_agent = "mountpoint-s3-client/";

    let config: S3ClientConfig = Default::default();

    let client = S3CrtClient::new(config).expect("Create test client");

    let mut message = client
        .inner
        .new_request_template("GET", "amzn-s3-demo-bucket")
        .expect("new request template expected");

    let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

    // No prefix is configured in this test, so the failure message must not mention one.
    let user_agent_header = headers.get("User-Agent").expect("User Agent Header expected");
    let user_agent_header_value = user_agent_header.value();

    assert!(
        user_agent_header_value
            .to_string_lossy()
            .starts_with(expected_user_agent)
    );
}
1739
1740    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
1741    #[test_case("bytes 200-1000/*" => Some(200..1001))]
1742    #[test_case("bytes 200-1000" => None)]
1743    #[test_case("bytes */67589" => None)]
1744    #[test_case("octets 200-1000]" => None)]
1745    fn parse_content_range(range: &str) -> Option<Range<u64>> {
1746        let mut headers = Headers::new(&Allocator::default()).unwrap();
1747        let header = Header::new("Content-Range", range);
1748        headers.add_header(&header).unwrap();
1749        extract_range_header(&headers)
1750    }
1751
/// A bucket owner set on the client configuration must show up in the
/// `x-amz-expected-bucket-owner` header of a request template.
#[test]
fn test_expected_bucket_owner() {
    let expected_bucket_owner = "111122223333";

    let config: S3ClientConfig = S3ClientConfig::new().bucket_owner(expected_bucket_owner);
    let client = S3CrtClient::new(config).expect("Create test client");

    let mut message = client
        .inner
        .new_request_template("GET", "amzn-s3-demo-bucket")
        .expect("new request template expected");
    let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

    // Keep the header bound to a local so the lossy string can borrow from it.
    let header = headers
        .get("x-amz-expected-bucket-owner")
        .expect("the headers should contain x-amz-expected-bucket-owner");
    let header_value = header.value().to_string_lossy();

    assert!(header_value.starts_with(expected_bucket_owner));
}
1779
1780    fn make_result(
1781        response_status: i32,
1782        body: impl Into<OsString>,
1783        bucket_region_header: Option<&str>,
1784    ) -> MetaRequestResult {
1785        let error_response_headers = bucket_region_header.map(|h| {
1786            let mut headers = Headers::new(&Allocator::default()).unwrap();
1787            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
1788            headers
1789        });
1790        MetaRequestResult {
1791            response_status,
1792            crt_error: 1i32.into(),
1793            error_response_headers,
1794            error_response_body: Some(body.into()),
1795        }
1796    }
1797
/// A 301 `PermanentRedirect` response with a bucket-region header must be reported as an
/// incorrect-region error naming that region.
#[test]
fn parse_301_redirect() {
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
    let request_result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::IncorrectRegion(region, _)) => assert_eq!(region, "us-west-2"),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1808
/// A 403 `AccessDenied` response must be reported as a forbidden error carrying the response's
/// message.
#[test]
fn parse_403_access_denied() {
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
    let request_result = make_result(403, OsStr::from_bytes(&body[..]), None);
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::Forbidden(message, _)) => assert_eq!(message, "Access Denied"),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1819
/// A 400 `InvalidToken` response must be reported as a forbidden error carrying the response's
/// message.
#[test]
fn parse_400_invalid_token() {
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
    let request_result = make_result(400, OsStr::from_bytes(&body[..]), None);
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::Forbidden(message, _)) => {
            assert_eq!(message, "The provided token is malformed or otherwise invalid.")
        }
        result => panic!("wrong result, got: {result:?}"),
    }
}
1830
/// A 400 `ExpiredToken` response must be reported as a forbidden error carrying the response's
/// message.
#[test]
fn parse_400_expired_token() {
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
    let request_result = make_result(400, OsStr::from_bytes(&body[..]), None);
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::Forbidden(message, _)) => {
            assert_eq!(message, "The provided token has expired.")
        }
        result => panic!("wrong result, got: {result:?}"),
    }
}
1841
/// A 400 `AuthorizationHeaderMalformed` response with a bucket-region header must be reported
/// as an incorrect-region error naming that region.
#[test]
fn parse_400_redirect() {
    // From an s3-accelerate endpoint with the wrong region set for signing
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
    let request_result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::IncorrectRegion(region, _)) => assert_eq!(region, "us-west-2"),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1853
/// A 403 `SignatureDoesNotMatch` response must be reported as a forbidden error carrying the
/// response's message.
#[test]
fn parse_403_signature_does_not_match() {
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
    let request_result = make_result(403, OsStr::from_bytes(&body[..]), None);
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::Forbidden(message, _)) => assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        ),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1867
/// Every 403 must be reported as a forbidden error, even when the error code is unknown.
#[test]
fn parse_403_made_up_error() {
    // A made up error to check that we map all 403s even if we don't recognize them
    let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
    let request_result = make_result(403, OsStr::from_bytes(&body[..]), None);
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::Forbidden(message, _)) => assert_eq!(message, "This error is made up."),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1879
1880    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
1881        MetaRequestResult {
1882            response_status,
1883            crt_error,
1884            error_response_headers: None,
1885            error_response_body: None,
1886        }
1887    }
1888
/// With no HTTP status set, the CRT "no credentials" signing error must be reported as
/// [S3RequestError::NoSigningCredentials].
#[test]
fn parse_no_signing_credential_error() {
    let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
    let result = try_parse_generic_error(&make_crt_error_result(0, error_code.into()));
    assert!(
        matches!(result, Some(S3RequestError::NoSigningCredentials)),
        "wrong result, got: {result:?}"
    );
}
1898
/// With no HTTP status set, any other CRT error must be passed through unchanged as
/// [S3RequestError::CrtError].
#[test]
fn parse_test_other_crt_error() {
    // A signing error that isn't "no signing credentials"
    let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
    let request_result = make_crt_error_result(0, error_code.into());
    match try_parse_generic_error(&request_result) {
        Some(S3RequestError::CrtError(error)) => assert_eq!(error, error_code.into()),
        result => panic!("wrong result, got: {result:?}"),
    }
}
1910
/// A lone `x-amz-checksum-sha256` header must populate only the SHA-256 checksum field.
#[test]
fn test_checksum_sha256() {
    let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";

    let mut headers = Headers::new(&Allocator::default()).unwrap();
    headers
        .add_header(&Header::new("x-amz-checksum-sha256", value.to_owned()))
        .unwrap();

    let checksum = parse_checksum(&headers).expect("failed to parse headers");

    assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
    assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
    assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
    assert_eq!(
        checksum.checksum_sha256,
        Some(String::from(value)),
        "sha256 header should match"
    );
}
1928
/// Known operation names map to themselves; a missing name maps to "Unknown".
#[test]
fn test_operation_name_to_static_metrics_string() {
    let cases = [
        (Some("GetObject"), "GetObject"),
        (Some("RenameObject"), "RenameObject"),
        (None, "Unknown"),
    ];
    for (operation_name, expected) in cases {
        assert_eq!(operation_name_to_static_metrics_string(operation_name), expected);
    }
}
1938}