1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31 BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32 MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory, MemoryPoolFactoryWrapper};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48 ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49 S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
54macro_rules! request_span {
55 ($self:expr, $method:expr, $($field:tt)*) => {{
56 let counter = $self.next_request_counter();
57 let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
61 span.in_scope(|| tracing::debug!("new request"));
62 span
63 }};
64 ($self:expr, $method:expr) => { request_span!($self, $method,) };
65}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
84macro_rules! event {
87 ($level:expr, $($args:tt)*) => {
88 match $level {
89 ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
90 ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
91 ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
92 ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
93 ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
100pub struct S3ClientConfig {
101 auth_config: S3ClientAuthConfig,
102 throughput_target_gbps: f64,
103 memory_limit_in_bytes: u64,
104 read_part_size: usize,
105 write_part_size: usize,
106 endpoint_config: EndpointConfig,
107 user_agent: Option<UserAgent>,
108 request_payer: Option<String>,
109 bucket_owner: Option<String>,
110 max_attempts: Option<NonZeroUsize>,
111 read_backpressure: bool,
112 initial_read_window: usize,
113 network_interface_names: Vec<String>,
114 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
115 event_loop_threads: Option<u16>,
116 buffer_pool_factory: Option<MemoryPoolFactoryWrapper>,
117}
118
119impl Default for S3ClientConfig {
120 fn default() -> Self {
121 const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122 Self {
123 auth_config: Default::default(),
124 throughput_target_gbps: 10.0,
125 memory_limit_in_bytes: 0,
126 read_part_size: DEFAULT_PART_SIZE,
127 write_part_size: DEFAULT_PART_SIZE,
128 endpoint_config: EndpointConfig::new("us-east-1"),
129 user_agent: None,
130 request_payer: None,
131 bucket_owner: None,
132 max_attempts: None,
133 read_backpressure: false,
134 initial_read_window: DEFAULT_PART_SIZE,
135 network_interface_names: vec![],
136 telemetry_callback: None,
137 event_loop_threads: None,
138 buffer_pool_factory: None,
139 }
140 }
141}
142
143impl S3ClientConfig {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 #[must_use = "S3ClientConfig follows a builder pattern"]
150 pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151 self.auth_config = auth_config;
152 self
153 }
154
155 #[must_use = "S3ClientConfig follows a builder pattern"]
157 pub fn part_size(mut self, part_size: usize) -> Self {
158 self.read_part_size = part_size;
159 self.write_part_size = part_size;
160 self
161 }
162
163 #[must_use = "S3ClientConfig follows a builder pattern"]
165 pub fn read_part_size(mut self, size: usize) -> Self {
166 self.read_part_size = size;
167 self
168 }
169
170 #[must_use = "S3ClientConfig follows a builder pattern"]
172 pub fn write_part_size(mut self, size: usize) -> Self {
173 self.write_part_size = size;
174 self
175 }
176
177 #[must_use = "S3ClientConfig follows a builder pattern"]
179 pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180 self.throughput_target_gbps = throughput_target_gbps;
181 self
182 }
183
184 #[must_use = "S3ClientConfig follows a builder pattern"]
186 pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187 self.memory_limit_in_bytes = memory_limit_in_bytes;
188 self
189 }
190
191 #[must_use = "S3ClientConfig follows a builder pattern"]
193 pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194 self.endpoint_config = endpoint_config;
195 self
196 }
197
198 #[must_use = "S3ClientConfig follows a builder pattern"]
200 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201 self.user_agent = Some(user_agent);
202 self
203 }
204
205 #[must_use = "S3ClientConfig follows a builder pattern"]
207 pub fn request_payer(mut self, request_payer: &str) -> Self {
208 self.request_payer = Some(request_payer.to_owned());
209 self
210 }
211
212 #[must_use = "S3ClientConfig follows a builder pattern"]
214 pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215 self.bucket_owner = Some(bucket_owner.to_owned());
216 self
217 }
218
219 #[must_use = "S3ClientConfig follows a builder pattern"]
222 pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223 self.max_attempts = Some(max_attempts);
224 self
225 }
226
227 #[must_use = "S3ClientConfig follows a builder pattern"]
229 pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230 self.read_backpressure = read_backpressure;
231 self
232 }
233
234 #[must_use = "S3ClientConfig follows a builder pattern"]
236 pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237 self.initial_read_window = initial_read_window;
238 self
239 }
240
241 #[must_use = "S3ClientConfig follows a builder pattern"]
243 pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244 self.network_interface_names = network_interface_names;
245 self
246 }
247
248 #[must_use = "S3ClientConfig follows a builder pattern"]
250 pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251 self.telemetry_callback = Some(telemetry_callback);
252 self
253 }
254
255 #[must_use = "S3ClientConfig follows a builder pattern"]
257 pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258 self.event_loop_threads = Some(event_loop_threads);
259 self
260 }
261
262 #[must_use = "S3ClientConfig follows a builder pattern"]
264 pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265 self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(move |_| pool.clone()));
266 self
267 }
268
269 #[must_use = "S3ClientConfig follows a builder pattern"]
271 pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272 self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(pool_factory));
273 self
274 }
275}
276
277#[derive(Debug, Clone, Default)]
279pub enum S3ClientAuthConfig {
280 #[default]
282 Default,
283 NoSigning,
285 Profile(String),
287 Provider(CredentialsProvider),
289}
290
291#[derive(Debug, Clone)]
301pub struct S3CrtClient {
302 inner: Arc<S3CrtClientInner>,
303}
304
305impl S3CrtClient {
306 pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308 Ok(Self {
309 inner: Arc::new(S3CrtClientInner::new(config)?),
310 })
311 }
312
313 pub fn endpoint_config(&self) -> EndpointConfig {
315 self.inner.endpoint_config.clone()
316 }
317
318 #[doc(hidden)]
319 pub fn event_loop_group(&self) -> EventLoopGroup {
320 self.inner.event_loop_group.clone()
321 }
322}
323
324#[derive(Debug)]
325struct S3CrtClientInner {
326 s3_client: Client,
327 event_loop_group: EventLoopGroup,
328 endpoint_config: EndpointConfig,
329 allocator: Allocator,
330 next_request_counter: AtomicU64,
331 user_agent_header: String,
334 request_payer: Option<String>,
335 read_part_size: usize,
336 write_part_size: usize,
337 enable_backpressure: bool,
338 initial_read_window_size: usize,
339 bucket_owner: Option<String>,
340 credentials_provider: Option<CredentialsProvider>,
341 host_resolver: HostResolver,
342 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
343}
344
345impl S3CrtClientInner {
346 fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347 let allocator = Allocator::default();
348
349 let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351 let resolver_options = HostResolverDefaultOptions {
352 max_entries: 8,
353 event_loop_group: &mut event_loop_group,
354 };
355
356 let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358 let bootstrap_options = ClientBootstrapOptions {
359 event_loop_group: &mut event_loop_group,
360 host_resolver: &mut host_resolver,
361 };
362
363 let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365 let mut client_config = ClientConfig::new();
366
367 let retry_strategy = {
368 let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369 let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370 .ok()
371 .and_then(|s| s.parse::<usize>().ok())
372 .or_else(|| config.max_attempts.map(|m| m.get()))
373 .unwrap_or(3);
374 retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377 retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378 retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379 RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380 };
381
382 trace!("constructing client with auth config {:?}", config.auth_config);
383 let credentials_provider = match config.auth_config {
384 S3ClientAuthConfig::Default => {
385 let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386 bootstrap: &mut client_bootstrap,
387 };
388 CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389 .map_err(NewClientError::ProviderFailure)?
390 }
391 S3ClientAuthConfig::NoSigning => {
392 CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393 }
394 S3ClientAuthConfig::Profile(profile_name) => {
395 let credentials_profile_options = CredentialsProviderProfileOptions {
396 bootstrap: &mut client_bootstrap,
397 profile_name_override: &profile_name,
398 };
399 CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400 .map_err(NewClientError::ProviderFailure)?
401 }
402 S3ClientAuthConfig::Provider(provider) => provider,
403 };
404
405 let endpoint_config = config.endpoint_config;
406 client_config.region(endpoint_config.get_region());
407 let signing_config = init_signing_config(
408 endpoint_config.get_region(),
409 credentials_provider.clone(),
410 None,
411 None,
412 None,
413 );
414
415 let endpoint_config = match endpoint_config.get_endpoint() {
416 None => {
417 if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419 debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420 let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421 .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422 endpoint_config.endpoint(env_uri)
423 } else {
424 endpoint_config
425 }
426 }
427 Some(_) => endpoint_config,
428 };
429
430 client_config.express_support(true);
431 client_config.read_backpressure(config.read_backpressure);
432 client_config.initial_read_window(config.initial_read_window);
433 client_config.signing_config(signing_config);
434
435 client_config
436 .client_bootstrap(client_bootstrap)
437 .retry_strategy(retry_strategy);
438
439 client_config.throughput_target_gbps(config.throughput_target_gbps);
440 client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442 if let Some(wrapper) = config.buffer_pool_factory {
443 let pool_factory = CrtBufferPoolFactory::new(wrapper, event_loop_group.clone());
444 client_config.buffer_pool_factory(pool_factory);
445 }
446
447 let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
449 for part_size in [config.read_part_size, config.write_part_size] {
451 if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
452 return Err(NewClientError::InvalidConfiguration(format!(
453 "part size must be at between 5MiB and {}GiB",
454 max_part_size / 1024 / 1024 / 1024
455 )));
456 }
457 }
458
459 if !config.network_interface_names.is_empty() {
460 client_config.network_interface_names(config.network_interface_names);
461 }
462
463 let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
464 let user_agent_header = user_agent.build();
465
466 let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
467
468 Ok(Self {
469 allocator,
470 s3_client,
471 event_loop_group,
472 endpoint_config,
473 next_request_counter: AtomicU64::new(0),
474 user_agent_header,
475 request_payer: config.request_payer,
476 read_part_size: config.read_part_size,
477 write_part_size: config.write_part_size,
478 enable_backpressure: config.read_backpressure,
479 initial_read_window_size: config.initial_read_window,
480 bucket_owner: config.bucket_owner,
481 credentials_provider: Some(credentials_provider),
482 host_resolver,
483 telemetry_callback: config.telemetry_callback,
484 })
485 }
486
487 fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
492 let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
493 let uri = endpoint.uri()?;
494 trace!(?uri, "resolved endpoint");
495
496 let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
497 let auth_scheme = match endpoint.auth_scheme() {
498 Ok(auth_scheme) => auth_scheme,
499 Err(e) => {
500 error!(error=?e, "no auth scheme for endpoint");
501 return Err(e.into());
502 }
503 };
504 trace!(?auth_scheme, "resolved auth scheme");
505 let algorithm = Some(auth_scheme.scheme_name());
506 let service = Some(auth_scheme.signing_name());
507 let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
508 Some(init_signing_config(
509 auth_scheme.signing_region(),
510 credentials_provider.clone(),
511 algorithm,
512 service,
513 use_double_uri_encode,
514 ))
515 } else {
516 None
517 };
518
519 let hostname = uri.host_name().to_str().unwrap();
520 let path_prefix = uri.path().to_os_string().into_string().unwrap();
521 let port = uri.host_port();
522 let hostname_header = if port > 0 {
523 format!("{hostname}:{port}")
524 } else {
525 hostname.to_string()
526 };
527
528 let mut message = Message::new_request(&self.allocator)?;
529 message.set_request_method(method)?;
530 message.add_header(&Header::new("Host", hostname_header))?;
531 message.add_header(&Header::new("accept", "application/xml"))?;
532 message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
533
534 if let Some(ref payer) = self.request_payer {
535 message.add_header(&Header::new("x-amz-request-payer", payer))?;
536 }
537
538 if let Some(ref owner) = self.bucket_owner {
539 message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
540 }
541
542 Ok(S3Message {
543 inner: message,
544 uri,
545 path_prefix,
546 checksum_config: None,
547 signing_config,
548 })
549 }
550
551 #[allow(clippy::too_many_arguments)]
558 fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
559 &self,
560 mut options: MetaRequestOptions,
561 request_span: Span,
562 on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
563 mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
564 mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
565 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
566 on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
567 ) -> Result<CancellingMetaRequest, S3RequestError> {
568 let span_telemetry = request_span.clone();
569 let span_body = request_span.clone();
570 let span_finish = request_span;
571
572 let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
573 let hostname = endpoint.host_name().to_str().unwrap().to_owned();
574 let host_resolver = self.host_resolver.clone();
575 let telemetry_callback = self.telemetry_callback.clone();
576
577 let start_time = Instant::now();
578 let first_body_part = Arc::new(AtomicBool::new(true));
579 let first_body_part_clone = Arc::clone(&first_body_part);
580 let total_bytes = Arc::new(AtomicU64::new(0));
581 let total_bytes_clone = Arc::clone(&total_bytes);
582
583 options
584 .on_telemetry(move |metrics| {
585 let _guard = span_telemetry.enter();
586
587 on_request_finish(metrics);
588
589 let http_status = metrics.status_code();
590 let request_canceled = metrics.is_canceled();
591 let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
592 let crt_error = Some(metrics.error()).filter(|e| e.is_err());
593 let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
594 let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
595 let duration = metrics.total_duration();
596 let ttfb = metrics.time_to_first_byte();
597 let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
598
599 let message = if request_failure {
600 "S3 request failed"
601 } else if request_canceled {
602 "S3 request canceled"
603 } else {
604 "S3 request finished"
605 };
606 debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
607 trace!(detailed_metrics=?metrics, "S3 request completed");
608
609 if let Some(ttfb) = ttfb {
610 metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
611 }
612 metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
613 metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
614 if request_failure {
615 metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
616 } else if request_canceled {
617 metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
618 }
619
620 if let Some(telemetry_callback) = &telemetry_callback {
621 telemetry_callback.on_telemetry(metrics);
622 }
623 })
624 .on_headers(move |headers, response_status| {
625 (on_headers)(headers, response_status);
626 })
627 .on_body(move |range_start, data| {
628 let _guard = span_body.enter();
629
630 if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
631 let latency = start_time.elapsed().as_micros() as f64;
632 let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
633 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
634 }
635 total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
636
637 trace!(start = range_start, length = data.len(), "body part received");
638
639 (on_body)(range_start, data);
640 })
641 .on_finish(move |request_result| {
642 let _guard = span_finish.enter();
643
644 let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
645 let duration = start_time.elapsed();
646
647 metrics::counter!("s3.meta_requests", "op" => op).increment(1);
648 metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
649 if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
651 let latency = duration.as_micros() as f64;
652 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
653 }
654 let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
655 if op == "get_object" {
658 emit_throughput_metric(total_bytes, duration, op);
659 }
660 let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
661 if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
662 metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
663 }
664
665 let status_code = request_result.response_status;
666 let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
667 tracing::Level::DEBUG
668 } else {
669 tracing::Level::WARN
670 };
671
672 let result =
673 if !request_result.is_err() {
674 event!(log_level, ?duration, "meta request finished");
675 Ok(())
676 } else {
677 let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
680 let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
681
682 let request_id = match &request_result.error_response_headers {
686 Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
687 None => None,
688 };
689 let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
690
691 let message = if request_result.is_canceled() {
692 "meta request canceled"
693 } else {
694 "meta request failed"
695 };
696 if let Some(error) = &maybe_err {
697 event!(log_level, ?duration, %request_id, ?error, message);
698 debug!("meta request result: {:?}", request_result);
699 } else {
700 event!(log_level, ?duration, %request_id, ?request_result, message);
701 }
702
703 if request_result.is_canceled() {
704 metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
705 } else {
706 let error_status = if request_result.response_status >= 100 {
708 request_result.response_status
709 } else {
710 -request_result.crt_error.raw_error()
711 };
712 metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
713 }
714
715 Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
717 };
718
719 on_meta_request_result(result);
720 });
721
722 let meta_request = self.s3_client.make_meta_request(options)?;
724 Self::poll_client_metrics(&self.s3_client);
725 Ok(CancellingMetaRequest::wrap(meta_request))
726 }
727
728 fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
735 &self,
736 options: MetaRequestOptions,
737 request_span: Span,
738 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
739 ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
740 let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
741
742 let body: Arc<Mutex<Vec<u8>>> = Default::default();
744 let body_clone = Arc::clone(&body);
745
746 let meta_request = self.meta_request_with_callbacks(
747 options,
748 request_span,
749 |_| {},
750 |_, _| {},
751 move |offset, data| {
752 let mut body = body_clone.lock().unwrap();
753 assert_eq!(offset as usize, body.len());
754 body.extend_from_slice(data);
755 },
756 parse_meta_request_error,
757 move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
758 )?;
759 Ok(S3MetaRequest {
760 receiver: rx.fuse(),
761 meta_request,
762 })
763 }
764
765 fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
772 &self,
773 options: MetaRequestOptions,
774 request_span: Span,
775 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
776 ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
777 let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
778
779 let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
782 let on_result = on_headers.clone();
783
784 let meta_request = self.meta_request_with_callbacks(
785 options,
786 request_span,
787 |_| {},
788 move |headers, status| {
789 if (200..300).contains(&status) {
792 *on_headers.lock().unwrap() = Some(headers.clone());
793 }
794 },
795 |_, _| {},
796 parse_meta_request_error,
797 move |result| {
798 let headers =
800 result.and_then(|_| {
801 on_result.lock().unwrap().take().ok_or_else(|| {
802 S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
803 })
804 });
805 _ = tx.send(headers);
806 },
807 )?;
808 Ok(S3MetaRequest {
809 receiver: rx.fuse(),
810 meta_request,
811 })
812 }
813
814 fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
820 &self,
821 options: MetaRequestOptions,
822 request_span: Span,
823 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
824 ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
825 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
826
827 let meta_request = self.meta_request_with_callbacks(
828 options,
829 request_span,
830 |_| {},
831 |_, _| {},
832 |_, _| {},
833 parse_meta_request_error,
834 move |result| _ = tx.send(result),
835 )?;
836 Ok(S3MetaRequest {
837 receiver: rx.fuse(),
838 meta_request,
839 })
840 }
841
842 fn poll_client_metrics(s3_client: &Client) {
843 let metrics = s3_client.poll_client_metrics();
844 metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
845 metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
846 metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
847 metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
848 metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
849 metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
850 metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
851 .set(metrics.num_auto_ranged_copy_network_io as f64);
852 metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
853 metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
854 .set(metrics.num_requests_stream_queued_waiting as f64);
855 metrics::gauge!("s3.client.num_requests_streaming_response")
856 .set(metrics.num_requests_streaming_response as f64);
857
858 let start = Instant::now();
860 if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
861 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
862 .record(start.elapsed().as_micros() as f64);
863 metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
864 metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
865 metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
866 metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
867 metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
868 metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
869 .set(buffer_pool_stats.primary_num_blocks as f64);
870 metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
871 .set(buffer_pool_stats.secondary_reserved as f64);
872 metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
873 metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
874 }
875 }
876
877 fn next_request_counter(&self) -> u64 {
878 self.next_request_counter.fetch_add(1, Ordering::SeqCst)
879 }
880}
881
882#[derive(Debug, Error)]
884enum ResponseHeadersError {
885 #[error("response headers are missing")]
886 MissingHeaders,
887}
888
889#[derive(Debug, Clone, Copy)]
891enum S3Operation {
892 DeleteObject,
893 GetObject,
894 GetObjectAttributes,
895 HeadBucket,
896 HeadObject,
897 ListObjects,
898 PutObject,
899 CopyObject,
900 PutObjectSingle,
901}
902
903impl S3Operation {
904 fn meta_request_type(&self) -> MetaRequestType {
906 match self {
907 S3Operation::GetObject => MetaRequestType::GetObject,
908 S3Operation::PutObject => MetaRequestType::PutObject,
909 S3Operation::CopyObject => MetaRequestType::CopyObject,
910 _ => MetaRequestType::Default,
911 }
912 }
913
914 fn operation_name(&self) -> Option<&'static str> {
917 match self {
918 S3Operation::DeleteObject => Some("DeleteObject"),
919 S3Operation::GetObject => None,
920 S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
921 S3Operation::HeadBucket => Some("HeadBucket"),
922 S3Operation::HeadObject => Some("HeadObject"),
923 S3Operation::ListObjects => Some("ListObjectsV2"),
924 S3Operation::PutObject => None,
925 S3Operation::CopyObject => None,
926 S3Operation::PutObjectSingle => Some("PutObject"),
927 }
928 }
929}
930
931#[derive(Debug)]
936struct S3Message<'a> {
937 inner: Message<'a>,
938 uri: Uri,
939 path_prefix: String,
940 checksum_config: Option<ChecksumConfig>,
941 signing_config: Option<SigningConfig>,
942}
943
944const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
946const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
947
948fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
949 let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
950 s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
951}
952
953#[derive(Debug, Default)]
954enum QueryFragment<'a, P: AsRef<OsStr>> {
955 #[default]
956 Empty,
957 Query(&'a [(P, P)]),
958 Action(P),
959}
960
961impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
962 fn size(&self) -> usize {
966 match self {
967 QueryFragment::Empty => 0,
968 QueryFragment::Query(query) => query
969 .iter()
970 .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
971 .sum::<usize>(),
972 QueryFragment::Action(action) => 1 + action.as_ref().len(),
973 }
974 }
975
976 fn write(&self, path: &mut OsString) {
978 match &self {
979 QueryFragment::Query(query) if !query.is_empty() => {
980 path.push("?");
981 for (i, (key, value)) in query.iter().enumerate() {
982 if i != 0 {
983 path.push("&");
984 }
985 write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
986 path.push("=");
987 write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
988 }
989 }
990 QueryFragment::Action(action) => {
991 path.push("?");
992 path.push(action.as_ref());
993 }
994 _ => {}
995 }
996 }
997}
998
999impl<'a> S3Message<'a> {
1000 fn set_header(
1003 &mut self,
1004 header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1005 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1006 self.inner.set_header(header)
1007 }
1008
1009 fn set_request_path_and_query<P: AsRef<OsStr>>(
1012 &mut self,
1013 path: impl AsRef<OsStr>,
1014 query_or_action: QueryFragment<P>,
1015 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1016 let space_needed = self.path_prefix.len() + query_or_action.size();
1017
1018 let mut full_path = OsString::with_capacity(space_needed);
1019
1020 write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1021 write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1022
1023 query_or_action.write(&mut full_path);
1025 self.inner.set_request_path(full_path)
1026 }
1027
1028 fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1031 self.set_request_path_and_query::<&str>(path, Default::default())
1032 }
1033
1034 fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1036 self.checksum_config = checksum_config;
1037 }
1038
1039 fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1042 self.inner.set_body_stream(input_stream)
1043 }
1044
1045 fn set_content_length_header(
1047 &mut self,
1048 content_length: usize,
1049 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1050 self.inner
1051 .set_header(&Header::new("Content-Length", content_length.to_string()))
1052 }
1053
1054 fn set_checksum_header(
1056 &mut self,
1057 checksum: &UploadChecksum,
1058 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1059 let header = match checksum {
1060 UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1061 UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1062 UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1063 UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1064 UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1065 };
1066 self.inner.set_header(&header)
1067 }
1068
1069 fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1070 let mut options = MetaRequestOptions::new();
1071 if let Some(checksum_config) = self.checksum_config {
1072 options.checksum_config(checksum_config);
1073 }
1074 if let Some(signing_config) = self.signing_config {
1075 options.signing_config(signing_config);
1076 }
1077 options
1078 .message(self.inner)
1079 .endpoint(self.uri)
1080 .request_type(operation.meta_request_type());
1081 if let Some(operation_name) = operation.operation_name() {
1082 options.operation_name(operation_name);
1083 }
1084 options
1085 }
1086}
1087
1088#[derive(Debug)]
1089#[pin_project]
1090#[must_use]
1091struct S3MetaRequest<T, E> {
1092 #[pin]
1094 receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1095 meta_request: CancellingMetaRequest,
1096}
1097
1098impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1099 type Output = ObjectClientResult<T, E, S3RequestError>;
1100
1101 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1102 let this = self.project();
1103 this.receiver
1104 .poll(cx)
1105 .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1106 }
1107}
1108
1109impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1110 fn is_terminated(&self) -> bool {
1111 self.receiver.is_terminated()
1112 }
1113}
1114
1115#[derive(Debug)]
1119struct CancellingMetaRequest {
1120 inner: MetaRequest,
1121}
1122
1123impl CancellingMetaRequest {
1124 fn wrap(meta_request: MetaRequest) -> Self {
1125 Self { inner: meta_request }
1126 }
1127}
1128
1129impl Drop for CancellingMetaRequest {
1130 fn drop(&mut self) {
1131 self.inner.cancel();
1132 }
1133}
1134
1135impl Deref for CancellingMetaRequest {
1136 type Target = MetaRequest;
1137
1138 fn deref(&self) -> &Self::Target {
1139 &self.inner
1140 }
1141}
1142
1143impl DerefMut for CancellingMetaRequest {
1144 fn deref_mut(&mut self) -> &mut Self::Target {
1145 &mut self.inner
1146 }
1147}
1148
1149#[derive(Error, Debug)]
1151#[non_exhaustive]
1152pub enum NewClientError {
1153 #[error("invalid S3 endpoint")]
1155 InvalidEndpoint(#[from] EndpointError),
1156 #[error("invalid AWS credentials")]
1158 ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
1159 #[error("invalid configuration: {0}")]
1161 InvalidConfiguration(String),
1162 #[error("Unknown CRT error")]
1164 CrtError(#[source] mountpoint_s3_crt::common::error::Error),
1165}
1166
1167#[derive(Error, Debug)]
1169pub enum S3RequestError {
1170 #[error("Internal S3 client error")]
1172 InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1173
1174 #[error("Unknown CRT error")]
1176 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1177
1178 #[error("Failed to construct request")]
1180 ConstructionFailure(#[from] ConstructionError),
1181
1182 #[error("Unknown response error: {0:?}")]
1184 ResponseError(MetaRequestResult),
1185
1186 #[error("Wrong region (expecting {0})")]
1188 IncorrectRegion(String, ClientErrorMetadata),
1189
1190 #[error("Forbidden: {0}")]
1192 Forbidden(String, ClientErrorMetadata),
1193
1194 #[error("No signing credentials available, see CRT debug logs")]
1196 NoSigningCredentials,
1197
1198 #[error("Failed to create S3 Express session, see CRT debug logs")]
1200 CreateSessionError,
1201
1202 #[error("Request canceled")]
1204 RequestCanceled,
1205
1206 #[error("Request throttled")]
1208 Throttled,
1209
1210 #[error("Polled for data with empty read window")]
1214 EmptyReadWindow,
1215}
1216
1217impl S3RequestError {
1218 fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1219 S3RequestError::ConstructionFailure(inner.into())
1220 }
1221
1222 fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1223 S3RequestError::InternalError(Box::new(inner))
1224 }
1225}
1226
1227impl ProvideErrorMetadata for S3RequestError {
1228 fn meta(&self) -> ClientErrorMetadata {
1229 match self {
1230 Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1231 Self::Forbidden(_, metadata) => metadata.clone(),
1232 Self::Throttled => ClientErrorMetadata {
1233 http_code: Some(503),
1234 error_code: Some("SlowDown".to_string()),
1235 error_message: Some("Please reduce your request rate.".to_string()),
1236 },
1237 Self::IncorrectRegion(_, metadata) => metadata.clone(),
1238 _ => Default::default(),
1239 }
1240 }
1241}
1242
1243#[derive(Error, Debug)]
1244pub enum ConstructionError {
1245 #[error("Unknown CRT error")]
1247 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1248
1249 #[error("Invalid S3 endpoint")]
1251 InvalidEndpoint(#[from] EndpointError),
1252}
1253
1254fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
1256 const UNKNOWN_METRIC_STR: &str = "Unknown";
1257
1258 let Some(operation_name) = operation_name else {
1259 return UNKNOWN_METRIC_STR;
1260 };
1261
1262 macro_rules! map_to_static_str_for_known_str {
1265 ($input:expr, $($str_literal:literal),* $(,)?) => {
1266 match $input {
1267 $(
1268 $str_literal => Some($str_literal),
1269 )*
1270 _ => None
1271 }
1272 };
1273 }
1274
1275 let static_str = map_to_static_str_for_known_str!(
1276 operation_name,
1277 "GetObject",
1278 "HeadObject",
1279 "ListParts",
1280 "CreateMultipartUpload",
1281 "UploadPart",
1282 "AbortMultipartUpload",
1283 "CompleteMultipartUpload",
1284 "UploadPartCopy",
1285 "CopyObject",
1286 "PutObject",
1287 "ListObjectsV2",
1288 "DeleteObject",
1289 "GetObjectAttributes",
1290 "HeadBucket",
1291 "RenameObject",
1292 );
1293
1294 debug_assert!(
1295 static_str.is_some(),
1296 "input set as {operation_name:?} but no matcher, update required",
1297 );
1298 static_str.unwrap_or(UNKNOWN_METRIC_STR)
1299}
1300
1301fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1303 let header = headers.get("Content-Range").ok()?;
1304 let value = header.value().to_str()?;
1305
1306 if !value.starts_with("bytes ") {
1309 return None;
1310 }
1311 let (_, value) = value.split_at("bytes ".len());
1312 let (range, _) = value.split_once('/')?;
1313 let (start, end) = range.split_once('-')?;
1314 let start = start.parse::<u64>().ok()?;
1315 let end = end.parse::<u64>().ok()?;
1316
1317 Some(start..end + 1)
1319}
1320
1321fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1323 let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1324 let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1325 let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1326 let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1327 let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1328
1329 Ok(Checksum {
1330 checksum_crc64nvme,
1331 checksum_crc32,
1332 checksum_crc32c,
1333 checksum_sha1,
1334 checksum_sha256,
1335 })
1336}
1337
1338fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1340 fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1342 let headers = request_result.error_response_headers.as_ref()?;
1343 let region_header = headers.get("x-amz-bucket-region").ok()?;
1344 let region = region_header.value().to_owned().into_string().ok()?;
1345 Some(S3RequestError::IncorrectRegion(
1346 region,
1347 ClientErrorMetadata::from_meta_request_result(request_result),
1348 ))
1349 }
1350
1351 fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1353 let Some(body) = request_result.error_response_body.as_ref() else {
1354 return Some(S3RequestError::Forbidden(
1357 "<no message>".to_owned(),
1358 ClientErrorMetadata {
1359 http_code: Some(request_result.response_status),
1360 ..Default::default()
1361 },
1362 ));
1363 };
1364 let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1365 let error_code = error_elem.get_child("Code")?;
1366 let error_code_str = error_code.get_text()?;
1367 if request_result.response_status == 403
1370 || matches!(
1371 error_code_str.deref(),
1372 "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1373 )
1374 {
1375 let message = error_elem
1376 .get_child("Message")
1377 .and_then(|e| e.get_text())
1378 .unwrap_or(error_code_str.clone());
1379 Some(S3RequestError::Forbidden(
1380 message.to_string(),
1381 ClientErrorMetadata {
1382 http_code: Some(request_result.response_status),
1383 error_code: Some(error_code_str.to_string()),
1384 error_message: Some(message.into_owned()),
1385 },
1386 ))
1387 } else {
1388 None
1389 }
1390 }
1391
1392 fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1394 let crt_error_code = request_result.crt_error.raw_error();
1395 if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1396 S3RequestError::NoSigningCredentials
1397 } else if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED as i32 {
1398 S3RequestError::CreateSessionError
1399 } else {
1400 S3RequestError::CrtError(crt_error_code.into())
1401 }
1402 }
1403
1404 fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1405 let crt_error_code = request_result.crt_error.raw_error();
1406 if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1407 Some(S3RequestError::Throttled)
1408 } else {
1409 None
1410 }
1411 }
1412
1413 fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1415 request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1416 }
1417
1418 match request_result.response_status {
1419 301 => try_parse_redirect(request_result),
1420 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1423 403 => try_parse_forbidden(request_result),
1424 0 => try_parse_throttled(request_result)
1426 .or_else(|| try_parse_canceled_request(request_result))
1427 .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1428 _ => None,
1429 }
1430}
1431
1432fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1435 let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1436 const MEGABYTE: u64 = 1024 * 1024;
1438 let bucket = if bytes < MEGABYTE {
1439 "<1MiB"
1440 } else if bytes <= 16 * MEGABYTE {
1441 "1-16MiB"
1442 } else {
1443 ">16MiB"
1444 };
1445 metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1446}
1447
1448#[cfg_attr(not(docsrs), async_trait)]
1449impl ObjectClient for S3CrtClient {
1450 type GetObjectResponse = S3GetObjectResponse;
1451 type PutObjectRequest = S3PutObjectRequest;
1452 type ClientError = S3RequestError;
1453
1454 fn read_part_size(&self) -> usize {
1455 self.inner.read_part_size
1456 }
1457
1458 fn write_part_size(&self) -> usize {
1459 self.inner.write_part_size
1464 }
1465
1466 fn initial_read_window_size(&self) -> Option<usize> {
1467 if self.inner.enable_backpressure {
1468 Some(self.inner.initial_read_window_size)
1469 } else {
1470 None
1471 }
1472 }
1473
1474 fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1475 let start = Instant::now();
1476 self.inner
1477 .s3_client
1478 .poll_default_buffer_pool_usage_stats()
1479 .inspect(|_| {
1480 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1481 .record(start.elapsed().as_micros() as f64);
1482 })
1483 }
1484
1485 async fn delete_object(
1486 &self,
1487 bucket: &str,
1488 key: &str,
1489 ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1490 self.delete_object(bucket, key).await
1491 }
1492
1493 async fn copy_object(
1494 &self,
1495 source_bucket: &str,
1496 source_key: &str,
1497 destination_bucket: &str,
1498 destination_key: &str,
1499 params: &CopyObjectParams,
1500 ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1501 self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1502 .await
1503 }
1504
1505 async fn get_object(
1506 &self,
1507 bucket: &str,
1508 key: &str,
1509 params: &GetObjectParams,
1510 ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1511 self.get_object(bucket, key, params).await
1512 }
1513
1514 async fn list_objects(
1515 &self,
1516 bucket: &str,
1517 continuation_token: Option<&str>,
1518 delimiter: &str,
1519 max_keys: usize,
1520 prefix: &str,
1521 ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1522 self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1523 .await
1524 }
1525
1526 async fn head_object(
1527 &self,
1528 bucket: &str,
1529 key: &str,
1530 params: &HeadObjectParams,
1531 ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1532 self.head_object(bucket, key, params).await
1533 }
1534
1535 async fn put_object(
1536 &self,
1537 bucket: &str,
1538 key: &str,
1539 params: &PutObjectParams,
1540 ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1541 self.put_object(bucket, key, params).await
1542 }
1543
1544 async fn put_object_single<'a>(
1545 &self,
1546 bucket: &str,
1547 key: &str,
1548 params: &PutObjectSingleParams,
1549 contents: impl AsRef<[u8]> + Send + 'a,
1550 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1551 self.put_object_single(bucket, key, params, contents).await
1552 }
1553
1554 async fn get_object_attributes(
1555 &self,
1556 bucket: &str,
1557 key: &str,
1558 max_parts: Option<usize>,
1559 part_number_marker: Option<usize>,
1560 object_attributes: &[ObjectAttribute],
1561 ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1562 self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1563 .await
1564 }
1565
1566 async fn rename_object(
1567 &self,
1568 bucket: &str,
1569 src_key: &str,
1570 dst_key: &str,
1571 params: &RenameObjectParams,
1572 ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1573 self.rename_object(bucket, src_key, dst_key, params).await
1574 }
1575}
1576
1577pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1579 fn on_telemetry(&self, request_metrics: &RequestMetrics);
1580}
1581
1582#[cfg(test)]
1583mod tests {
1584 use mountpoint_s3_crt::common::error::Error;
1585 use rusty_fork::rusty_fork_test;
1586 use std::assert_eq;
1587
1588 use super::*;
1589 use test_case::test_case;
1590
1591 fn client_new_fails_with_invalid_part_size(part_size: usize) {
1593 let config = S3ClientConfig::default().part_size(part_size);
1594 let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
1595 let message = if cfg!(target_pointer_width = "64") {
1596 "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
1597 } else {
1598 format!(
1599 "invalid configuration: part size must be at between 5MiB and {}GiB",
1600 usize::MAX / 1024 / 1024 / 1024
1601 )
1602 };
1603 assert_eq!(e.to_string(), message);
1604 }
1605
1606 #[cfg(target_pointer_width = "64")]
1609 #[test]
1610 fn client_new_fails_with_greater_part_size() {
1611 let part_size = 6 * 1024 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1613 }
1614
1615 #[test]
1616 fn client_new_fails_with_smaller_part_size() {
1617 let part_size = 4 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1619 }
1620
1621 #[test]
1623 fn test_user_agent_with_prefix() {
1624 let user_agent_prefix = String::from("someprefix");
1625 let expected_user_agent = "someprefix mountpoint-s3-client/";
1626
1627 let config = S3ClientConfig {
1628 user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
1629 ..Default::default()
1630 };
1631
1632 let client = S3CrtClient::new(config).expect("Create test client");
1633
1634 let mut message = client
1635 .inner
1636 .new_request_template("GET", "amzn-s3-demo-bucket")
1637 .expect("new request template expected");
1638
1639 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1640
1641 let user_agent_header = headers
1642 .get("User-Agent")
1643 .expect("User Agent Header expected with given prefix");
1644 let user_agent_header_value = user_agent_header.value();
1645
1646 assert!(
1647 user_agent_header_value
1648 .to_string_lossy()
1649 .starts_with(expected_user_agent)
1650 );
1651 }
1652
1653 fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
1654 let config = S3ClientConfig {
1655 endpoint_config,
1656 ..Default::default()
1657 };
1658
1659 let client = S3CrtClient::new(config).expect("create test client");
1660
1661 let mut message = client
1662 .inner
1663 .new_request_template("GET", "")
1664 .expect("new request template expected");
1665
1666 let headers = message.inner.get_headers().expect("expected a block of HTTP headers");
1667
1668 let host_header = headers.get("Host").expect("Host header expected");
1669 let host_header_value = host_header.value();
1670
1671 assert_eq!(host_header_value.to_string_lossy(), expected_host);
1672 }
1673
1674 rusty_fork_test! {
1676 #[test]
1677 fn test_endpoint_favors_parameter_over_env_variable() {
1678 let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
1679 let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);
1680
1681 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }
1683
1684 assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
1686 }
1687
1688 #[test]
1689 fn test_endpoint_favors_env_variable() {
1690 let endpoint_config = EndpointConfig::new("us-east-1");
1691
1692 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }
1694
1695 assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
1696 }
1697
1698 #[test]
1699 fn test_endpoint_with_invalid_env_variable() {
1700 let endpoint_config = EndpointConfig::new("us-east-1");
1701
1702 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }
1704
1705 let config = S3ClientConfig {
1706 endpoint_config,
1707 ..Default::default()
1708 };
1709
1710 let client = S3CrtClient::new(config);
1711 match client {
1712 Ok(_) => panic!("expected an error"),
1713 Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
1714 }
1715 }
1716
1717 }
1718
1719 #[test]
1721 fn test_user_agent_without_prefix() {
1722 let expected_user_agent = "mountpoint-s3-client/";
1723
1724 let config: S3ClientConfig = Default::default();
1725
1726 let client = S3CrtClient::new(config).expect("Create test client");
1727
1728 let mut message = client
1729 .inner
1730 .new_request_template("GET", "amzn-s3-demo-bucket")
1731 .expect("new request template expected");
1732
1733 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1734
1735 let user_agent_header = headers
1736 .get("User-Agent")
1737 .expect("User Agent Header expected with given prefix");
1738 let user_agent_header_value = user_agent_header.value();
1739
1740 assert!(
1741 user_agent_header_value
1742 .to_string_lossy()
1743 .starts_with(expected_user_agent)
1744 );
1745 }
1746
1747 #[test_case("bytes 200-1000/67589" => Some(200..1001))]
1748 #[test_case("bytes 200-1000/*" => Some(200..1001))]
1749 #[test_case("bytes 200-1000" => None)]
1750 #[test_case("bytes */67589" => None)]
1751 #[test_case("octets 200-1000]" => None)]
1752 fn parse_content_range(range: &str) -> Option<Range<u64>> {
1753 let mut headers = Headers::new(&Allocator::default()).unwrap();
1754 let header = Header::new("Content-Range", range);
1755 headers.add_header(&header).unwrap();
1756 extract_range_header(&headers)
1757 }
1758
1759 #[test]
1761 fn test_expected_bucket_owner() {
1762 let expected_bucket_owner = "111122223333";
1763
1764 let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");
1765
1766 let client = S3CrtClient::new(config).expect("Create test client");
1767
1768 let mut message = client
1769 .inner
1770 .new_request_template("GET", "amzn-s3-demo-bucket")
1771 .expect("new request template expected");
1772
1773 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1774
1775 let expected_bucket_owner_header = headers
1776 .get("x-amz-expected-bucket-owner")
1777 .expect("the headers should contain x-amz-expected-bucket-owner");
1778 let expected_bucket_owner_value = expected_bucket_owner_header.value();
1779
1780 assert!(
1781 expected_bucket_owner_value
1782 .to_string_lossy()
1783 .starts_with(expected_bucket_owner)
1784 );
1785 }
1786
1787 fn make_result(
1788 response_status: i32,
1789 body: impl Into<OsString>,
1790 bucket_region_header: Option<&str>,
1791 ) -> MetaRequestResult {
1792 let error_response_headers = bucket_region_header.map(|h| {
1793 let mut headers = Headers::new(&Allocator::default()).unwrap();
1794 headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
1795 headers
1796 });
1797 MetaRequestResult {
1798 response_status,
1799 crt_error: 1i32.into(),
1800 error_response_headers,
1801 error_response_body: Some(body.into()),
1802 }
1803 }
1804
1805 #[test]
1806 fn parse_301_redirect() {
1807 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
1808 let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1809 let result = try_parse_generic_error(&result);
1810 let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
1811 panic!("wrong result, got: {result:?}");
1812 };
1813 assert_eq!(region, "us-west-2");
1814 }
1815
1816 #[test]
1817 fn parse_403_access_denied() {
1818 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1819 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1820 let result = try_parse_generic_error(&result);
1821 let Some(S3RequestError::Forbidden(message, _)) = result else {
1822 panic!("wrong result, got: {result:?}");
1823 };
1824 assert_eq!(message, "Access Denied");
1825 }
1826
1827 #[test]
1828 fn parse_400_invalid_token() {
1829 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
1830 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1831 let result = try_parse_generic_error(&result);
1832 let Some(S3RequestError::Forbidden(message, _)) = result else {
1833 panic!("wrong result, got: {result:?}");
1834 };
1835 assert_eq!(message, "The provided token is malformed or otherwise invalid.");
1836 }
1837
1838 #[test]
1839 fn parse_400_expired_token() {
1840 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
1841 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1842 let result = try_parse_generic_error(&result);
1843 let Some(S3RequestError::Forbidden(message, _)) = result else {
1844 panic!("wrong result, got: {result:?}");
1845 };
1846 assert_eq!(message, "The provided token has expired.");
1847 }
1848
1849 #[test]
1850 fn parse_400_redirect() {
1851 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
1853 let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1854 let result = try_parse_generic_error(&result);
1855 let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
1856 panic!("wrong result, got: {result:?}");
1857 };
1858 assert_eq!(region, "us-west-2");
1859 }
1860
1861 #[test]
1862 fn parse_403_signature_does_not_match() {
1863 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
1864 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1865 let result = try_parse_generic_error(&result);
1866 let Some(S3RequestError::Forbidden(message, _)) = result else {
1867 panic!("wrong result, got: {result:?}");
1868 };
1869 assert_eq!(
1870 message,
1871 "The request signature we calculated does not match the signature you provided. Check your key and signing method."
1872 );
1873 }
1874
1875 #[test]
1876 fn parse_403_made_up_error() {
1877 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1879 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1880 let result = try_parse_generic_error(&result);
1881 let Some(S3RequestError::Forbidden(message, _)) = result else {
1882 panic!("wrong result, got: {result:?}");
1883 };
1884 assert_eq!(message, "This error is made up.");
1885 }
1886
1887 fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
1888 MetaRequestResult {
1889 response_status,
1890 crt_error,
1891 error_response_headers: None,
1892 error_response_body: None,
1893 }
1894 }
1895
1896 #[test]
1897 fn parse_no_signing_credential_error() {
1898 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
1899 let result = make_crt_error_result(0, error_code.into());
1900 let result = try_parse_generic_error(&result);
1901 let Some(S3RequestError::NoSigningCredentials) = result else {
1902 panic!("wrong result, got: {result:?}");
1903 };
1904 }
1905
1906 #[test]
1907 fn parse_test_other_crt_error() {
1908 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
1910 let result = make_crt_error_result(0, error_code.into());
1911 let result = try_parse_generic_error(&result);
1912 let Some(S3RequestError::CrtError(error)) = result else {
1913 panic!("wrong result, got: {result:?}");
1914 };
1915 assert_eq!(error, error_code.into());
1916 }
1917
1918 #[test]
1919 fn test_checksum_sha256() {
1920 let mut headers = Headers::new(&Allocator::default()).unwrap();
1921 let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
1922 let header = Header::new("x-amz-checksum-sha256", value.to_owned());
1923 headers.add_header(&header).unwrap();
1924
1925 let checksum = parse_checksum(&headers).expect("failed to parse headers");
1926 assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
1927 assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
1928 assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
1929 assert_eq!(
1930 checksum.checksum_sha256,
1931 Some(value.to_owned()),
1932 "sha256 header should match"
1933 );
1934 }
1935
1936 #[test]
1937 fn test_operation_name_to_static_metrics_string() {
1938 assert_eq!(operation_name_to_static_metrics_string(Some("GetObject")), "GetObject");
1939 assert_eq!(
1940 operation_name_to_static_metrics_string(Some("RenameObject")),
1941 "RenameObject",
1942 );
1943 assert_eq!(operation_name_to_static_metrics_string(None), "Unknown");
1944 }
1945}