// mountpoint_s3_client/s3_crt_client.rs

1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31    BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32    MetaRequestType, RequestMetrics, RequestType, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48    ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_FAILURE,
49    S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Create a `tracing` span for a new S3 request and emit a DEBUG "new request" event inside it.
///
/// `$self` must provide a `next_request_counter()` method; its value is recorded as the span's
/// `id` field. `$method` is used as the span name, and any remaining tokens are forwarded as
/// additional span fields. The second arm handles invocations that pass no extra fields by
/// re-invoking the first arm with an empty field list.
macro_rules! request_span {
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let counter = $self.next_request_counter();
        // I have confused myself at least 4 times about how to choose the level for tracing spans.
        // We want this span to be constructed whenever events at WARN or lower severity (INFO,
        // DEBUG, TRACE) are emitted. So we set its severity to WARN too.
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
        span.in_scope(|| tracing::debug!("new request"));
        span
    }};
    ($self:expr, $method:expr) => { request_span!($self, $method,) };
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// `tracing` doesn't allow dynamic levels but we want to dynamically choose the log level for
/// requests based on their response status. https://github.com/tokio-rs/tracing/issues/372
///
/// The macro works around this by matching on the runtime `$level` value. Each arm expands to a
/// `tracing::event!` call with the corresponding *static* level, and the remaining arguments are
/// forwarded unchanged to whichever arm is taken.
macro_rules! event {
    ($level:expr, $($args:tt)*) => {
        match $level {
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
        }
    }
}
97
/// Configurations for the CRT-based S3 client
///
/// Build one with [`S3ClientConfig::new`] (or `Default`) and the builder methods, then pass it to
/// `S3CrtClient::new`.
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How requests are authenticated (credentials chain, anonymous, profile or custom provider).
    auth_config: S3ClientAuthConfig,
    /// Target throughput in Gbps, forwarded to the CRT client configuration.
    throughput_target_gbps: f64,
    /// Memory limit in bytes, forwarded to the CRT client configuration.
    memory_limit_in_bytes: u64,
    /// Part size for multi-part GET operations; validated when the client is constructed.
    read_part_size: usize,
    /// Part size for multi-part PUT operations; validated when the client is constructed.
    write_part_size: usize,
    /// Region and (optional) explicit endpoint used for endpoint resolution.
    endpoint_config: EndpointConfig,
    /// Builder for the `User-Agent` header; a default `UserAgent` is used when `None`.
    user_agent: Option<UserAgent>,
    /// Value for the `x-amz-request-payer` header, if any.
    request_payer: Option<String>,
    /// Value for the `x-amz-expected-bucket-owner` header, if any.
    bucket_owner: Option<String>,
    /// Maximum attempts per request; the `AWS_MAX_ATTEMPTS` environment variable takes precedence.
    max_attempts: Option<NonZeroUsize>,
    /// Whether read backpressure is enabled in the CRT client.
    read_backpressure: bool,
    /// Initial read window size, forwarded to the CRT client configuration.
    initial_read_window: usize,
    /// Network interfaces to spread requests over; only forwarded to the CRT when non-empty.
    network_interface_names: Vec<String>,
    /// Optional user callback that receives per-request telemetry.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Thread count override for the CRT event loop group (`None` uses the CRT default).
    event_loop_threads: Option<u16>,
    /// Optional custom buffer pool factory, forwarded to the CRT client configuration.
    buffer_pool_factory: Option<CrtBufferPoolFactory>,
}
118
119impl Default for S3ClientConfig {
120    fn default() -> Self {
121        const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122        Self {
123            auth_config: Default::default(),
124            throughput_target_gbps: 10.0,
125            memory_limit_in_bytes: 0,
126            read_part_size: DEFAULT_PART_SIZE,
127            write_part_size: DEFAULT_PART_SIZE,
128            endpoint_config: EndpointConfig::new("us-east-1"),
129            user_agent: None,
130            request_payer: None,
131            bucket_owner: None,
132            max_attempts: None,
133            read_backpressure: false,
134            initial_read_window: DEFAULT_PART_SIZE,
135            network_interface_names: vec![],
136            telemetry_callback: None,
137            event_loop_threads: None,
138            buffer_pool_factory: None,
139        }
140    }
141}
142
143impl S3ClientConfig {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Set the configuration for authenticating to S3
149    #[must_use = "S3ClientConfig follows a builder pattern"]
150    pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151        self.auth_config = auth_config;
152        self
153    }
154
155    /// Set the part size for multi-part operations to S3 (both PUT and GET)
156    #[must_use = "S3ClientConfig follows a builder pattern"]
157    pub fn part_size(mut self, part_size: usize) -> Self {
158        self.read_part_size = part_size;
159        self.write_part_size = part_size;
160        self
161    }
162
163    /// Set the part size for multi-part-get operations to S3.
164    #[must_use = "S3ClientConfig follows a builder pattern"]
165    pub fn read_part_size(mut self, size: usize) -> Self {
166        self.read_part_size = size;
167        self
168    }
169
170    /// Set the part size for multi-part-put operations to S3.
171    #[must_use = "S3ClientConfig follows a builder pattern"]
172    pub fn write_part_size(mut self, size: usize) -> Self {
173        self.write_part_size = size;
174        self
175    }
176
177    /// Set the target throughput in Gbps for the S3 client
178    #[must_use = "S3ClientConfig follows a builder pattern"]
179    pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180        self.throughput_target_gbps = throughput_target_gbps;
181        self
182    }
183
184    /// Set the memory limit in bytes for the S3 client
185    #[must_use = "S3ClientConfig follows a builder pattern"]
186    pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187        self.memory_limit_in_bytes = memory_limit_in_bytes;
188        self
189    }
190
191    /// Set the endpoint configuration for endpoint resolution
192    #[must_use = "S3ClientConfig follows a builder pattern"]
193    pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194        self.endpoint_config = endpoint_config;
195        self
196    }
197
198    /// Set a constructor for the HTTP User-agent header for S3 requests
199    #[must_use = "S3ClientConfig follows a builder pattern"]
200    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201        self.user_agent = Some(user_agent);
202        self
203    }
204
205    /// Set a value for the request payer HTTP header for S3 requests
206    #[must_use = "S3ClientConfig follows a builder pattern"]
207    pub fn request_payer(mut self, request_payer: &str) -> Self {
208        self.request_payer = Some(request_payer.to_owned());
209        self
210    }
211
212    /// Set an expected bucket owner value
213    #[must_use = "S3ClientConfig follows a builder pattern"]
214    pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215        self.bucket_owner = Some(bucket_owner.to_owned());
216        self
217    }
218
219    /// Set a maximum number of attempts for S3 requests. Will be overridden by the
220    /// `AWS_MAX_ATTEMPTS` environment variable if set.
221    #[must_use = "S3ClientConfig follows a builder pattern"]
222    pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223        self.max_attempts = Some(max_attempts);
224        self
225    }
226
227    /// Set the flag for backpressure read
228    #[must_use = "S3ClientConfig follows a builder pattern"]
229    pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230        self.read_backpressure = read_backpressure;
231        self
232    }
233
234    /// Set a value for initial backpressure read window size
235    #[must_use = "S3ClientConfig follows a builder pattern"]
236    pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237        self.initial_read_window = initial_read_window;
238        self
239    }
240
241    /// Set a list of network interfaces to distribute S3 requests over
242    #[must_use = "S3ClientConfig follows a builder pattern"]
243    pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244        self.network_interface_names = network_interface_names;
245        self
246    }
247
248    /// Set a custom telemetry callback handler
249    #[must_use = "S3ClientConfig follows a builder pattern"]
250    pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251        self.telemetry_callback = Some(telemetry_callback);
252        self
253    }
254
255    /// Override the number of threads used by the CRTs AwsEventLoop
256    #[must_use = "S3ClientConfig follows a builder pattern"]
257    pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258        self.event_loop_threads = Some(event_loop_threads);
259        self
260    }
261
262    /// Set a custom memory pool
263    #[must_use = "S3ClientConfig follows a builder pattern"]
264    pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265        self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(move |_| pool.clone()));
266        self
267    }
268
269    /// Set a custom memory pool factory
270    #[must_use = "S3ClientConfig follows a builder pattern"]
271    pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272        self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(pool_factory));
273        self
274    }
275}
276
/// Authentication configuration for the CRT-based S3 client
///
/// When the client is constructed, each variant is turned into a `CredentialsProvider`.
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// The default AWS credentials resolution chain, similar to the AWS CLI
    /// (built with `CredentialsProvider::new_chain_default`).
    #[default]
    Default,
    /// Do not sign requests at all (built with `CredentialsProvider::new_anonymous`).
    NoSigning,
    /// Explicitly load the given profile name from the AWS CLI configuration file
    /// (built with `CredentialsProvider::new_profile` using this name as the override).
    Profile(String),
    /// Use a custom credentials provider, which is used as-is.
    Provider(CredentialsProvider),
}
290
/// An S3 client that uses the [AWS Common Runtime (CRT)][crt] to make requests.
///
/// The AWS CRT is a C library that provides a common set of functionality for AWS SDKs. Its S3
/// client provides high throughput by implementing S3 performance best practices, including
/// automatic parallelization of GET and PUT requests.
///
/// To use this client, invoke the methods from the [`ObjectClient`] trait.
///
/// Cloning an `S3CrtClient` only clones an `Arc`, so all clones share the same underlying state.
///
/// [crt]: https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Shared client state (CRT client, endpoint config, request defaults).
    inner: Arc<S3CrtClientInner>,
}
304
305impl S3CrtClient {
306    /// Construct a new S3 client with the given configuration.
307    pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308        Ok(Self {
309            inner: Arc::new(S3CrtClientInner::new(config)?),
310        })
311    }
312
313    /// Return a copy of the [EndpointConfig] for this client
314    pub fn endpoint_config(&self) -> EndpointConfig {
315        self.inner.endpoint_config.clone()
316    }
317
318    #[doc(hidden)]
319    pub fn event_loop_group(&self) -> EventLoopGroup {
320        self.inner.event_loop_group.clone()
321    }
322}
323
/// Shared state behind an [`S3CrtClient`], created once by `S3CrtClientInner::new`.
#[derive(Debug)]
struct S3CrtClientInner {
    /// The CRT S3 client built from the assembled `ClientConfig`.
    s3_client: Client,
    /// Event loop group used to create the host resolver, bootstrap and retry strategy.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration used to resolve per-bucket endpoints. It may carry an endpoint
    /// taken from `AWS_ENDPOINT_URL` when none was configured explicitly.
    endpoint_config: EndpointConfig,
    /// Allocator used for CRT objects such as request messages.
    allocator: Allocator,
    /// Counter backing request span ids (read via `next_request_counter()` in `request_span!`;
    /// NOTE(review): that accessor is not visible in this section).
    next_request_counter: AtomicU64,
    /// Value of the `User-Agent` header added to every request. It is built from the configured
    /// (or default) `UserAgent`, which adds the user agent prefix and S3 client information.
    /// The CRT appends further information of its own (e.g. "CRTS3NativeClient/0.1.x").
    user_agent_header: String,
    /// Value of the `x-amz-request-payer` header added to every request, if set.
    request_payer: Option<String>,
    /// Part size for multi-part GET operations (validated in `new`).
    read_part_size: usize,
    /// Part size for multi-part PUT operations (validated in `new`).
    write_part_size: usize,
    /// Whether read backpressure was enabled in the configuration.
    enable_backpressure: bool,
    /// Initial backpressure read window size from the configuration.
    initial_read_window_size: usize,
    /// Value of the `x-amz-expected-bucket-owner` header added to every request, if set.
    bucket_owner: Option<String>,
    /// Credentials used to build per-request signing configs. `new` always stores `Some`; when it
    /// is `None`, request templates are created without a signing config.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver shared with the client bootstrap. It is also queried for host address
    /// counts when reporting metrics.
    host_resolver: HostResolver,
    /// Optional user callback that receives per-request telemetry.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
344
345impl S3CrtClientInner {
346    fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347        let allocator = Allocator::default();
348
349        let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351        let resolver_options = HostResolverDefaultOptions {
352            max_entries: 8,
353            event_loop_group: &mut event_loop_group,
354        };
355
356        let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358        let bootstrap_options = ClientBootstrapOptions {
359            event_loop_group: &mut event_loop_group,
360            host_resolver: &mut host_resolver,
361        };
362
363        let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365        let mut client_config = ClientConfig::new();
366
367        let retry_strategy = {
368            let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369            let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370                .ok()
371                .and_then(|s| s.parse::<usize>().ok())
372                .or_else(|| config.max_attempts.map(|m| m.get()))
373                .unwrap_or(3);
374            // Max *attempts* includes the initial attempt, the CRT's max *retries* does not, so
375            // decrement by one
376            retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377            retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378            retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379            RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380        };
381
382        trace!("constructing client with auth config {:?}", config.auth_config);
383        let credentials_provider = match config.auth_config {
384            S3ClientAuthConfig::Default => {
385                let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386                    bootstrap: &mut client_bootstrap,
387                };
388                CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389                    .map_err(NewClientError::ProviderFailure)?
390            }
391            S3ClientAuthConfig::NoSigning => {
392                CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393            }
394            S3ClientAuthConfig::Profile(profile_name) => {
395                let credentials_profile_options = CredentialsProviderProfileOptions {
396                    bootstrap: &mut client_bootstrap,
397                    profile_name_override: &profile_name,
398                };
399                CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400                    .map_err(NewClientError::ProviderFailure)?
401            }
402            S3ClientAuthConfig::Provider(provider) => provider,
403        };
404
405        let endpoint_config = config.endpoint_config;
406        client_config.region(endpoint_config.get_region());
407        let signing_config = init_signing_config(
408            endpoint_config.get_region(),
409            credentials_provider.clone(),
410            None,
411            None,
412            None,
413        );
414
415        let endpoint_config = match endpoint_config.get_endpoint() {
416            None => {
417                // No explicit endpoint was configured, let's try the environment variable
418                if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419                    debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420                    let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421                        .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422                    endpoint_config.endpoint(env_uri)
423                } else {
424                    endpoint_config
425                }
426            }
427            Some(_) => endpoint_config,
428        };
429
430        client_config.express_support(true);
431        client_config.read_backpressure(config.read_backpressure);
432        client_config.initial_read_window(config.initial_read_window);
433        client_config.signing_config(signing_config);
434
435        client_config
436            .client_bootstrap(client_bootstrap)
437            .retry_strategy(retry_strategy);
438
439        client_config.throughput_target_gbps(config.throughput_target_gbps);
440        client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442        if let Some(pool_factory) = &config.buffer_pool_factory {
443            client_config.buffer_pool_factory(pool_factory.clone());
444        }
445
446        // max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
447        let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
448        // TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
449        for part_size in [config.read_part_size, config.write_part_size] {
450            if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
451                return Err(NewClientError::InvalidConfiguration(format!(
452                    "part size must be at between 5MiB and {}GiB",
453                    max_part_size / 1024 / 1024 / 1024
454                )));
455            }
456        }
457
458        if !config.network_interface_names.is_empty() {
459            client_config.network_interface_names(config.network_interface_names);
460        }
461
462        let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
463        let user_agent_header = user_agent.build();
464
465        let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
466
467        Ok(Self {
468            allocator,
469            s3_client,
470            event_loop_group,
471            endpoint_config,
472            next_request_counter: AtomicU64::new(0),
473            user_agent_header,
474            request_payer: config.request_payer,
475            read_part_size: config.read_part_size,
476            write_part_size: config.write_part_size,
477            enable_backpressure: config.read_backpressure,
478            initial_read_window_size: config.initial_read_window,
479            bucket_owner: config.bucket_owner,
480            credentials_provider: Some(credentials_provider),
481            host_resolver,
482            telemetry_callback: config.telemetry_callback,
483        })
484    }
485
486    /// Create a new HTTP request template for the given HTTP method and S3 bucket name.
487    /// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
488    /// response should be XML; this header should be overwritten for requests like GET that return
489    /// object data.
490    fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
491        let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
492        let uri = endpoint.uri()?;
493        trace!(?uri, "resolved endpoint");
494
495        let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
496            let auth_scheme = match endpoint.auth_scheme() {
497                Ok(auth_scheme) => auth_scheme,
498                Err(e) => {
499                    error!(error=?e, "no auth scheme for endpoint");
500                    return Err(e.into());
501                }
502            };
503            trace!(?auth_scheme, "resolved auth scheme");
504            let algorithm = Some(auth_scheme.scheme_name());
505            let service = Some(auth_scheme.signing_name());
506            let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
507            Some(init_signing_config(
508                auth_scheme.signing_region(),
509                credentials_provider.clone(),
510                algorithm,
511                service,
512                use_double_uri_encode,
513            ))
514        } else {
515            None
516        };
517
518        let hostname = uri.host_name().to_str().unwrap();
519        let path_prefix = uri.path().to_os_string().into_string().unwrap();
520        let port = uri.host_port();
521        let hostname_header = if port > 0 {
522            format!("{hostname}:{port}")
523        } else {
524            hostname.to_string()
525        };
526
527        let mut message = Message::new_request(&self.allocator)?;
528        message.set_request_method(method)?;
529        message.add_header(&Header::new("Host", hostname_header))?;
530        message.add_header(&Header::new("accept", "application/xml"))?;
531        message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
532
533        if let Some(ref payer) = self.request_payer {
534            message.add_header(&Header::new("x-amz-request-payer", payer))?;
535        }
536
537        if let Some(ref owner) = self.bucket_owner {
538            message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
539        }
540
541        Ok(S3Message {
542            inner: message,
543            uri,
544            path_prefix,
545            checksum_config: None,
546            signing_config,
547        })
548    }
549
550    /// Make a meta-request using this S3 client that invokes the given callbacks as the request
551    /// makes progress.
552    ///
553    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
554    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
555    /// parsing in this case (e.g. for permissions errors).
556    #[allow(clippy::too_many_arguments)]
557    fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
558        &self,
559        mut options: MetaRequestOptions,
560        request_span: Span,
561        on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
562        mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
563        mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
564        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
565        on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
566    ) -> Result<CancellingMetaRequest, S3RequestError> {
567        let span_telemetry = request_span.clone();
568        let span_body = request_span.clone();
569        let span_finish = request_span;
570
571        let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
572        let hostname = endpoint.host_name().to_str().unwrap().to_owned();
573        let host_resolver = self.host_resolver.clone();
574        let telemetry_callback = self.telemetry_callback.clone();
575
576        let start_time = Instant::now();
577        let first_body_part = Arc::new(AtomicBool::new(true));
578        let first_body_part_clone = Arc::clone(&first_body_part);
579        let total_bytes = Arc::new(AtomicU64::new(0));
580        let total_bytes_clone = Arc::clone(&total_bytes);
581
582        options
583            .on_telemetry(move |metrics| {
584                let _guard = span_telemetry.enter();
585
586                on_request_finish(metrics);
587
588                let http_status = metrics.status_code();
589                let request_canceled = metrics.is_canceled();
590                let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
591                let crt_error = Some(metrics.error()).filter(|e| e.is_err());
592                let request_type = request_type_to_metrics_string(metrics.request_type());
593                let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
594                let duration = metrics.total_duration();
595                let ttfb = metrics.time_to_first_byte();
596                let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
597
598                let message = if request_failure {
599                    "S3 request failed"
600                } else if request_canceled {
601                    "S3 request canceled"
602                } else {
603                    "S3 request finished"
604                };
605                debug!(%request_type, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
606                trace!(detailed_metrics=?metrics, "S3 request completed");
607
608                if let Some(ttfb) = ttfb {
609                    metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => request_type).record(ttfb.as_micros() as f64);
610                }
611                metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => request_type).record(duration.as_micros() as f64);
612                metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => request_type).increment(1);
613                if request_failure {
614                    metrics::counter!(S3_REQUEST_FAILURE, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
615                } else if request_canceled {
616                    metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => request_type).increment(1);
617                }
618
619                if let Some(telemetry_callback) = &telemetry_callback {
620                    telemetry_callback.on_telemetry(metrics);
621                }
622            })
623            .on_headers(move |headers, response_status| {
624                (on_headers)(headers, response_status);
625            })
626            .on_body(move |range_start, data| {
627                let _guard = span_body.enter();
628
629                if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
630                    let latency = start_time.elapsed().as_micros() as f64;
631                    let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
632                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
633                }
634                total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
635
636                trace!(start = range_start, length = data.len(), "body part received");
637
638                (on_body)(range_start, data);
639            })
640            .on_finish(move |request_result| {
641                let _guard = span_finish.enter();
642
643                let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
644                let duration = start_time.elapsed();
645
646                metrics::counter!("s3.meta_requests", "op" => op).increment(1);
647                metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
648                // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
649                if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true)  {
650                    let latency = duration.as_micros() as f64;
651                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
652                }
653                let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
654                // We only log throughput of object data. PUT needs to be measured in its stream
655                // implementation rather than these callbacks, so we can only do GET here.
656                if op == "get_object" {
657                    emit_throughput_metric(total_bytes, duration, op);
658                }
659                let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
660                if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
661                    metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
662                }
663
664                let status_code = request_result.response_status;
665                let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
666                    tracing::Level::DEBUG
667                } else {
668                    tracing::Level::WARN
669                };
670
671                let result =
672                    if !request_result.is_err() {
673                        event!(log_level, ?duration, "meta request finished");
674                        Ok(())
675                    } else {
676                        // The `parse_meta_request_error` callback has a choice of whether to give us an error or not.
677                        // If not, fall back to generic error parsing (e.g. for permissions errors), or just no error if that fails too.
678                        let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
679                        let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
680
681                        // Try to parse request header out of the failure. We can't just use the
682                        // telemetry callback because there might be multiple requests per meta
683                        // request, but these headers are known to be from the failed request.
684                        let request_id = match &request_result.error_response_headers {
685                            Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
686                            None => None,
687                        };
688                        let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
689
690                        let message = if request_result.is_canceled() {
691                            "meta request canceled"
692                        } else {
693                            "meta request failed"
694                        };
695                        if let Some(error) = &maybe_err {
696                            event!(log_level, ?duration, %request_id, ?error, message);
697                            debug!("meta request result: {:?}", request_result);
698                        } else {
699                            event!(log_level, ?duration, %request_id, ?request_result, message);
700                        }
701
702                        if request_result.is_canceled() {
703                            metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
704                        } else {
705                            // If it's not a real HTTP status, encode the CRT error in the metric instead
706                            let error_status = if request_result.response_status >= 100 {
707                                request_result.response_status
708                            } else {
709                                -request_result.crt_error.raw_error()
710                            };
711                            metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
712                        }
713
714                        // Fill in a generic error if we weren't able to parse one
715                        Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
716                    };
717
718                on_meta_request_result(result);
719            });
720
721        // Issue the HTTP request using the CRT's S3 meta request API
722        let meta_request = self.s3_client.make_meta_request(options)?;
723        Self::poll_client_metrics(&self.s3_client);
724        Ok(CancellingMetaRequest::wrap(meta_request))
725    }
726
727    /// Make a meta-request using this S3 client that returns the response body on success or
728    /// invokes the given callback on failure.
729    ///
730    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
731    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
732    /// parsing in this case (e.g. for permissions errors).
733    fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
734        &self,
735        options: MetaRequestOptions,
736        request_span: Span,
737        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738    ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
739        let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
740
741        // Accumulate the body of the response into this Vec<u8>
742        let body: Arc<Mutex<Vec<u8>>> = Default::default();
743        let body_clone = Arc::clone(&body);
744
745        let meta_request = self.meta_request_with_callbacks(
746            options,
747            request_span,
748            |_| {},
749            |_, _| {},
750            move |offset, data| {
751                let mut body = body_clone.lock().unwrap();
752                assert_eq!(offset as usize, body.len());
753                body.extend_from_slice(data);
754            },
755            parse_meta_request_error,
756            move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
757        )?;
758        Ok(S3MetaRequest {
759            receiver: rx.fuse(),
760            meta_request,
761        })
762    }
763
764    /// Make a meta-request using this S3 client that returns the response headers on success or
765    /// invokes the given callback on failure.
766    ///
767    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
768    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
769    /// parsing in this case (e.g. for permissions errors).
770    fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
771        &self,
772        options: MetaRequestOptions,
773        request_span: Span,
774        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
775    ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
776        let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
777
778        // On success, stash the headers in this lock during the on_headers callback,
779        // and pull them out during the on_meta_request_result callback.
780        let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
781        let on_result = on_headers.clone();
782
783        let meta_request = self.meta_request_with_callbacks(
784            options,
785            request_span,
786            |_| {},
787            move |headers, status| {
788                // Only store headers if we have a 2xx status code. If we only get other status codes,
789                // then on_meta_request_result will send an error.
790                if (200..300).contains(&status) {
791                    *on_headers.lock().unwrap() = Some(headers.clone());
792                }
793            },
794            |_, _| {},
795            parse_meta_request_error,
796            move |result| {
797                // Return the headers on success, otherwise propagate the error.
798                let headers =
799                    result.and_then(|_| {
800                        on_result.lock().unwrap().take().ok_or_else(|| {
801                            S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
802                        })
803                    });
804                _ = tx.send(headers);
805            },
806        )?;
807        Ok(S3MetaRequest {
808            receiver: rx.fuse(),
809            meta_request,
810        })
811    }
812
813    /// Make a meta-request using this S3 client that invokes the given callback on failure.
814    ///
815    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
816    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
817    /// parsing in this case (e.g. for permissions errors).
818    fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
819        &self,
820        options: MetaRequestOptions,
821        request_span: Span,
822        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
823    ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
824        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
825
826        let meta_request = self.meta_request_with_callbacks(
827            options,
828            request_span,
829            |_| {},
830            |_, _| {},
831            |_, _| {},
832            parse_meta_request_error,
833            move |result| _ = tx.send(result),
834        )?;
835        Ok(S3MetaRequest {
836            receiver: rx.fuse(),
837            meta_request,
838        })
839    }
840
841    fn poll_client_metrics(s3_client: &Client) {
842        let metrics = s3_client.poll_client_metrics();
843        metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
844        metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
845        metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
846        metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
847        metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
848        metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
849        metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
850            .set(metrics.num_auto_ranged_copy_network_io as f64);
851        metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
852        metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
853            .set(metrics.num_requests_stream_queued_waiting as f64);
854        metrics::gauge!("s3.client.num_requests_streaming_response")
855            .set(metrics.num_requests_streaming_response as f64);
856
857        // Buffer pool metrics
858        let start = Instant::now();
859        if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
860            metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
861                .record(start.elapsed().as_micros() as f64);
862            metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
863            metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
864            metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
865            metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
866            metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
867            metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
868                .set(buffer_pool_stats.primary_num_blocks as f64);
869            metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
870                .set(buffer_pool_stats.secondary_reserved as f64);
871            metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
872            metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
873        }
874    }
875
876    fn next_request_counter(&self) -> u64 {
877        self.next_request_counter.fetch_add(1, Ordering::SeqCst)
878    }
879}
880
881/// Failure retrieving headers
882#[derive(Debug, Error)]
883enum ResponseHeadersError {
884    #[error("response headers are missing")]
885    MissingHeaders,
886}
887
/// The S3 operations this client can issue.
///
/// Each variant maps to a CRT meta-request type and, where needed, an explicit operation name.
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    /// `DeleteObject`.
    DeleteObject,
    /// `GetObject`, using the CRT's dedicated GET meta-request type.
    GetObject,
    /// `GetObjectAttributes`.
    GetObjectAttributes,
    /// `HeadBucket`.
    HeadBucket,
    /// `HeadObject`.
    HeadObject,
    /// Object listing, sent as the `ListObjectsV2` operation.
    ListObjects,
    /// Upload using the CRT's dedicated PUT meta-request type.
    PutObject,
    /// Server-side copy using the CRT's dedicated copy meta-request type.
    CopyObject,
    /// Upload sent as a plain `PutObject` operation on the default meta-request type.
    PutObjectSingle,
}
901
902impl S3Operation {
903    /// The [MetaRequestType] to use for this operation.
904    fn meta_request_type(&self) -> MetaRequestType {
905        match self {
906            S3Operation::GetObject => MetaRequestType::GetObject,
907            S3Operation::PutObject => MetaRequestType::PutObject,
908            S3Operation::CopyObject => MetaRequestType::CopyObject,
909            _ => MetaRequestType::Default,
910        }
911    }
912
913    /// The operation name to set when configuring a request. Required for operations that
914    /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise.
915    fn operation_name(&self) -> Option<&'static str> {
916        match self {
917            S3Operation::DeleteObject => Some("DeleteObject"),
918            S3Operation::GetObject => None,
919            S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
920            S3Operation::HeadBucket => Some("HeadBucket"),
921            S3Operation::HeadObject => Some("HeadObject"),
922            S3Operation::ListObjects => Some("ListObjectsV2"),
923            S3Operation::PutObject => None,
924            S3Operation::CopyObject => None,
925            S3Operation::PutObjectSingle => Some("PutObject"),
926        }
927    }
928}
929
930/// A HTTP message to be sent to S3. This is a wrapper around a plain HTTP message, except that it
931/// helps us correctly configure the endpoint and "Host" header to handle both path-style and
932/// virtual-hosted-style addresses. The `path_prefix` is appended to the front of all paths, and
933/// need not be terminated with a `/`.
934#[derive(Debug)]
935struct S3Message<'a> {
936    inner: Message<'a>,
937    uri: Uri,
938    path_prefix: String,
939    checksum_config: Option<ChecksumConfig>,
940    signing_config: Option<SigningConfig>,
941}
942
943// This is RFC 3986 but with '/' also considered a safe character for path fragments.
944const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
945const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
946
947fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
948    let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
949    s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
950}
951
/// Optional trailing component of a request URL, written after the path.
#[derive(Debug, Default)]
enum QueryFragment<'a, P: AsRef<OsStr>> {
    /// No query string at all.
    #[default]
    Empty,
    /// `?key=value&...` pairs; keys and values are percent-encoded when written.
    Query(&'a [(P, P)]),
    /// A bare `?action` marker, written verbatim without encoding.
    Action(P),
}
959
960impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
961    // This estimate is exact if no characters need encoding, otherwise we'll end up
962    // reallocating a couple of times. The '?' for the query is counted in the first key-value
963    // pair.
964    fn size(&self) -> usize {
965        match self {
966            QueryFragment::Empty => 0,
967            QueryFragment::Query(query) => query
968                .iter()
969                .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
970                .sum::<usize>(),
971            QueryFragment::Action(action) => 1 + action.as_ref().len(),
972        }
973    }
974
975    // Write the fragment to the query string
976    fn write(&self, path: &mut OsString) {
977        match &self {
978            QueryFragment::Query(query) if !query.is_empty() => {
979                path.push("?");
980                for (i, (key, value)) in query.iter().enumerate() {
981                    if i != 0 {
982                        path.push("&");
983                    }
984                    write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
985                    path.push("=");
986                    write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
987                }
988            }
989            QueryFragment::Action(action) => {
990                path.push("?");
991                path.push(action.as_ref());
992            }
993            _ => {}
994        }
995    }
996}
997
998impl<'a> S3Message<'a> {
999    /// Add a header to this message. The header is added if necessary and any existing values for
1000    /// this header are removed.
1001    fn set_header(
1002        &mut self,
1003        header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1004    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1005        self.inner.set_header(header)
1006    }
1007
1008    /// Set the request path and query for this message. The components should not be URL-encoded;
1009    /// this method will handle that.
1010    fn set_request_path_and_query<P: AsRef<OsStr>>(
1011        &mut self,
1012        path: impl AsRef<OsStr>,
1013        query_or_action: QueryFragment<P>,
1014    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1015        let space_needed = self.path_prefix.len() + query_or_action.size();
1016
1017        let mut full_path = OsString::with_capacity(space_needed);
1018
1019        write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1020        write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1021
1022        // Build the query string
1023        query_or_action.write(&mut full_path);
1024        self.inner.set_request_path(full_path)
1025    }
1026
1027    /// Set the request path for this message. The path should not be URL-encoded; this method will
1028    /// handle that.
1029    fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1030        self.set_request_path_and_query::<&str>(path, Default::default())
1031    }
1032
1033    /// Sets the checksum configuration for this message.
1034    fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1035        self.checksum_config = checksum_config;
1036    }
1037
1038    /// Sets the body input stream for this message, and returns any previously set input stream.
1039    /// If input_stream is None, unsets the body.
1040    fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1041        self.inner.set_body_stream(input_stream)
1042    }
1043
1044    /// Set the content length header.
1045    fn set_content_length_header(
1046        &mut self,
1047        content_length: usize,
1048    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1049        self.inner
1050            .set_header(&Header::new("Content-Length", content_length.to_string()))
1051    }
1052
1053    /// Set the checksum header.
1054    fn set_checksum_header(
1055        &mut self,
1056        checksum: &UploadChecksum,
1057    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1058        let header = match checksum {
1059            UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1060            UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1061            UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1062            UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1063            UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1064        };
1065        self.inner.set_header(&header)
1066    }
1067
1068    fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1069        let mut options = MetaRequestOptions::new();
1070        if let Some(checksum_config) = self.checksum_config {
1071            options.checksum_config(checksum_config);
1072        }
1073        if let Some(signing_config) = self.signing_config {
1074            options.signing_config(signing_config);
1075        }
1076        options
1077            .message(self.inner)
1078            .endpoint(self.uri)
1079            .request_type(operation.meta_request_type());
1080        if let Some(operation_name) = operation.operation_name() {
1081            options.operation_name(operation_name);
1082        }
1083        options
1084    }
1085}
1086
1087#[derive(Debug)]
1088#[pin_project]
1089#[must_use]
1090struct S3MetaRequest<T, E> {
1091    /// Receiver for the result of meta-request.
1092    #[pin]
1093    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1094    meta_request: CancellingMetaRequest,
1095}
1096
1097impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1098    type Output = ObjectClientResult<T, E, S3RequestError>;
1099
1100    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1101        let this = self.project();
1102        this.receiver
1103            .poll(cx)
1104            .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1105    }
1106}
1107
1108impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1109    fn is_terminated(&self) -> bool {
1110        self.receiver.is_terminated()
1111    }
1112}
1113
1114/// Wrapper for a [MetaRequest] that cancels it on drop.
1115///
1116/// Note that if the request has already completed, cancelling it has no effect.
1117#[derive(Debug)]
1118struct CancellingMetaRequest {
1119    inner: MetaRequest,
1120}
1121
1122impl CancellingMetaRequest {
1123    fn wrap(meta_request: MetaRequest) -> Self {
1124        Self { inner: meta_request }
1125    }
1126}
1127
1128impl Drop for CancellingMetaRequest {
1129    fn drop(&mut self) {
1130        self.inner.cancel();
1131    }
1132}
1133
1134impl Deref for CancellingMetaRequest {
1135    type Target = MetaRequest;
1136
1137    fn deref(&self) -> &Self::Target {
1138        &self.inner
1139    }
1140}
1141
1142impl DerefMut for CancellingMetaRequest {
1143    fn deref_mut(&mut self) -> &mut Self::Target {
1144        &mut self.inner
1145    }
1146}
1147
1148/// Failures to construct a new S3 client
1149#[derive(Error, Debug)]
1150#[non_exhaustive]
1151pub enum NewClientError {
1152    /// Invalid S3 endpoint
1153    #[error("invalid S3 endpoint")]
1154    InvalidEndpoint(#[from] EndpointError),
1155    /// Invalid AWS credentials
1156    #[error("invalid AWS credentials")]
1157    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
1158    /// Invalid configuration
1159    #[error("invalid configuration: {0}")]
1160    InvalidConfiguration(String),
1161    /// An internal error from within the AWS Common Runtime
1162    #[error("Unknown CRT error")]
1163    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
1164}
1165
1166/// Errors returned by the CRT-based S3 client
1167#[derive(Error, Debug)]
1168pub enum S3RequestError {
1169    /// An internal error from within the S3 client. The request may have been sent.
1170    #[error("Internal S3 client error")]
1171    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1172
1173    /// An internal error from within the AWS Common Runtime. The request may have been sent.
1174    #[error("Unknown CRT error")]
1175    CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1176
1177    /// An error during construction of a request. The request was not sent.
1178    #[error("Failed to construct request")]
1179    ConstructionFailure(#[from] ConstructionError),
1180
1181    /// The request was sent but an unknown or unhandled failure occurred while processing it.
1182    #[error("Unknown response error: {0:?}")]
1183    ResponseError(MetaRequestResult),
1184
1185    /// The request was made to the wrong region
1186    #[error("Wrong region (expecting {0})")]
1187    IncorrectRegion(String, ClientErrorMetadata),
1188
1189    /// Forbidden
1190    #[error("Forbidden: {0}")]
1191    Forbidden(String, ClientErrorMetadata),
1192
1193    /// The request was attempted but could not be signed due to no available credentials
1194    #[error("No signing credentials available, see CRT debug logs")]
1195    NoSigningCredentials,
1196
1197    /// The request was canceled
1198    #[error("Request canceled")]
1199    RequestCanceled,
1200
1201    /// The request was throttled by S3
1202    #[error("Request throttled")]
1203    Throttled,
1204
1205    /// Cannot fetch more data because current read window is exhausted. The read window must
1206    /// be advanced using [GetObjectRequest::increment_read_window(u64)] to continue fetching
1207    /// new data.
1208    #[error("Polled for data with empty read window")]
1209    EmptyReadWindow,
1210}
1211
1212impl S3RequestError {
1213    fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1214        S3RequestError::ConstructionFailure(inner.into())
1215    }
1216
1217    fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1218        S3RequestError::InternalError(Box::new(inner))
1219    }
1220}
1221
1222impl ProvideErrorMetadata for S3RequestError {
1223    fn meta(&self) -> ClientErrorMetadata {
1224        match self {
1225            Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1226            Self::Forbidden(_, metadata) => metadata.clone(),
1227            Self::Throttled => ClientErrorMetadata {
1228                http_code: Some(503),
1229                error_code: Some("SlowDown".to_string()),
1230                error_message: Some("Please reduce your request rate.".to_string()),
1231            },
1232            Self::IncorrectRegion(_, metadata) => metadata.clone(),
1233            _ => Default::default(),
1234        }
1235    }
1236}
1237
1238#[derive(Error, Debug)]
1239pub enum ConstructionError {
1240    /// CRT error while constructing the request
1241    #[error("Unknown CRT error")]
1242    CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1243
1244    /// The S3 endpoint was invalid
1245    #[error("Invalid S3 endpoint")]
1246    InvalidEndpoint(#[from] EndpointError),
1247}
1248
1249/// Return a string version of a [RequestType] for use in metrics
1250///
1251/// TODO: Replace this method with `aws_s3_request_metrics_get_operation_name`,
1252///       and ensure all requests have an associated operation name.
1253fn request_type_to_metrics_string(request_type: RequestType) -> &'static str {
1254    match request_type {
1255        RequestType::Unknown => "Default",
1256        RequestType::HeadObject => "HeadObject",
1257        RequestType::GetObject => "GetObject",
1258        RequestType::ListParts => "ListParts",
1259        RequestType::CreateMultipartUpload => "CreateMultipartUpload",
1260        RequestType::UploadPart => "UploadPart",
1261        RequestType::AbortMultipartUpload => "AbortMultipartUpload",
1262        RequestType::CompleteMultipartUpload => "CompleteMultipartUpload",
1263        RequestType::UploadPartCopy => "UploadPartCopy",
1264        RequestType::CopyObject => "CopyObject",
1265        RequestType::PutObject => "PutObject",
1266    }
1267}
1268
1269/// Extract the byte range from the Content-Range header if present and valid
1270fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1271    let header = headers.get("Content-Range").ok()?;
1272    let value = header.value().to_str()?;
1273
1274    // Content-Range: <unit> <range-start>-<range-end>/<size>
1275
1276    if !value.starts_with("bytes ") {
1277        return None;
1278    }
1279    let (_, value) = value.split_at("bytes ".len());
1280    let (range, _) = value.split_once('/')?;
1281    let (start, end) = range.split_once('-')?;
1282    let start = start.parse::<u64>().ok()?;
1283    let end = end.parse::<u64>().ok()?;
1284
1285    // Rust ranges are exclusive at the end, but Content-Range is inclusive
1286    Some(start..end + 1)
1287}
1288
1289/// Extract the [Checksum] information from headers
1290fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1291    let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1292    let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1293    let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1294    let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1295    let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1296
1297    Ok(Checksum {
1298        checksum_crc64nvme,
1299        checksum_crc32,
1300        checksum_crc32c,
1301        checksum_sha1,
1302        checksum_sha256,
1303    })
1304}
1305
1306/// Try to parse a modeled error out of a failing meta request
1307fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1308    /// Look for a redirect header pointing to a different region for the bucket
1309    fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1310        let headers = request_result.error_response_headers.as_ref()?;
1311        let region_header = headers.get("x-amz-bucket-region").ok()?;
1312        let region = region_header.value().to_owned().into_string().ok()?;
1313        Some(S3RequestError::IncorrectRegion(
1314            region,
1315            ClientErrorMetadata::from_meta_request_result(request_result),
1316        ))
1317    }
1318
1319    /// Look for access-related errors
1320    fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1321        let Some(body) = request_result.error_response_body.as_ref() else {
1322            // Header-only requests like HeadObject and HeadBucket can't give us a more detailed
1323            // error, so just trust the response code
1324            return Some(S3RequestError::Forbidden(
1325                "<no message>".to_owned(),
1326                ClientErrorMetadata {
1327                    http_code: Some(request_result.response_status),
1328                    ..Default::default()
1329                },
1330            ));
1331        };
1332        let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1333        let error_code = error_elem.get_child("Code")?;
1334        let error_code_str = error_code.get_text()?;
1335        // Always translate 403 to Forbidden, but otherwise first check the error code, since other
1336        // response statuses are overloaded and not always access-related errors.
1337        if request_result.response_status == 403
1338            || matches!(
1339                error_code_str.deref(),
1340                "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1341            )
1342        {
1343            let message = error_elem
1344                .get_child("Message")
1345                .and_then(|e| e.get_text())
1346                .unwrap_or(error_code_str.clone());
1347            Some(S3RequestError::Forbidden(
1348                message.to_string(),
1349                ClientErrorMetadata {
1350                    http_code: Some(request_result.response_status),
1351                    error_code: Some(error_code_str.to_string()),
1352                    error_message: Some(message.into_owned()),
1353                },
1354            ))
1355        } else {
1356            None
1357        }
1358    }
1359
1360    /// Try to look for error related to no signing credentials, returns generic error otherwise
1361    fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1362        let crt_error_code = request_result.crt_error.raw_error();
1363        if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1364            S3RequestError::NoSigningCredentials
1365        } else {
1366            S3RequestError::CrtError(crt_error_code.into())
1367        }
1368    }
1369
1370    fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1371        let crt_error_code = request_result.crt_error.raw_error();
1372        if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1373            Some(S3RequestError::Throttled)
1374        } else {
1375            None
1376        }
1377    }
1378
1379    /// Handle canceled requests
1380    fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1381        request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1382    }
1383
1384    match request_result.response_status {
1385        301 => try_parse_redirect(request_result),
1386        // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
1387        // redirect
1388        400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1389        403 => try_parse_forbidden(request_result),
1390        // if the http response status is not set, we look into crt_error_code to identify the error
1391        0 => try_parse_throttled(request_result)
1392            .or_else(|| try_parse_canceled_request(request_result))
1393            .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1394        _ => None,
1395    }
1396}
1397
1398/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
1399/// PUT bytes don't transit those callbacks.
1400fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1401    let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1402    // Semi-arbitrary choices here to avoid averaging out large and small requests
1403    const MEGABYTE: u64 = 1024 * 1024;
1404    let bucket = if bytes < MEGABYTE {
1405        "<1MiB"
1406    } else if bytes <= 16 * MEGABYTE {
1407        "1-16MiB"
1408    } else {
1409        ">16MiB"
1410    };
1411    metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1412}
1413
1414#[cfg_attr(not(docsrs), async_trait)]
1415impl ObjectClient for S3CrtClient {
1416    type GetObjectResponse = S3GetObjectResponse;
1417    type PutObjectRequest = S3PutObjectRequest;
1418    type ClientError = S3RequestError;
1419
1420    fn read_part_size(&self) -> usize {
1421        self.inner.read_part_size
1422    }
1423
1424    fn write_part_size(&self) -> usize {
1425        // TODO: the CRT does some clamping to a max size rather than just swallowing the part size
1426        // we configured it with, so this might be wrong. Right now the only clamping is to the max
1427        // S3 part size (5GiB), so this shouldn't affect the result.
1428        // https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
1429        self.inner.write_part_size
1430    }
1431
1432    fn initial_read_window_size(&self) -> Option<usize> {
1433        if self.inner.enable_backpressure {
1434            Some(self.inner.initial_read_window_size)
1435        } else {
1436            None
1437        }
1438    }
1439
1440    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1441        let start = Instant::now();
1442        self.inner
1443            .s3_client
1444            .poll_default_buffer_pool_usage_stats()
1445            .inspect(|_| {
1446                metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1447                    .record(start.elapsed().as_micros() as f64);
1448            })
1449    }
1450
1451    async fn delete_object(
1452        &self,
1453        bucket: &str,
1454        key: &str,
1455    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1456        self.delete_object(bucket, key).await
1457    }
1458
1459    async fn copy_object(
1460        &self,
1461        source_bucket: &str,
1462        source_key: &str,
1463        destination_bucket: &str,
1464        destination_key: &str,
1465        params: &CopyObjectParams,
1466    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1467        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1468            .await
1469    }
1470
1471    async fn get_object(
1472        &self,
1473        bucket: &str,
1474        key: &str,
1475        params: &GetObjectParams,
1476    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1477        self.get_object(bucket, key, params).await
1478    }
1479
1480    async fn list_objects(
1481        &self,
1482        bucket: &str,
1483        continuation_token: Option<&str>,
1484        delimiter: &str,
1485        max_keys: usize,
1486        prefix: &str,
1487    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1488        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1489            .await
1490    }
1491
1492    async fn head_object(
1493        &self,
1494        bucket: &str,
1495        key: &str,
1496        params: &HeadObjectParams,
1497    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1498        self.head_object(bucket, key, params).await
1499    }
1500
1501    async fn put_object(
1502        &self,
1503        bucket: &str,
1504        key: &str,
1505        params: &PutObjectParams,
1506    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1507        self.put_object(bucket, key, params).await
1508    }
1509
1510    async fn put_object_single<'a>(
1511        &self,
1512        bucket: &str,
1513        key: &str,
1514        params: &PutObjectSingleParams,
1515        contents: impl AsRef<[u8]> + Send + 'a,
1516    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1517        self.put_object_single(bucket, key, params, contents).await
1518    }
1519
1520    async fn get_object_attributes(
1521        &self,
1522        bucket: &str,
1523        key: &str,
1524        max_parts: Option<usize>,
1525        part_number_marker: Option<usize>,
1526        object_attributes: &[ObjectAttribute],
1527    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1528        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1529            .await
1530    }
1531
1532    async fn rename_object(
1533        &self,
1534        bucket: &str,
1535        src_key: &str,
1536        dst_key: &str,
1537        params: &RenameObjectParams,
1538    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1539        self.rename_object(bucket, src_key, dst_key, params).await
1540    }
1541}
1542
1543/// Custom handling of telemetry events
1544pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1545    fn on_telemetry(&self, request_metrics: &RequestMetrics);
1546}
1547
#[cfg(test)]
mod tests {
    use mountpoint_s3_crt::common::error::Error;
    use rusty_fork::rusty_fork_test;
    use std::assert_eq;

    use super::*;
    use test_case::test_case;

    /// Helper asserting that [S3CrtClient::new] rejects `part_size` with the expected validation
    /// message. The upper bound in the message depends on the target pointer width.
    fn client_new_fails_with_invalid_part_size(part_size: usize) {
        let config = S3ClientConfig::default().part_size(part_size);
        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
        let message = if cfg!(target_pointer_width = "64") {
            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
        } else {
            format!(
                "invalid configuration: part size must be at between 5MiB and {}GiB",
                usize::MAX / 1024 / 1024 / 1024
            )
        };
        assert_eq!(e.to_string(), message);
    }

    /// On 32-bit platforms the part size is limited by `usize::MAX`,
    /// so it is impossible to provide a greater value there.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn client_new_fails_with_greater_part_size() {
        let part_size = 6 * 1024 * 1024 * 1024; // greater than 5GiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    #[test]
    fn client_new_fails_with_smaller_part_size() {
        let part_size = 4 * 1024 * 1024; // less than 5MiB
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// Test if the prefix is added correctly to the User-Agent header
    #[test]
    fn test_user_agent_with_prefix() {
        let user_agent_prefix = String::from("someprefix");
        let expected_user_agent = "someprefix mountpoint-s3-client/";

        let config = S3ClientConfig {
            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// Build a client with `endpoint_config` and assert the `Host` header of a request template.
    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
        let config = S3ClientConfig {
            endpoint_config,
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");

        let host_header = headers.get("Host").expect("Host header expected");
        let host_header_value = host_header.value();

        assert_eq!(host_header_value.to_string_lossy(), expected_host);
    }

    // run with rusty_fork to avoid issues with other tests and their env variables.
    rusty_fork_test! {
        #[test]
        fn test_endpoint_favors_parameter_over_env_variable() {
            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }

            // even though we set the environment variable, the parameter takes precedence
            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_favors_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }

            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_with_invalid_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: This test is run in a forked process, so won't affect any other concurrently running tests.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }

            let config = S3ClientConfig {
                endpoint_config,
                ..Default::default()
            };

            let client = S3CrtClient::new(config);
            match client {
                Ok(_) => panic!("expected an error"),
                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
            }
        }

    }

    /// Simple test to ensure the user agent header is correct even when prefix is not added
    #[test]
    fn test_user_agent_without_prefix() {
        let expected_user_agent = "mountpoint-s3-client/";

        let config: S3ClientConfig = Default::default();

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers.get("User-Agent").expect("User Agent Header expected");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
    #[test_case("bytes 200-1000/*" => Some(200..1001))]
    #[test_case("bytes 200-1000" => None)]
    #[test_case("bytes */67589" => None)]
    #[test_case("octets 200-1000]" => None)]
    fn parse_content_range(range: &str) -> Option<Range<u64>> {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let header = Header::new("Content-Range", range);
        headers.add_header(&header).unwrap();
        extract_range_header(&headers)
    }

    /// Simple test to ensure the expected bucket owner can be set
    #[test]
    fn test_expected_bucket_owner() {
        let expected_bucket_owner = "111122223333";

        let config: S3ClientConfig = S3ClientConfig::new().bucket_owner(expected_bucket_owner);

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let expected_bucket_owner_header = headers
            .get("x-amz-expected-bucket-owner")
            .expect("the headers should contain x-amz-expected-bucket-owner");
        let expected_bucket_owner_value = expected_bucket_owner_header.value();

        // Exact match: a prefix check would also accept a header with trailing garbage.
        assert_eq!(expected_bucket_owner_value.to_string_lossy(), expected_bucket_owner);
    }

    /// Build a failed [MetaRequestResult] with the given HTTP status and error body, optionally
    /// carrying an `x-amz-bucket-region` response header.
    fn make_result(
        response_status: i32,
        body: impl Into<OsString>,
        bucket_region_header: Option<&str>,
    ) -> MetaRequestResult {
        let error_response_headers = bucket_region_header.map(|h| {
            let mut headers = Headers::new(&Allocator::default()).unwrap();
            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
            headers
        });
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_301_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_access_denied() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "Access Denied");
    }

    #[test]
    fn parse_400_invalid_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
    }

    #[test]
    fn parse_400_expired_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token has expired.");
    }

    #[test]
    fn parse_400_redirect() {
        // From an s3-accelerate endpoint with the wrong region set for signing
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_signature_does_not_match() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        );
    }

    #[test]
    fn parse_403_made_up_error() {
        // A made up error to check that we map all 403s even if we don't recognize them
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "This error is made up.");
    }

    #[test]
    fn parse_unhandled_status_is_not_generic_error() {
        // Statuses without a dedicated parser (e.g. 500) are not classified here
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message></Error>"#;
        let result = make_result(500, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        assert!(result.is_none(), "wrong result, got: {result:?}");
    }

    /// Build a failed [MetaRequestResult] that carries only a CRT error (no response headers/body).
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }

    #[test]
    fn parse_no_signing_credential_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::NoSigningCredentials) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_test_other_crt_error() {
        // A signing error that isn't "no signing credentials"
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CrtError(error)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(error, error_code.into());
    }

    #[test]
    fn parse_throttled_error() {
        // With no HTTP status, a slow-down CRT error must be reported as throttling
        let error_code = mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Throttled) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn test_checksum_sha256() {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        // Base64 of the SHA-256 digest of the empty input (32 bytes)
        let value = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=";
        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
        headers.add_header(&header).unwrap();

        let checksum = parse_checksum(&headers).expect("failed to parse headers");
        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
        assert_eq!(
            checksum.checksum_sha256,
            Some(value.to_owned()),
            "sha256 header should match"
        );
    }
}