mountpoint_s3_client/
s3_crt_client.rs

1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::future::{Fuse, FusedFuture};
15use futures::FutureExt;
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::client::{
30    init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions,
31    MetaRequestResult, MetaRequestType, RequestMetrics, RequestType,
32};
33
34use async_trait::async_trait;
35use futures::channel::oneshot;
36use percent_encoding::{percent_encode, AsciiSet, NON_ALPHANUMERIC};
37use pin_project::pin_project;
38use thiserror::Error;
39use tracing::{debug, error, trace, Span};
40
41use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
42use crate::endpoint_config::EndpointError;
43use crate::endpoint_config::{self, EndpointConfig};
44use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
45use crate::object_client::*;
46use crate::user_agent::UserAgent;
47
// Create the tracing span that represents one client request.
//
// `$self` must expose `next_request_counter()`; its return value is recorded on the span as the
// `id` field. `$method` is used as the span name, and any trailing tokens are forwarded verbatim
// to `tracing::warn_span!` as additional fields. A "new request" DEBUG event is emitted inside
// the span before the span is returned to the caller.
macro_rules! request_span {
    // Form with extra span fields: `request_span!(client, "op", field = value, ...)`.
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let counter = $self.next_request_counter();
        // Choosing the level for a tracing span is easy to get wrong: the span must exist
        // whenever events at WARN or lower severity (INFO, DEBUG, TRACE) are emitted, so the
        // span itself is created at WARN.
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
        span.in_scope(|| tracing::debug!("new request"));
        span
    }};
    // Form without extra fields: forwards to the arm above with an empty field list.
    ($self:expr, $method:expr) => { request_span!($self, $method,) };
}
60
61pub(crate) mod copy_object;
62pub(crate) mod delete_object;
63pub(crate) mod get_object;
64
65pub(crate) use get_object::S3GetObjectResponse;
66pub(crate) mod get_object_attributes;
67
68pub(crate) mod head_object;
69pub(crate) mod list_objects;
70
71pub(crate) mod head_bucket;
72pub(crate) mod put_object;
73pub use head_bucket::HeadBucketError;
74pub(crate) use put_object::S3PutObjectRequest;
75
/// `tracing` doesn't allow dynamic levels, but we want to choose the log level for a request
/// dynamically based on its response status. See https://github.com/tokio-rs/tracing/issues/372
///
/// This macro takes a runtime `tracing::Level` expression followed by the usual `tracing::event!`
/// arguments, and dispatches to `tracing::event!` with the matching constant level. The remaining
/// arguments are forwarded unchanged.
macro_rules! event {
    ($level:expr, $($args:tt)*) => {
        // One arm per level so that each `tracing::event!` invocation sees a constant level.
        match $level {
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
        }
    }
}
89
/// Configuration for the CRT-based S3 client.
///
/// Values are set through the consuming builder methods on this type; [`Default`] supplies the
/// starting values.
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How requests to S3 are authenticated.
    auth_config: S3ClientAuthConfig,
    /// Target throughput for the client, in Gbps.
    throughput_target_gbps: f64,
    /// Part size for multi-part GET operations, in bytes.
    read_part_size: usize,
    /// Part size for multi-part PUT operations, in bytes.
    write_part_size: usize,
    /// Endpoint configuration used for endpoint resolution.
    endpoint_config: EndpointConfig,
    /// Constructor for the HTTP User-agent header; a default one is built when `None`.
    user_agent: Option<UserAgent>,
    /// Value for the request payer HTTP header, if any.
    request_payer: Option<String>,
    /// Expected bucket owner, if any.
    bucket_owner: Option<String>,
    /// Maximum number of attempts per request. The `AWS_MAX_ATTEMPTS` environment variable takes
    /// precedence when it is set to a valid number.
    max_attempts: Option<NonZeroUsize>,
    /// Whether backpressure is enabled for reads.
    read_backpressure: bool,
    /// Initial read window size used with backpressure reads, in bytes.
    initial_read_window: usize,
    /// Network interfaces to distribute S3 requests over; ignored when empty.
    network_interface_names: Vec<String>,
    /// Optional custom handler invoked with each request's telemetry.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Optional override for the number of threads in the CRT event loop group.
    event_loop_threads: Option<u16>,
}
108
109impl Default for S3ClientConfig {
110    fn default() -> Self {
111        const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
112        Self {
113            auth_config: Default::default(),
114            throughput_target_gbps: 10.0,
115            read_part_size: DEFAULT_PART_SIZE,
116            write_part_size: DEFAULT_PART_SIZE,
117            endpoint_config: EndpointConfig::new("us-east-1"),
118            user_agent: None,
119            request_payer: None,
120            bucket_owner: None,
121            max_attempts: None,
122            read_backpressure: false,
123            initial_read_window: DEFAULT_PART_SIZE,
124            network_interface_names: vec![],
125            telemetry_callback: None,
126            event_loop_threads: None,
127        }
128    }
129}
130
131impl S3ClientConfig {
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    /// Set the configuration for authenticating to S3
137    #[must_use = "S3ClientConfig follows a builder pattern"]
138    pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
139        self.auth_config = auth_config;
140        self
141    }
142
143    /// Set the part size for multi-part operations to S3 (both PUT and GET)
144    #[must_use = "S3ClientConfig follows a builder pattern"]
145    pub fn part_size(mut self, part_size: usize) -> Self {
146        self.read_part_size = part_size;
147        self.write_part_size = part_size;
148        self
149    }
150
151    /// Set the part size for multi-part-get operations to S3.
152    #[must_use = "S3ClientConfig follows a builder pattern"]
153    pub fn read_part_size(mut self, size: usize) -> Self {
154        self.read_part_size = size;
155        self
156    }
157
158    /// Set the part size for multi-part-put operations to S3.
159    #[must_use = "S3ClientConfig follows a builder pattern"]
160    pub fn write_part_size(mut self, size: usize) -> Self {
161        self.write_part_size = size;
162        self
163    }
164
165    /// Set the target throughput in Gbps for the S3 client
166    #[must_use = "S3ClientConfig follows a builder pattern"]
167    pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
168        self.throughput_target_gbps = throughput_target_gbps;
169        self
170    }
171
172    /// Set the endpoint configuration for endpoint resolution
173    #[must_use = "S3ClientConfig follows a builder pattern"]
174    pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
175        self.endpoint_config = endpoint_config;
176        self
177    }
178
179    /// Set a constructor for the HTTP User-agent header for S3 requests
180    #[must_use = "S3ClientConfig follows a builder pattern"]
181    pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
182        self.user_agent = Some(user_agent);
183        self
184    }
185
186    /// Set a value for the request payer HTTP header for S3 requests
187    #[must_use = "S3ClientConfig follows a builder pattern"]
188    pub fn request_payer(mut self, request_payer: &str) -> Self {
189        self.request_payer = Some(request_payer.to_owned());
190        self
191    }
192
193    /// Set an expected bucket owner value
194    #[must_use = "S3ClientConfig follows a builder pattern"]
195    pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
196        self.bucket_owner = Some(bucket_owner.to_owned());
197        self
198    }
199
200    /// Set a maximum number of attempts for S3 requests. Will be overridden by the
201    /// `AWS_MAX_ATTEMPTS` environment variable if set.
202    #[must_use = "S3ClientConfig follows a builder pattern"]
203    pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
204        self.max_attempts = Some(max_attempts);
205        self
206    }
207
208    /// Set the flag for backpressure read
209    #[must_use = "S3ClientConfig follows a builder pattern"]
210    pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
211        self.read_backpressure = read_backpressure;
212        self
213    }
214
215    /// Set a value for initial backpressure read window size
216    #[must_use = "S3ClientConfig follows a builder pattern"]
217    pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
218        self.initial_read_window = initial_read_window;
219        self
220    }
221
222    /// Set a list of network interfaces to distribute S3 requests over
223    #[must_use = "S3ClientConfig follows a builder pattern"]
224    pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
225        self.network_interface_names = network_interface_names;
226        self
227    }
228
229    /// Set a custom telemetry callback handler
230    #[must_use = "S3ClientConfig follows a builder pattern"]
231    pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
232        self.telemetry_callback = Some(telemetry_callback);
233        self
234    }
235
236    /// Override the number of threads used by the CRTs AwsEventLoop
237    #[must_use = "S3ClientConfig follows a builder pattern"]
238    pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
239        self.event_loop_threads = Some(event_loop_threads);
240        self
241    }
242}
243
/// Authentication configuration for the CRT-based S3 client.
///
/// Selects where the client obtains the credentials used to sign requests. The derived
/// [`Default`] implementation yields [`S3ClientAuthConfig::Default`].
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// The default AWS credentials resolution chain, similar to the AWS CLI
    #[default]
    Default,
    /// Do not sign requests at all (an anonymous credentials provider is used)
    NoSigning,
    /// Explicitly load the given profile name from the AWS CLI configuration file
    Profile(String),
    /// Use a custom, caller-supplied credentials provider
    Provider(CredentialsProvider),
}
257
/// An S3 client that uses the [AWS Common Runtime (CRT)][crt] to make requests.
///
/// The AWS CRT is a C library that provides a common set of functionality for AWS SDKs. Its S3
/// client provides high throughput by implementing S3 performance best practices, including
/// automatic parallelization of GET and PUT requests.
///
/// To use this client, invoke the methods from the [`ObjectClient`] trait.
///
/// Cloning an `S3CrtClient` is cheap: clones share the same underlying client state.
///
/// [crt]: https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Reference-counted client state shared by all clones of this client.
    inner: Arc<S3CrtClientInner>,
}
271
272impl S3CrtClient {
273    /// Construct a new S3 client with the given configuration.
274    pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
275        Ok(Self {
276            inner: Arc::new(S3CrtClientInner::new(config)?),
277        })
278    }
279
280    /// Return a copy of the [EndpointConfig] for this client
281    pub fn endpoint_config(&self) -> EndpointConfig {
282        self.inner.endpoint_config.clone()
283    }
284
285    #[doc(hidden)]
286    pub fn event_loop_group(&self) -> EventLoopGroup {
287        self.inner.event_loop_group.clone()
288    }
289}
290
/// Shared state behind an [`S3CrtClient`]: the CRT client itself plus the per-client settings
/// applied to every request.
#[derive(Debug)]
struct S3CrtClientInner {
    /// The CRT S3 client that meta requests are issued on.
    s3_client: Client,
    /// Event loop group the CRT resources were created with.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration used to resolve the endpoint for each bucket.
    endpoint_config: EndpointConfig,
    /// CRT allocator used when building request messages.
    allocator: Allocator,
    /// Counter starting at 0; presumably advanced by `next_request_counter()` (defined outside
    /// this view) to give each request span a unique id.
    next_request_counter: AtomicU64,
    /// User agent prefix plus S3 client information. This value is passed to the CRT, which
    /// appends its own information (e.g. "CRTS3NativeClient/0.1.x").
    user_agent_header: String,
    /// Value sent in the `x-amz-request-payer` header, if configured.
    request_payer: Option<String>,
    /// Part size for multi-part GET operations, copied from the client configuration.
    read_part_size: usize,
    /// Part size for multi-part PUT operations, copied from the client configuration.
    write_part_size: usize,
    /// Whether read backpressure was enabled in the client configuration.
    enable_backpressure: bool,
    /// Initial read window size from the client configuration.
    initial_read_window_size: usize,
    /// Value sent in the `x-amz-expected-bucket-owner` header, if configured.
    bucket_owner: Option<String>,
    /// Credentials provider used to build a signing config for each request; when `None`, request
    /// templates carry no signing config.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver, also queried to report the per-host address count metric.
    host_resolver: HostResolver,
    /// Optional custom handler invoked with each request's telemetry.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
311
312impl S3CrtClientInner {
313    fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
314        let allocator = Allocator::default();
315
316        let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
317
318        let resolver_options = HostResolverDefaultOptions {
319            max_entries: 8,
320            event_loop_group: &mut event_loop_group,
321        };
322
323        let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
324
325        let bootstrap_options = ClientBootstrapOptions {
326            event_loop_group: &mut event_loop_group,
327            host_resolver: &mut host_resolver,
328        };
329
330        let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
331
332        let mut client_config = ClientConfig::new();
333
334        let retry_strategy = {
335            let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
336            let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
337                .ok()
338                .and_then(|s| s.parse::<usize>().ok())
339                .or_else(|| config.max_attempts.map(|m| m.get()))
340                .unwrap_or(3);
341            // Max *attempts* includes the initial attempt, the CRT's max *retries* does not, so
342            // decrement by one
343            retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
344            retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
345            retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
346            RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
347        };
348
349        trace!("constructing client with auth config {:?}", config.auth_config);
350        let credentials_provider = match config.auth_config {
351            S3ClientAuthConfig::Default => {
352                let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
353                    bootstrap: &mut client_bootstrap,
354                };
355                CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
356                    .map_err(NewClientError::ProviderFailure)?
357            }
358            S3ClientAuthConfig::NoSigning => {
359                CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
360            }
361            S3ClientAuthConfig::Profile(profile_name) => {
362                let credentials_profile_options = CredentialsProviderProfileOptions {
363                    bootstrap: &mut client_bootstrap,
364                    profile_name_override: &profile_name,
365                };
366                CredentialsProvider::new_profile(&allocator, credentials_profile_options)
367                    .map_err(NewClientError::ProviderFailure)?
368            }
369            S3ClientAuthConfig::Provider(provider) => provider,
370        };
371
372        let endpoint_config = config.endpoint_config;
373        client_config.region(endpoint_config.get_region());
374        let signing_config = init_signing_config(
375            endpoint_config.get_region(),
376            credentials_provider.clone(),
377            None,
378            None,
379            None,
380        );
381
382        let endpoint_config = match endpoint_config.get_endpoint() {
383            None => {
384                // No explicit endpoint was configured, let's try the environment variable
385                if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
386                    debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
387                    let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
388                        .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
389                    endpoint_config.endpoint(env_uri)
390                } else {
391                    endpoint_config
392                }
393            }
394            Some(_) => endpoint_config,
395        };
396
397        client_config.express_support(true);
398        client_config.read_backpressure(config.read_backpressure);
399        client_config.initial_read_window(config.initial_read_window);
400        client_config.signing_config(signing_config);
401
402        client_config
403            .client_bootstrap(client_bootstrap)
404            .retry_strategy(retry_strategy);
405
406        client_config.throughput_target_gbps(config.throughput_target_gbps);
407
408        // max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
409        let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
410        // TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
411        for part_size in [config.read_part_size, config.write_part_size] {
412            if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
413                return Err(NewClientError::InvalidConfiguration(format!(
414                    "part size must be at between 5MiB and {}GiB",
415                    max_part_size / 1024 / 1024 / 1024
416                )));
417            }
418        }
419
420        if !config.network_interface_names.is_empty() {
421            client_config.network_interface_names(config.network_interface_names);
422        }
423
424        let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
425        let user_agent_header = user_agent.build();
426
427        let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
428
429        Ok(Self {
430            allocator,
431            s3_client,
432            event_loop_group,
433            endpoint_config,
434            next_request_counter: AtomicU64::new(0),
435            user_agent_header,
436            request_payer: config.request_payer,
437            read_part_size: config.read_part_size,
438            write_part_size: config.write_part_size,
439            enable_backpressure: config.read_backpressure,
440            initial_read_window_size: config.initial_read_window,
441            bucket_owner: config.bucket_owner,
442            credentials_provider: Some(credentials_provider),
443            host_resolver,
444            telemetry_callback: config.telemetry_callback,
445        })
446    }
447
448    /// Create a new HTTP request template for the given HTTP method and S3 bucket name.
449    /// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
450    /// response should be XML; this header should be overwritten for requests like GET that return
451    /// object data.
452    fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message, ConstructionError> {
453        let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
454        let uri = endpoint.uri()?;
455        trace!(?uri, "resolved endpoint");
456
457        let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
458            let auth_scheme = match endpoint.auth_scheme() {
459                Ok(auth_scheme) => auth_scheme,
460                Err(e) => {
461                    error!(error=?e, "no auth scheme for endpoint");
462                    return Err(e.into());
463                }
464            };
465            trace!(?auth_scheme, "resolved auth scheme");
466            let algorithm = Some(auth_scheme.scheme_name());
467            let service = Some(auth_scheme.signing_name());
468            let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
469            Some(init_signing_config(
470                auth_scheme.signing_region(),
471                credentials_provider.clone(),
472                algorithm,
473                service,
474                use_double_uri_encode,
475            ))
476        } else {
477            None
478        };
479
480        let hostname = uri.host_name().to_str().unwrap();
481        let path_prefix = uri.path().to_os_string().into_string().unwrap();
482        let port = uri.host_port();
483        let hostname_header = if port > 0 {
484            format!("{}:{}", hostname, port)
485        } else {
486            hostname.to_string()
487        };
488
489        let mut message = Message::new_request(&self.allocator)?;
490        message.set_request_method(method)?;
491        message.add_header(&Header::new("Host", hostname_header))?;
492        message.add_header(&Header::new("accept", "application/xml"))?;
493        message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
494
495        if let Some(ref payer) = self.request_payer {
496            message.add_header(&Header::new("x-amz-request-payer", payer))?;
497        }
498
499        if let Some(ref owner) = self.bucket_owner {
500            message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
501        }
502
503        Ok(S3Message {
504            inner: message,
505            uri,
506            path_prefix,
507            checksum_config: None,
508            signing_config,
509        })
510    }
511
512    /// Make a meta-request using this S3 client that invokes the given callbacks as the request
513    /// makes progress.
514    ///
515    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
516    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
517    /// parsing in this case (e.g. for permissions errors).
518    #[allow(clippy::too_many_arguments)]
519    fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
520        &self,
521        mut options: MetaRequestOptions,
522        request_span: Span,
523        on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
524        mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
525        mut on_body: impl FnMut(u64, &[u8]) + Send + 'static,
526        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
527        on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
528    ) -> Result<CancellingMetaRequest, S3RequestError> {
529        let span_telemetry = request_span.clone();
530        let span_body = request_span.clone();
531        let span_finish = request_span;
532
533        let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
534        let hostname = endpoint.host_name().to_str().unwrap().to_owned();
535        let host_resolver = self.host_resolver.clone();
536        let telemetry_callback = self.telemetry_callback.clone();
537
538        let start_time = Instant::now();
539        let first_body_part = Arc::new(AtomicBool::new(true));
540        let first_body_part_clone = Arc::clone(&first_body_part);
541        let total_bytes = Arc::new(AtomicU64::new(0));
542        let total_bytes_clone = Arc::clone(&total_bytes);
543
544        options
545            .on_telemetry(move |metrics| {
546                let _guard = span_telemetry.enter();
547
548                on_request_finish(metrics);
549
550                let http_status = metrics.status_code();
551                let request_canceled = metrics.is_canceled();
552                let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
553                let crt_error = Some(metrics.error()).filter(|e| e.is_err());
554                let request_type = request_type_to_metrics_string(metrics.request_type());
555                let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
556                let duration = metrics.total_duration();
557                let ttfb = metrics.time_to_first_byte();
558                let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
559
560                let message = if request_failure {
561                    "S3 request failed"
562                } else if request_canceled {
563                    "S3 request canceled"
564                } else {
565                    "S3 request finished"
566                };
567                debug!(%request_type, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
568                trace!(detailed_metrics=?metrics, "S3 request completed");
569
570                let op = span_telemetry.metadata().map(|m| m.name()).unwrap_or("unknown");
571                if let Some(ttfb) = ttfb {
572                    metrics::histogram!("s3.requests.first_byte_latency_us", "op" => op, "type" => request_type).record(ttfb.as_micros() as f64);
573                }
574                metrics::histogram!("s3.requests.total_latency_us", "op" => op, "type" => request_type).record(duration.as_micros() as f64);
575                metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1);
576                if request_failure {
577                    metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1);
578                } else if request_canceled {
579                    metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1);
580                }
581
582                if let Some(telemetry_callback) = &telemetry_callback {
583                    telemetry_callback.on_telemetry(metrics);
584                }
585            })
586            .on_headers(move |headers, response_status| {
587                (on_headers)(headers, response_status);
588            })
589            .on_body(move |range_start, data| {
590                let _guard = span_body.enter();
591
592                if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
593                    let latency = start_time.elapsed().as_micros() as f64;
594                    let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
595                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
596                }
597                total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
598
599                trace!(start = range_start, length = data.len(), "body part received");
600
601                (on_body)(range_start, data);
602            })
603            .on_finish(move |request_result| {
604                let _guard = span_finish.enter();
605
606                let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
607                let duration = start_time.elapsed();
608
609                metrics::counter!("s3.meta_requests", "op" => op).increment(1);
610                metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
611                // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
612                if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true)  {
613                    let latency = duration.as_micros() as f64;
614                    metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
615                }
616                let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
617                // We only log throughput of object data. PUT needs to be measured in its stream
618                // implementation rather than these callbacks, so we can only do GET here.
619                if op == "get_object" {
620                    emit_throughput_metric(total_bytes, duration, op);
621                }
622                let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
623                if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
624                    metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
625                }
626
627                let status_code = request_result.response_status;
628                let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
629                    tracing::Level::DEBUG
630                } else {
631                    tracing::Level::WARN
632                };
633
634                let result =
635                    if !request_result.is_err() {
636                        event!(log_level, ?duration, "meta request finished");
637                        Ok(())
638                    } else {
639                        // The `parse_meta_request_error` callback has a choice of whether to give us an error or not.
640                        // If not, fall back to generic error parsing (e.g. for permissions errors), or just no error if that fails too.
641                        let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
642                        let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
643
644                        // Try to parse request header out of the failure. We can't just use the
645                        // telemetry callback because there might be multiple requests per meta
646                        // request, but these headers are known to be from the failed request.
647                        let request_id = match &request_result.error_response_headers {
648                            Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
649                            None => None,
650                        };
651                        let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
652
653                        let message = if request_result.is_canceled() {
654                            "meta request canceled"
655                        } else {
656                            "meta request failed"
657                        };
658                        if let Some(error) = &maybe_err {
659                            event!(log_level, ?duration, %request_id, ?error, message);
660                            debug!("meta request result: {:?}", request_result);
661                        } else {
662                            event!(log_level, ?duration, %request_id, ?request_result, message);
663                        }
664
665                        if request_result.is_canceled() {
666                            metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
667                        } else {
668                            // If it's not a real HTTP status, encode the CRT error in the metric instead
669                            let error_status = if request_result.response_status >= 100 {
670                                request_result.response_status
671                            } else {
672                                -request_result.crt_error.raw_error()
673                            };
674                            metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
675                        }
676
677                        // Fill in a generic error if we weren't able to parse one
678                        Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
679                    };
680
681                on_meta_request_result(result);
682            });
683
684        // Issue the HTTP request using the CRT's S3 meta request API
685        let meta_request = self.s3_client.make_meta_request(options)?;
686        Self::poll_client_metrics(&self.s3_client);
687        Ok(CancellingMetaRequest::wrap(meta_request))
688    }
689
690    /// Make a meta-request using this S3 client that returns the response body on success or
691    /// invokes the given callback on failure.
692    ///
693    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
694    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
695    /// parsing in this case (e.g. for permissions errors).
696    fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
697        &self,
698        options: MetaRequestOptions,
699        request_span: Span,
700        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
701    ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
702        let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
703
704        // Accumulate the body of the response into this Vec<u8>
705        let body: Arc<Mutex<Vec<u8>>> = Default::default();
706        let body_clone = Arc::clone(&body);
707
708        let meta_request = self.meta_request_with_callbacks(
709            options,
710            request_span,
711            |_| {},
712            |_, _| {},
713            move |offset, data| {
714                let mut body = body_clone.lock().unwrap();
715                assert_eq!(offset as usize, body.len());
716                body.extend_from_slice(data);
717            },
718            parse_meta_request_error,
719            move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
720        )?;
721        Ok(S3MetaRequest {
722            receiver: rx.fuse(),
723            meta_request,
724        })
725    }
726
727    /// Make a meta-request using this S3 client that returns the response headers on success or
728    /// invokes the given callback on failure.
729    ///
730    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
731    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
732    /// parsing in this case (e.g. for permissions errors).
733    fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
734        &self,
735        options: MetaRequestOptions,
736        request_span: Span,
737        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738    ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
739        let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
740
741        // On success, stash the headers in this lock during the on_headers callback,
742        // and pull them out during the on_meta_request_result callback.
743        let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
744        let on_result = on_headers.clone();
745
746        let meta_request = self.meta_request_with_callbacks(
747            options,
748            request_span,
749            |_| {},
750            move |headers, status| {
751                // Only store headers if we have a 2xx status code. If we only get other status codes,
752                // then on_meta_request_result will send an error.
753                if (200..300).contains(&status) {
754                    *on_headers.lock().unwrap() = Some(headers.clone());
755                }
756            },
757            |_, _| {},
758            parse_meta_request_error,
759            move |result| {
760                // Return the headers on success, otherwise propagate the error.
761                let headers =
762                    result.and_then(|_| {
763                        on_result.lock().unwrap().take().ok_or_else(|| {
764                            S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
765                        })
766                    });
767                _ = tx.send(headers);
768            },
769        )?;
770        Ok(S3MetaRequest {
771            receiver: rx.fuse(),
772            meta_request,
773        })
774    }
775
776    /// Make a meta-request using this S3 client that invokes the given callback on failure.
777    ///
778    /// The `parse_meta_request_error` callback is invoked on failed requests. It should return `None`
779    /// if it doesn't have a request-specific failure reason. The client will apply some generic error
780    /// parsing in this case (e.g. for permissions errors).
781    fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
782        &self,
783        options: MetaRequestOptions,
784        request_span: Span,
785        parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
786    ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
787        let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
788
789        let meta_request = self.meta_request_with_callbacks(
790            options,
791            request_span,
792            |_| {},
793            |_, _| {},
794            |_, _| {},
795            parse_meta_request_error,
796            move |result| _ = tx.send(result),
797        )?;
798        Ok(S3MetaRequest {
799            receiver: rx.fuse(),
800            meta_request,
801        })
802    }
803
804    fn poll_client_metrics(s3_client: &Client) {
805        let metrics = s3_client.poll_client_metrics();
806        metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
807        metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
808        metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
809        metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
810        metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
811        metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
812        metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
813            .set(metrics.num_auto_ranged_copy_network_io as f64);
814        metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
815        metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
816            .set(metrics.num_requests_stream_queued_waiting as f64);
817        metrics::gauge!("s3.client.num_requests_streaming_response")
818            .set(metrics.num_requests_streaming_response as f64);
819
820        // Buffer pool metrics
821        let start = Instant::now();
822        let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
823        metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
824        metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
825        metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
826        metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
827        metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
828        metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
829        metrics::gauge!("s3.client.buffer_pool.primary_num_blocks").set(buffer_pool_stats.primary_num_blocks as f64);
830        metrics::gauge!("s3.client.buffer_pool.secondary_reserved").set(buffer_pool_stats.secondary_reserved as f64);
831        metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
832        metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
833    }
834
    /// Advance the request counter by one and return its value from before the increment
    /// (the result of `fetch_add`).
    fn next_request_counter(&self) -> u64 {
        self.next_request_counter.fetch_add(1, Ordering::SeqCst)
    }
838}
839
/// Failure retrieving headers from a meta-request response
#[derive(Debug, Error)]
enum ResponseHeadersError {
    /// The meta-request completed successfully but no headers from a 2xx response were stored.
    #[error("response headers are missing")]
    MissingHeaders,
}
846
/// S3 operation supported by this client.
///
/// The operation decides which CRT meta-request type is used and, for operations sent with the
/// default meta-request type, which operation name is set on the request.
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    /// Sent with the default meta-request type and operation name "DeleteObject".
    DeleteObject,
    /// Sent with the dedicated GetObject meta-request type; no operation name is set.
    GetObject,
    /// Sent with the default meta-request type and operation name "GetObjectAttributes".
    GetObjectAttributes,
    /// Sent with the default meta-request type and operation name "HeadBucket".
    HeadBucket,
    /// Sent with the default meta-request type and operation name "HeadObject".
    HeadObject,
    /// Sent with the default meta-request type and operation name "ListObjectsV2".
    ListObjects,
    /// Sent with the dedicated PutObject meta-request type; no operation name is set.
    PutObject,
    /// Sent with the dedicated CopyObject meta-request type; no operation name is set.
    CopyObject,
    /// Sent with the default meta-request type and operation name "PutObject".
    PutObjectSingle,
}
860
861impl S3Operation {
862    /// The [MetaRequestType] to use for this operation.
863    fn meta_request_type(&self) -> MetaRequestType {
864        match self {
865            S3Operation::GetObject => MetaRequestType::GetObject,
866            S3Operation::PutObject => MetaRequestType::PutObject,
867            S3Operation::CopyObject => MetaRequestType::CopyObject,
868            _ => MetaRequestType::Default,
869        }
870    }
871
872    /// The operation name to set when configuring a request. Required for operations that
873    /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise.
874    fn operation_name(&self) -> Option<&'static str> {
875        match self {
876            S3Operation::DeleteObject => Some("DeleteObject"),
877            S3Operation::GetObject => None,
878            S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
879            S3Operation::HeadBucket => Some("HeadBucket"),
880            S3Operation::HeadObject => Some("HeadObject"),
881            S3Operation::ListObjects => Some("ListObjectsV2"),
882            S3Operation::PutObject => None,
883            S3Operation::CopyObject => None,
884            S3Operation::PutObjectSingle => Some("PutObject"),
885        }
886    }
887}
888
/// A HTTP message to be sent to S3. This is a wrapper around a plain HTTP message, except that it
/// helps us correctly configure the endpoint and "Host" header to handle both path-style and
/// virtual-hosted-style addresses. The `path_prefix` is prepended to all paths, and need not be
/// terminated with a `/`.
#[derive(Debug)]
struct S3Message<'a> {
    /// The underlying HTTP message.
    inner: Message<'a>,
    /// Endpoint URI passed to the meta-request options when the message is converted.
    uri: Uri,
    /// Prefix written (URL-encoded) in front of every request path.
    path_prefix: String,
    /// Checksum configuration forwarded to the meta-request options, if set.
    checksum_config: Option<ChecksumConfig>,
    /// Signing configuration forwarded to the meta-request options, if set.
    signing_config: Option<SigningConfig>,
}
901
902impl<'a> S3Message<'a> {
903    /// Add a header to this message. The header is added if necessary and any existing values for
904    /// this header are removed.
905    fn set_header(
906        &mut self,
907        header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
908    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
909        self.inner.set_header(header)
910    }
911
912    /// Set the request path and query for this message. The components should not be URL-encoded;
913    /// this method will handle that.
914    fn set_request_path_and_query<P: AsRef<OsStr>>(
915        &mut self,
916        path: impl AsRef<OsStr>,
917        query: impl AsRef<[(P, P)]>,
918    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
919        // This is RFC 3986 but with '/' also considered a safe character for path fragments.
920        const URLENCODE_QUERY_FRAGMENT: &AsciiSet =
921            &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
922        const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
923
924        fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
925            let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
926            s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
927        }
928
929        // This estimate is exact if no characters need encoding, otherwise we'll end up
930        // reallocating a couple of times. The '?' for the query is counted in the first key-value
931        // pair.
932        let space_needed = self.path_prefix.len()
933            + path.as_ref().len()
934            + query
935                .as_ref()
936                .iter()
937                .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2) // +2 for & and =
938                .sum::<usize>();
939
940        let mut full_path = OsString::with_capacity(space_needed);
941
942        write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
943        write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
944
945        // Build the query string
946        if !query.as_ref().is_empty() {
947            full_path.push("?");
948            for (i, (key, value)) in query.as_ref().iter().enumerate() {
949                if i != 0 {
950                    full_path.push("&");
951                }
952                write_encoded_fragment(&mut full_path, key, URLENCODE_QUERY_FRAGMENT);
953                full_path.push("=");
954                write_encoded_fragment(&mut full_path, value, URLENCODE_QUERY_FRAGMENT);
955            }
956        }
957
958        self.inner.set_request_path(full_path)
959    }
960
961    /// Set the request path for this message. The path should not be URL-encoded; this method will
962    /// handle that.
963    fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
964        self.set_request_path_and_query::<&str>(path, &[])
965    }
966
967    /// Sets the checksum configuration for this message.
968    fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
969        self.checksum_config = checksum_config;
970    }
971
972    /// Sets the body input stream for this message, and returns any previously set input stream.
973    /// If input_stream is None, unsets the body.
974    fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
975        self.inner.set_body_stream(input_stream)
976    }
977
978    /// Set the content length header.
979    fn set_content_length_header(
980        &mut self,
981        content_length: usize,
982    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
983        self.inner
984            .set_header(&Header::new("Content-Length", content_length.to_string()))
985    }
986
987    /// Set the checksum header.
988    fn set_checksum_header(
989        &mut self,
990        checksum: &UploadChecksum,
991    ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
992        let header = match checksum {
993            UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
994            UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
995            UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
996            UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
997            UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
998        };
999        self.inner.set_header(&header)
1000    }
1001
1002    fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1003        let mut options = MetaRequestOptions::new();
1004        if let Some(checksum_config) = self.checksum_config {
1005            options.checksum_config(checksum_config);
1006        }
1007        if let Some(signing_config) = self.signing_config {
1008            options.signing_config(signing_config);
1009        }
1010        options
1011            .message(self.inner)
1012            .endpoint(self.uri)
1013            .request_type(operation.meta_request_type());
1014        if let Some(operation_name) = operation.operation_name() {
1015            options.operation_name(operation_name);
1016        }
1017        options
1018    }
1019}
1020
/// Future for an in-flight meta-request. It resolves to the result delivered over the oneshot
/// channel held in `receiver`.
///
/// Dropping this value also drops the contained [CancellingMetaRequest], which cancels the
/// underlying meta-request.
#[derive(Debug)]
#[pin_project]
#[must_use]
struct S3MetaRequest<T, E> {
    /// Receiver for the result of meta-request.
    #[pin]
    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
    /// Handle to the underlying meta-request; cancels it when dropped.
    meta_request: CancellingMetaRequest,
}
1030
1031impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1032    type Output = ObjectClientResult<T, E, S3RequestError>;
1033
1034    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1035        let this = self.project();
1036        this.receiver
1037            .poll(cx)
1038            .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1039    }
1040}
1041
impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
    /// Delegates to the fused result receiver: returns `true` once that receiver reports it has
    /// terminated.
    fn is_terminated(&self) -> bool {
        self.receiver.is_terminated()
    }
}
1047
/// Wrapper for a [MetaRequest] that cancels it on drop.
///
/// Note that if the request has already completed, cancelling it has no effect.
#[derive(Debug)]
struct CancellingMetaRequest {
    /// The wrapped meta-request; also reachable through `Deref`/`DerefMut`.
    inner: MetaRequest,
}
1055
impl CancellingMetaRequest {
    /// Take ownership of `meta_request` so that it is cancelled when the wrapper is dropped.
    fn wrap(meta_request: MetaRequest) -> Self {
        Self { inner: meta_request }
    }
}
1061
impl Drop for CancellingMetaRequest {
    /// Cancel the wrapped meta-request when the wrapper goes out of scope.
    fn drop(&mut self) {
        // Called unconditionally: no completion state is checked before cancelling.
        self.inner.cancel();
    }
}
1067
impl Deref for CancellingMetaRequest {
    type Target = MetaRequest;

    /// Borrow the wrapped [MetaRequest].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
1075
impl DerefMut for CancellingMetaRequest {
    /// Mutably borrow the wrapped [MetaRequest].
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
1081
/// Failures to construct a new S3 client
///
/// This enum is `#[non_exhaustive]`: code matching on it from outside this crate must include
/// a wildcard arm.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum NewClientError {
    /// Invalid S3 endpoint
    #[error("invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
    /// Invalid AWS credentials; the underlying CRT error is attached as the source
    #[error("invalid AWS credentials")]
    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
    /// Invalid configuration, with a description of the problem
    #[error("invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// An internal error from within the AWS Common Runtime
    #[error("Unknown CRT error")]
    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
}
1099
/// Errors returned by the CRT-based S3 client
#[derive(Error, Debug)]
pub enum S3RequestError {
    /// An internal error from within the S3 client. The request may have been sent.
    #[error("Internal S3 client error")]
    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// An internal error from within the AWS Common Runtime. The request may have been sent.
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// An error during construction of a request. The request was not sent.
    #[error("Failed to construct request")]
    ConstructionFailure(#[from] ConstructionError),

    /// The request was sent but an unknown or unhandled failure occurred while processing it.
    /// Carries the full meta-request result for diagnosis.
    #[error("Unknown response error: {0:?}")]
    ResponseError(MetaRequestResult),

    /// The request was made to the wrong region. Carries the region the bucket is expected in.
    #[error("Wrong region (expecting {0})")]
    IncorrectRegion(String),

    /// Access was denied. Carries the error message and the metadata of the failed request.
    #[error("Forbidden: {0}")]
    Forbidden(String, ClientErrorMetadata),

    /// The request was attempted but could not be signed due to no available credentials
    #[error("No signing credentials available, see CRT debug logs")]
    NoSigningCredentials,

    /// The request was canceled
    #[error("Request canceled")]
    RequestCanceled,

    /// The request was throttled by S3
    #[error("Request throttled")]
    Throttled,

    /// Cannot fetch more data because the current read window is exhausted. The read window
    /// must be advanced using `GetObjectRequest::increment_read_window` to continue fetching
    /// new data.
    #[error("Polled for data with empty read window")]
    EmptyReadWindow,
}
1145
1146impl S3RequestError {
1147    fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1148        S3RequestError::ConstructionFailure(inner.into())
1149    }
1150
1151    fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1152        S3RequestError::InternalError(Box::new(inner))
1153    }
1154}
1155
1156impl ProvideErrorMetadata for S3RequestError {
1157    fn meta(&self) -> ClientErrorMetadata {
1158        match self {
1159            Self::ResponseError(request_result) => {
1160                let http_code = if request_result.response_status >= 100 {
1161                    Some(request_result.response_status)
1162                } else {
1163                    None
1164                };
1165                ClientErrorMetadata {
1166                    http_code,
1167                    ..Default::default()
1168                }
1169            }
1170            Self::Forbidden(_, metadata) => metadata.clone(),
1171            Self::Throttled => ClientErrorMetadata {
1172                http_code: Some(503),
1173                ..Default::default()
1174            },
1175            _ => Default::default(),
1176        }
1177    }
1178}
1179
/// Errors that can occur while constructing a request, before it is sent
#[derive(Error, Debug)]
pub enum ConstructionError {
    /// CRT error while constructing the request
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The S3 endpoint was invalid
    #[error("Invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
}
1190
/// Return a string version of a [RequestType] for use in metrics
///
/// Each variant maps to a string of the same name, except [RequestType::Unknown], which is
/// reported as "Default".
///
/// TODO: Replace this method with `aws_s3_request_metrics_get_operation_name`,
///       and ensure all requests have an associated operation name.
fn request_type_to_metrics_string(request_type: RequestType) -> &'static str {
    match request_type {
        RequestType::Unknown => "Default",
        RequestType::HeadObject => "HeadObject",
        RequestType::GetObject => "GetObject",
        RequestType::ListParts => "ListParts",
        RequestType::CreateMultipartUpload => "CreateMultipartUpload",
        RequestType::UploadPart => "UploadPart",
        RequestType::AbortMultipartUpload => "AbortMultipartUpload",
        RequestType::CompleteMultipartUpload => "CompleteMultipartUpload",
        RequestType::UploadPartCopy => "UploadPartCopy",
        RequestType::CopyObject => "CopyObject",
        RequestType::PutObject => "PutObject",
    }
}
1210
1211/// Extract the byte range from the Content-Range header if present and valid
1212fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1213    let header = headers.get("Content-Range").ok()?;
1214    let value = header.value().to_str()?;
1215
1216    // Content-Range: <unit> <range-start>-<range-end>/<size>
1217
1218    if !value.starts_with("bytes ") {
1219        return None;
1220    }
1221    let (_, value) = value.split_at("bytes ".len());
1222    let (range, _) = value.split_once('/')?;
1223    let (start, end) = range.split_once('-')?;
1224    let start = start.parse::<u64>().ok()?;
1225    let end = end.parse::<u64>().ok()?;
1226
1227    // Rust ranges are exclusive at the end, but Content-Range is inclusive
1228    Some(start..end + 1)
1229}
1230
1231/// Extract the [Checksum] information from headers
1232fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1233    let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1234    let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1235    let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1236    let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1237    let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1238
1239    Ok(Checksum {
1240        checksum_crc64nvme,
1241        checksum_crc32,
1242        checksum_crc32c,
1243        checksum_sha1,
1244        checksum_sha256,
1245    })
1246}
1247
1248/// Try to parse a modeled error out of a failing meta request
1249fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1250    /// Look for a redirect header pointing to a different region for the bucket
1251    fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1252        let headers = request_result.error_response_headers.as_ref()?;
1253        let region_header = headers.get("x-amz-bucket-region").ok()?;
1254        let region = region_header.value().to_owned().into_string().ok()?;
1255        Some(S3RequestError::IncorrectRegion(region))
1256    }
1257
1258    /// Look for access-related errors
1259    fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1260        let Some(body) = request_result.error_response_body.as_ref() else {
1261            // Header-only requests like HeadObject and HeadBucket can't give us a more detailed
1262            // error, so just trust the response code
1263            return Some(S3RequestError::Forbidden(
1264                "<no message>".to_owned(),
1265                ClientErrorMetadata {
1266                    http_code: Some(request_result.response_status),
1267                    ..Default::default()
1268                },
1269            ));
1270        };
1271        let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1272        let error_code = error_elem.get_child("Code")?;
1273        let error_code_str = error_code.get_text()?;
1274        // Always translate 403 to Forbidden, but otherwise first check the error code, since other
1275        // response statuses are overloaded and not always access-related errors.
1276        if request_result.response_status == 403
1277            || matches!(
1278                error_code_str.deref(),
1279                "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1280            )
1281        {
1282            let message = error_elem
1283                .get_child("Message")
1284                .and_then(|e| e.get_text())
1285                .unwrap_or(error_code_str.clone());
1286            Some(S3RequestError::Forbidden(
1287                message.to_string(),
1288                ClientErrorMetadata {
1289                    http_code: Some(request_result.response_status),
1290                    error_code: Some(error_code_str.to_string()),
1291                    error_message: Some(message.into_owned()),
1292                },
1293            ))
1294        } else {
1295            None
1296        }
1297    }
1298
1299    /// Try to look for error related to no signing credentials, returns generic error otherwise
1300    fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1301        let crt_error_code = request_result.crt_error.raw_error();
1302        if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1303            S3RequestError::NoSigningCredentials
1304        } else {
1305            S3RequestError::CrtError(crt_error_code.into())
1306        }
1307    }
1308
1309    fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1310        let crt_error_code = request_result.crt_error.raw_error();
1311        if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1312            Some(S3RequestError::Throttled)
1313        } else {
1314            None
1315        }
1316    }
1317
1318    /// Handle canceled requests
1319    fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1320        request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1321    }
1322
1323    match request_result.response_status {
1324        301 => try_parse_redirect(request_result),
1325        // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
1326        // redirect
1327        400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1328        403 => try_parse_forbidden(request_result),
1329        // if the http response status is not set, we look into crt_error_code to identify the error
1330        0 => try_parse_throttled(request_result)
1331            .or_else(|| try_parse_canceled_request(request_result))
1332            .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1333        _ => None,
1334    }
1335}
1336
1337/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
1338/// PUT bytes don't transit those callbacks.
1339fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1340    let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1341    // Semi-arbitrary choices here to avoid averaging out large and small requests
1342    const MEGABYTE: u64 = 1024 * 1024;
1343    let bucket = if bytes < MEGABYTE {
1344        "<1MiB"
1345    } else if bytes <= 16 * MEGABYTE {
1346        "1-16MiB"
1347    } else {
1348        ">16MiB"
1349    };
1350    metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1351}
1352
1353#[cfg_attr(not(docsrs), async_trait)]
1354impl ObjectClient for S3CrtClient {
1355    type GetObjectResponse = S3GetObjectResponse;
1356    type PutObjectRequest = S3PutObjectRequest;
1357    type ClientError = S3RequestError;
1358
1359    fn read_part_size(&self) -> Option<usize> {
1360        Some(self.inner.read_part_size)
1361    }
1362
1363    fn write_part_size(&self) -> Option<usize> {
1364        // TODO: the CRT does some clamping to a max size rather than just swallowing the part size
1365        // we configured it with, so this might be wrong. Right now the only clamping is to the max
1366        // S3 part size (5GiB), so this shouldn't affect the result.
1367        // https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
1368        Some(self.inner.write_part_size)
1369    }
1370
1371    fn initial_read_window_size(&self) -> Option<usize> {
1372        if self.inner.enable_backpressure {
1373            Some(self.inner.initial_read_window_size)
1374        } else {
1375            None
1376        }
1377    }
1378
1379    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1380        let start = Instant::now();
1381        let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
1382        metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
1383        Some(crt_buffer_pool_stats)
1384    }
1385
1386    async fn delete_object(
1387        &self,
1388        bucket: &str,
1389        key: &str,
1390    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1391        self.delete_object(bucket, key).await
1392    }
1393
1394    async fn copy_object(
1395        &self,
1396        source_bucket: &str,
1397        source_key: &str,
1398        destination_bucket: &str,
1399        destination_key: &str,
1400        params: &CopyObjectParams,
1401    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1402        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1403            .await
1404    }
1405
1406    async fn get_object(
1407        &self,
1408        bucket: &str,
1409        key: &str,
1410        params: &GetObjectParams,
1411    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1412        self.get_object(bucket, key, params).await
1413    }
1414
1415    async fn list_objects(
1416        &self,
1417        bucket: &str,
1418        continuation_token: Option<&str>,
1419        delimiter: &str,
1420        max_keys: usize,
1421        prefix: &str,
1422    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1423        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1424            .await
1425    }
1426
1427    async fn head_object(
1428        &self,
1429        bucket: &str,
1430        key: &str,
1431        params: &HeadObjectParams,
1432    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1433        self.head_object(bucket, key, params).await
1434    }
1435
1436    async fn put_object(
1437        &self,
1438        bucket: &str,
1439        key: &str,
1440        params: &PutObjectParams,
1441    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1442        self.put_object(bucket, key, params).await
1443    }
1444
1445    async fn put_object_single<'a>(
1446        &self,
1447        bucket: &str,
1448        key: &str,
1449        params: &PutObjectSingleParams,
1450        contents: impl AsRef<[u8]> + Send + 'a,
1451    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1452        self.put_object_single(bucket, key, params, contents).await
1453    }
1454
1455    async fn get_object_attributes(
1456        &self,
1457        bucket: &str,
1458        key: &str,
1459        max_parts: Option<usize>,
1460        part_number_marker: Option<usize>,
1461        object_attributes: &[ObjectAttribute],
1462    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1463        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1464            .await
1465    }
1466}
1467
/// Custom handling of telemetry events
///
/// Implementors must be `Debug`, `Send` and `Sync`.
pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
    /// Handle one telemetry event, described by `request_metrics`.
    ///
    /// NOTE(review): where and how often the client invokes this hook is not visible in this
    /// part of the file — confirm against the call site before relying on ordering or threading.
    fn on_telemetry(&self, request_metrics: &RequestMetrics);
}
1472
1473#[cfg(test)]
1474mod tests {
1475    use mountpoint_s3_crt::common::error::Error;
1476    use rusty_fork::rusty_fork_test;
1477    use std::assert_eq;
1478
1479    use super::*;
1480    use test_case::test_case;
1481
1482    /// Test explicit validation in [Client::new]
1483    fn client_new_fails_with_invalid_part_size(part_size: usize) {
1484        let config = S3ClientConfig::default().part_size(part_size);
1485        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
1486        let message = if cfg!(target_pointer_width = "64") {
1487            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
1488        } else {
1489            format!(
1490                "invalid configuration: part size must be at between 5MiB and {}GiB",
1491                usize::MAX / 1024 / 1024 / 1024
1492            )
1493        };
1494        assert_eq!(e.to_string(), message);
1495    }
1496
1497    /// On 32-bit platform we limit part size with usize:MAX,
1498    /// it is impossible to provide a greater value
1499    #[cfg(target_pointer_width = "64")]
1500    #[test]
1501    fn client_new_fails_with_greater_part_size() {
1502        let part_size = 6 * 1024 * 1024 * 1024; // greater than 5GiB
1503        client_new_fails_with_invalid_part_size(part_size);
1504    }
1505
1506    #[test]
1507    fn client_new_fails_with_smaller_part_size() {
1508        let part_size = 4 * 1024 * 1024; // less than 5MiB
1509        client_new_fails_with_invalid_part_size(part_size);
1510    }
1511
1512    /// Test if the prefix is added correctly to the User-Agent header
1513    #[test]
1514    fn test_user_agent_with_prefix() {
1515        let user_agent_prefix = String::from("someprefix");
1516        let expected_user_agent = "someprefix mountpoint-s3-client/";
1517
1518        let config = S3ClientConfig {
1519            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
1520            ..Default::default()
1521        };
1522
1523        let client = S3CrtClient::new(config).expect("Create test client");
1524
1525        let mut message = client
1526            .inner
1527            .new_request_template("GET", "amzn-s3-demo-bucket")
1528            .expect("new request template expected");
1529
1530        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1531
1532        let user_agent_header = headers
1533            .get("User-Agent")
1534            .expect("User Agent Header expected with given prefix");
1535        let user_agent_header_value = user_agent_header.value();
1536
1537        assert!(user_agent_header_value
1538            .to_string_lossy()
1539            .starts_with(expected_user_agent));
1540    }
1541
1542    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
1543        let config = S3ClientConfig {
1544            endpoint_config,
1545            ..Default::default()
1546        };
1547
1548        let client = S3CrtClient::new(config).expect("create test client");
1549
1550        let mut message = client
1551            .inner
1552            .new_request_template("GET", "")
1553            .expect("new request template expected");
1554
1555        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");
1556
1557        let host_header = headers.get("Host").expect("Host header expected");
1558        let host_header_value = host_header.value();
1559
1560        assert_eq!(host_header_value.to_string_lossy(), expected_host);
1561    }
1562
1563    // run with rusty_fork to avoid issues with other tests and their env variables.
1564    rusty_fork_test! {
1565        #[test]
1566        fn test_endpoint_favors_parameter_over_env_variable() {
1567            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
1568            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);
1569            std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com");
1570            // even though we set the environment variable, the parameter takes precedence
1571            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
1572        }
1573
1574        #[test]
1575        fn test_endpoint_favors_env_variable() {
1576            let endpoint_config = EndpointConfig::new("us-east-1");
1577            std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com");
1578            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
1579        }
1580
1581        #[test]
1582        fn test_endpoint_with_invalid_env_variable() {
1583            let endpoint_config = EndpointConfig::new("us-east-1");
1584            std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url");
1585            let config = S3ClientConfig {
1586                endpoint_config,
1587                ..Default::default()
1588            };
1589
1590            let client = S3CrtClient::new(config);
1591            match client {
1592                Ok(_) => panic!("expected an error"),
1593                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
1594            }
1595        }
1596
1597    }
1598
1599    /// Simple test to ensure the user agent header is correct even when prefix is not added
1600    #[test]
1601    fn test_user_agent_without_prefix() {
1602        let expected_user_agent = "mountpoint-s3-client/";
1603
1604        let config: S3ClientConfig = Default::default();
1605
1606        let client = S3CrtClient::new(config).expect("Create test client");
1607
1608        let mut message = client
1609            .inner
1610            .new_request_template("GET", "amzn-s3-demo-bucket")
1611            .expect("new request template expected");
1612
1613        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1614
1615        let user_agent_header = headers
1616            .get("User-Agent")
1617            .expect("User Agent Header expected with given prefix");
1618        let user_agent_header_value = user_agent_header.value();
1619
1620        assert!(user_agent_header_value
1621            .to_string_lossy()
1622            .starts_with(expected_user_agent));
1623    }
1624
1625    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
1626    #[test_case("bytes 200-1000/*" => Some(200..1001))]
1627    #[test_case("bytes 200-1000" => None)]
1628    #[test_case("bytes */67589" => None)]
1629    #[test_case("octets 200-1000]" => None)]
1630    fn parse_content_range(range: &str) -> Option<Range<u64>> {
1631        let mut headers = Headers::new(&Allocator::default()).unwrap();
1632        let header = Header::new("Content-Range", range);
1633        headers.add_header(&header).unwrap();
1634        extract_range_header(&headers)
1635    }
1636
1637    /// Simple test to ensure the expected bucket owner can be set
1638    #[test]
1639    fn test_expected_bucket_owner() {
1640        let expected_bucket_owner = "111122223333";
1641
1642        let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");
1643
1644        let client = S3CrtClient::new(config).expect("Create test client");
1645
1646        let mut message = client
1647            .inner
1648            .new_request_template("GET", "amzn-s3-demo-bucket")
1649            .expect("new request template expected");
1650
1651        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1652
1653        let expected_bucket_owner_header = headers
1654            .get("x-amz-expected-bucket-owner")
1655            .expect("the headers should contain x-amz-expected-bucket-owner");
1656        let expected_bucket_owner_value = expected_bucket_owner_header.value();
1657
1658        assert!(expected_bucket_owner_value
1659            .to_string_lossy()
1660            .starts_with(expected_bucket_owner));
1661    }
1662
1663    fn make_result(
1664        response_status: i32,
1665        body: impl Into<OsString>,
1666        bucket_region_header: Option<&str>,
1667    ) -> MetaRequestResult {
1668        let error_response_headers = bucket_region_header.map(|h| {
1669            let mut headers = Headers::new(&Allocator::default()).unwrap();
1670            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
1671            headers
1672        });
1673        MetaRequestResult {
1674            response_status,
1675            crt_error: 1i32.into(),
1676            error_response_headers,
1677            error_response_body: Some(body.into()),
1678        }
1679    }
1680
1681    #[test]
1682    fn parse_301_redirect() {
1683        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
1684        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1685        let result = try_parse_generic_error(&result);
1686        let Some(S3RequestError::IncorrectRegion(region)) = result else {
1687            panic!("wrong result, got: {:?}", result);
1688        };
1689        assert_eq!(region, "us-west-2");
1690    }
1691
1692    #[test]
1693    fn parse_403_access_denied() {
1694        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1695        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1696        let result = try_parse_generic_error(&result);
1697        let Some(S3RequestError::Forbidden(message, _)) = result else {
1698            panic!("wrong result, got: {:?}", result);
1699        };
1700        assert_eq!(message, "Access Denied");
1701    }
1702
1703    #[test]
1704    fn parse_400_invalid_token() {
1705        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
1706        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1707        let result = try_parse_generic_error(&result);
1708        let Some(S3RequestError::Forbidden(message, _)) = result else {
1709            panic!("wrong result, got: {:?}", result);
1710        };
1711        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
1712    }
1713
1714    #[test]
1715    fn parse_400_expired_token() {
1716        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
1717        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1718        let result = try_parse_generic_error(&result);
1719        let Some(S3RequestError::Forbidden(message, _)) = result else {
1720            panic!("wrong result, got: {:?}", result);
1721        };
1722        assert_eq!(message, "The provided token has expired.");
1723    }
1724
1725    #[test]
1726    fn parse_400_redirect() {
1727        // From an s3-accelerate endpoint with the wrong region set for signing
1728        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
1729        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1730        let result = try_parse_generic_error(&result);
1731        let Some(S3RequestError::IncorrectRegion(region)) = result else {
1732            panic!("wrong result, got: {:?}", result);
1733        };
1734        assert_eq!(region, "us-west-2");
1735    }
1736
1737    #[test]
1738    fn parse_403_signature_does_not_match() {
1739        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
1740        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1741        let result = try_parse_generic_error(&result);
1742        let Some(S3RequestError::Forbidden(message, _)) = result else {
1743            panic!("wrong result, got: {:?}", result);
1744        };
1745        assert_eq!(message, "The request signature we calculated does not match the signature you provided. Check your key and signing method.");
1746    }
1747
1748    #[test]
1749    fn parse_403_made_up_error() {
1750        // A made up error to check that we map all 403s even if we don't recognize them
1751        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1752        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1753        let result = try_parse_generic_error(&result);
1754        let Some(S3RequestError::Forbidden(message, _)) = result else {
1755            panic!("wrong result, got: {:?}", result);
1756        };
1757        assert_eq!(message, "This error is made up.");
1758    }
1759
    /// Build a [MetaRequestResult] that carries only an HTTP status and a CRT error, with no
    /// error response headers or body.
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }
1768
1769    #[test]
1770    fn parse_no_signing_credential_error() {
1771        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
1772        let result = make_crt_error_result(0, error_code.into());
1773        let result = try_parse_generic_error(&result);
1774        let Some(S3RequestError::NoSigningCredentials) = result else {
1775            panic!("wrong result, got: {:?}", result);
1776        };
1777    }
1778
1779    #[test]
1780    fn parse_test_other_crt_error() {
1781        // A signing error that isn't "no signing credentials"
1782        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
1783        let result = make_crt_error_result(0, error_code.into());
1784        let result = try_parse_generic_error(&result);
1785        let Some(S3RequestError::CrtError(error)) = result else {
1786            panic!("wrong result, got: {:?}", result);
1787        };
1788        assert_eq!(error, error_code.into());
1789    }
1790
1791    #[test]
1792    fn test_checksum_sha256() {
1793        let mut headers = Headers::new(&Allocator::default()).unwrap();
1794        let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
1795        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
1796        headers.add_header(&header).unwrap();
1797
1798        let checksum = parse_checksum(&headers).expect("failed to parse headers");
1799        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
1800        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
1801        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
1802        assert_eq!(
1803            checksum.checksum_sha256,
1804            Some(value.to_owned()),
1805            "sha256 header should match"
1806        );
1807    }
1808}