1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31 BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32 MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory, MemoryPoolFactoryWrapper};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48 ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49 S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Creates a `tracing` span for a new S3 request and emits a "new request" debug event in it.
///
/// `$self` must expose `next_request_counter()`; its return value is recorded as the span's
/// `id` field. `$method` is used as the span name, and any trailing tokens are forwarded to
/// `tracing::warn_span!` as additional fields. The second arm is the form without extra fields.
macro_rules! request_span {
    ($self:expr, $method:expr, $($field:tt)*) => {{
        let counter = $self.next_request_counter();
        // The span itself is created at WARN level, while the "new request" event below is
        // only emitted at DEBUG level.
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
        span.in_scope(|| tracing::debug!("new request"));
        span
    }};
    // No extra fields: delegate to the arm above with an empty field list.
    ($self:expr, $method:expr) => { request_span!($self, $method,) };
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// Emits a `tracing` event whose level is chosen at runtime.
///
/// `$level` is matched against every `tracing::Level` and the arguments are forwarded to
/// `tracing::event!` with the corresponding literal level.
// NOTE(review): presumably needed because `tracing::event!` wants the level as a constant —
// confirm against the `tracing` documentation.
macro_rules! event {
    ($level:expr, $($args:tt)*) => {
        match $level {
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
        }
    }
}
97
/// Configuration used to construct an [`S3CrtClient`].
///
/// Values are set through the builder-style methods on this type; see the `Default`
/// implementation for the initial values.
#[derive(Debug, Clone)]
pub struct S3ClientConfig {
    /// How the credentials provider for the client is chosen.
    auth_config: S3ClientAuthConfig,
    /// Throughput target in Gbps, passed through to the CRT client configuration.
    throughput_target_gbps: f64,
    /// Memory limit in bytes, passed through to the CRT client configuration.
    memory_limit_in_bytes: u64,
    /// Part size used for reads; validated when the client is constructed.
    read_part_size: usize,
    /// Part size used for writes; validated when the client is constructed.
    write_part_size: usize,
    /// Endpoint (and region) configuration used to resolve request endpoints.
    endpoint_config: EndpointConfig,
    /// Optional user agent; when absent a default `UserAgent::new(None)` is used.
    user_agent: Option<UserAgent>,
    /// Optional value for the `x-amz-request-payer` header added to requests.
    request_payer: Option<String>,
    /// Optional value for the `x-amz-expected-bucket-owner` header added to requests.
    bucket_owner: Option<String>,
    /// Optional maximum number of attempts; the `AWS_MAX_ATTEMPTS` environment variable
    /// takes precedence when it is set to a parseable value.
    max_attempts: Option<NonZeroUsize>,
    /// Whether read backpressure is enabled on the CRT client.
    read_backpressure: bool,
    /// Initial read window, passed through to the CRT client configuration.
    initial_read_window: usize,
    /// Network interface names to hand to the CRT client; ignored when empty.
    network_interface_names: Vec<String>,
    /// Optional callback invoked with the metrics of each request telemetry event.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
    /// Optional thread count passed when creating the event loop group.
    event_loop_threads: Option<u16>,
    /// Optional factory for a custom buffer pool to install on the CRT client.
    buffer_pool_factory: Option<MemoryPoolFactoryWrapper>,
}
118
119impl Default for S3ClientConfig {
120 fn default() -> Self {
121 const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122 Self {
123 auth_config: Default::default(),
124 throughput_target_gbps: 10.0,
125 memory_limit_in_bytes: 0,
126 read_part_size: DEFAULT_PART_SIZE,
127 write_part_size: DEFAULT_PART_SIZE,
128 endpoint_config: EndpointConfig::new("us-east-1"),
129 user_agent: None,
130 request_payer: None,
131 bucket_owner: None,
132 max_attempts: None,
133 read_backpressure: false,
134 initial_read_window: DEFAULT_PART_SIZE,
135 network_interface_names: vec![],
136 telemetry_callback: None,
137 event_loop_threads: None,
138 buffer_pool_factory: None,
139 }
140 }
141}
142
143impl S3ClientConfig {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 #[must_use = "S3ClientConfig follows a builder pattern"]
150 pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151 self.auth_config = auth_config;
152 self
153 }
154
155 #[must_use = "S3ClientConfig follows a builder pattern"]
157 pub fn part_size(mut self, part_size: usize) -> Self {
158 self.read_part_size = part_size;
159 self.write_part_size = part_size;
160 self
161 }
162
163 #[must_use = "S3ClientConfig follows a builder pattern"]
165 pub fn read_part_size(mut self, size: usize) -> Self {
166 self.read_part_size = size;
167 self
168 }
169
170 #[must_use = "S3ClientConfig follows a builder pattern"]
172 pub fn write_part_size(mut self, size: usize) -> Self {
173 self.write_part_size = size;
174 self
175 }
176
177 #[must_use = "S3ClientConfig follows a builder pattern"]
179 pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180 self.throughput_target_gbps = throughput_target_gbps;
181 self
182 }
183
184 #[must_use = "S3ClientConfig follows a builder pattern"]
186 pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187 self.memory_limit_in_bytes = memory_limit_in_bytes;
188 self
189 }
190
191 #[must_use = "S3ClientConfig follows a builder pattern"]
193 pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194 self.endpoint_config = endpoint_config;
195 self
196 }
197
198 #[must_use = "S3ClientConfig follows a builder pattern"]
200 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201 self.user_agent = Some(user_agent);
202 self
203 }
204
205 #[must_use = "S3ClientConfig follows a builder pattern"]
207 pub fn request_payer(mut self, request_payer: &str) -> Self {
208 self.request_payer = Some(request_payer.to_owned());
209 self
210 }
211
212 #[must_use = "S3ClientConfig follows a builder pattern"]
214 pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215 self.bucket_owner = Some(bucket_owner.to_owned());
216 self
217 }
218
219 #[must_use = "S3ClientConfig follows a builder pattern"]
222 pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223 self.max_attempts = Some(max_attempts);
224 self
225 }
226
227 #[must_use = "S3ClientConfig follows a builder pattern"]
229 pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230 self.read_backpressure = read_backpressure;
231 self
232 }
233
234 #[must_use = "S3ClientConfig follows a builder pattern"]
236 pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237 self.initial_read_window = initial_read_window;
238 self
239 }
240
241 #[must_use = "S3ClientConfig follows a builder pattern"]
243 pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244 self.network_interface_names = network_interface_names;
245 self
246 }
247
248 #[must_use = "S3ClientConfig follows a builder pattern"]
250 pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251 self.telemetry_callback = Some(telemetry_callback);
252 self
253 }
254
255 #[must_use = "S3ClientConfig follows a builder pattern"]
257 pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258 self.event_loop_threads = Some(event_loop_threads);
259 self
260 }
261
262 #[must_use = "S3ClientConfig follows a builder pattern"]
264 pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265 self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(move |_| pool.clone()));
266 self
267 }
268
269 #[must_use = "S3ClientConfig follows a builder pattern"]
271 pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272 self.buffer_pool_factory = Some(MemoryPoolFactoryWrapper::new(pool_factory));
273 self
274 }
275}
276
/// Selects how the client obtains the credentials provider used to sign requests.
#[derive(Debug, Clone, Default)]
pub enum S3ClientAuthConfig {
    /// Use the CRT's default credentials provider chain.
    #[default]
    Default,
    /// Use an anonymous credentials provider.
    NoSigning,
    /// Use a profile credentials provider with the given profile name as override.
    Profile(String),
    /// Use the supplied credentials provider as-is.
    Provider(CredentialsProvider),
}
290
/// An S3 client backed by the CRT.
///
/// The state lives behind an `Arc`, so cloning the client only clones the handle and all
/// clones share the same underlying `S3CrtClientInner`.
#[derive(Debug, Clone)]
pub struct S3CrtClient {
    /// Shared client state.
    inner: Arc<S3CrtClientInner>,
}
304
305impl S3CrtClient {
306 pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308 Ok(Self {
309 inner: Arc::new(S3CrtClientInner::new(config)?),
310 })
311 }
312
313 pub fn endpoint_config(&self) -> EndpointConfig {
315 self.inner.endpoint_config.clone()
316 }
317
318 #[doc(hidden)]
319 pub fn event_loop_group(&self) -> EventLoopGroup {
320 self.inner.event_loop_group.clone()
321 }
322}
323
/// Shared state behind an [`S3CrtClient`].
#[derive(Debug)]
struct S3CrtClientInner {
    /// The CRT S3 client that meta requests are submitted to.
    s3_client: Client,
    /// Event loop group the client was created with.
    event_loop_group: EventLoopGroup,
    /// Endpoint configuration used to resolve the endpoint for each bucket.
    endpoint_config: EndpointConfig,
    /// Allocator used when creating new request messages.
    allocator: Allocator,
    /// Counter handed out (and incremented) for every new request span.
    next_request_counter: AtomicU64,
    /// Pre-built value of the `User-Agent` header.
    user_agent_header: String,
    /// Optional value of the `x-amz-request-payer` header.
    request_payer: Option<String>,
    /// Configured part size for reads.
    read_part_size: usize,
    /// Configured part size for writes.
    write_part_size: usize,
    /// Whether read backpressure was enabled in the configuration.
    enable_backpressure: bool,
    /// Configured initial read window.
    initial_read_window_size: usize,
    /// Optional value of the `x-amz-expected-bucket-owner` header.
    bucket_owner: Option<String>,
    /// Credentials provider used to build per-request signing configs; when `None`,
    /// request templates are created without a signing config.
    credentials_provider: Option<CredentialsProvider>,
    /// Host resolver, queried for the host address count when a meta request finishes.
    host_resolver: HostResolver,
    /// Optional callback invoked with the metrics of each request telemetry event.
    telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}
344
345impl S3CrtClientInner {
346 fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347 let allocator = Allocator::default();
348
349 let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351 let resolver_options = HostResolverDefaultOptions {
352 max_entries: 8,
353 event_loop_group: &mut event_loop_group,
354 };
355
356 let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358 let bootstrap_options = ClientBootstrapOptions {
359 event_loop_group: &mut event_loop_group,
360 host_resolver: &mut host_resolver,
361 };
362
363 let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365 let mut client_config = ClientConfig::new();
366
367 let retry_strategy = {
368 let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369 let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370 .ok()
371 .and_then(|s| s.parse::<usize>().ok())
372 .or_else(|| config.max_attempts.map(|m| m.get()))
373 .unwrap_or(3);
374 retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377 retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378 retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379 RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380 };
381
382 trace!("constructing client with auth config {:?}", config.auth_config);
383 let credentials_provider = match config.auth_config {
384 S3ClientAuthConfig::Default => {
385 let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386 bootstrap: &mut client_bootstrap,
387 };
388 CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389 .map_err(NewClientError::ProviderFailure)?
390 }
391 S3ClientAuthConfig::NoSigning => {
392 CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393 }
394 S3ClientAuthConfig::Profile(profile_name) => {
395 let credentials_profile_options = CredentialsProviderProfileOptions {
396 bootstrap: &mut client_bootstrap,
397 profile_name_override: &profile_name,
398 };
399 CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400 .map_err(NewClientError::ProviderFailure)?
401 }
402 S3ClientAuthConfig::Provider(provider) => provider,
403 };
404
405 let endpoint_config = config.endpoint_config;
406 client_config.region(endpoint_config.get_region());
407 let signing_config = init_signing_config(
408 endpoint_config.get_region(),
409 credentials_provider.clone(),
410 None,
411 None,
412 None,
413 );
414
415 let endpoint_config = match endpoint_config.get_endpoint() {
416 None => {
417 if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419 debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420 let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421 .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422 endpoint_config.endpoint(env_uri)
423 } else {
424 endpoint_config
425 }
426 }
427 Some(_) => endpoint_config,
428 };
429
430 client_config.express_support(true);
431 client_config.read_backpressure(config.read_backpressure);
432 client_config.initial_read_window(config.initial_read_window);
433 client_config.signing_config(signing_config);
434
435 client_config
436 .client_bootstrap(client_bootstrap)
437 .retry_strategy(retry_strategy);
438
439 client_config.throughput_target_gbps(config.throughput_target_gbps);
440 client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442 if let Some(wrapper) = config.buffer_pool_factory {
443 let pool_factory = CrtBufferPoolFactory::new(wrapper, event_loop_group.clone());
444 client_config.buffer_pool_factory(pool_factory);
445 }
446
447 let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
449 for part_size in [config.read_part_size, config.write_part_size] {
451 if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
452 return Err(NewClientError::InvalidConfiguration(format!(
453 "part size must be at between 5MiB and {}GiB",
454 max_part_size / 1024 / 1024 / 1024
455 )));
456 }
457 }
458
459 if !config.network_interface_names.is_empty() {
460 client_config.network_interface_names(config.network_interface_names);
461 }
462
463 let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
464 let user_agent_header = user_agent.build();
465
466 let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
467
468 Ok(Self {
469 allocator,
470 s3_client,
471 event_loop_group,
472 endpoint_config,
473 next_request_counter: AtomicU64::new(0),
474 user_agent_header,
475 request_payer: config.request_payer,
476 read_part_size: config.read_part_size,
477 write_part_size: config.write_part_size,
478 enable_backpressure: config.read_backpressure,
479 initial_read_window_size: config.initial_read_window,
480 bucket_owner: config.bucket_owner,
481 credentials_provider: Some(credentials_provider),
482 host_resolver,
483 telemetry_callback: config.telemetry_callback,
484 })
485 }
486
487 fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
492 let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
493 let uri = endpoint.uri()?;
494 trace!(?uri, "resolved endpoint");
495
496 let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
497 let auth_scheme = match endpoint.auth_scheme() {
498 Ok(auth_scheme) => auth_scheme,
499 Err(e) => {
500 error!(error=?e, "no auth scheme for endpoint");
501 return Err(e.into());
502 }
503 };
504 trace!(?auth_scheme, "resolved auth scheme");
505 let algorithm = Some(auth_scheme.scheme_name());
506 let service = Some(auth_scheme.signing_name());
507 let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
508 Some(init_signing_config(
509 auth_scheme.signing_region(),
510 credentials_provider.clone(),
511 algorithm,
512 service,
513 use_double_uri_encode,
514 ))
515 } else {
516 None
517 };
518
519 let hostname = uri.host_name().to_str().unwrap();
520 let path_prefix = uri.path().to_os_string().into_string().unwrap();
521 let port = uri.host_port();
522 let hostname_header = if port > 0 {
523 format!("{hostname}:{port}")
524 } else {
525 hostname.to_string()
526 };
527
528 let mut message = Message::new_request(&self.allocator)?;
529 message.set_request_method(method)?;
530 message.add_header(&Header::new("Host", hostname_header))?;
531 message.add_header(&Header::new("accept", "application/xml"))?;
532 message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
533
534 if let Some(ref payer) = self.request_payer {
535 message.add_header(&Header::new("x-amz-request-payer", payer))?;
536 }
537
538 if let Some(ref owner) = self.bucket_owner {
539 message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
540 }
541
542 Ok(S3Message {
543 inner: message,
544 uri,
545 path_prefix,
546 checksum_config: None,
547 signing_config,
548 })
549 }
550
551 #[allow(clippy::too_many_arguments)]
558 fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
559 &self,
560 mut options: MetaRequestOptions,
561 request_span: Span,
562 on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
563 mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
564 mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
565 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
566 on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
567 ) -> Result<CancellingMetaRequest, S3RequestError> {
568 let span_telemetry = request_span.clone();
569 let span_body = request_span.clone();
570 let span_finish = request_span;
571
572 let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
573 let hostname = endpoint.host_name().to_str().unwrap().to_owned();
574 let host_resolver = self.host_resolver.clone();
575 let telemetry_callback = self.telemetry_callback.clone();
576
577 let start_time = Instant::now();
578 let first_body_part = Arc::new(AtomicBool::new(true));
579 let first_body_part_clone = Arc::clone(&first_body_part);
580 let total_bytes = Arc::new(AtomicU64::new(0));
581 let total_bytes_clone = Arc::clone(&total_bytes);
582
583 options
584 .on_telemetry(move |metrics| {
585 let _guard = span_telemetry.enter();
586
587 on_request_finish(metrics);
588
589 let http_status = metrics.status_code();
590 let request_canceled = metrics.is_canceled();
591 let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
592 let crt_error = Some(metrics.error()).filter(|e| e.is_err());
593 let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
594 let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
595 let duration = metrics.total_duration();
596 let ttfb = metrics.time_to_first_byte();
597 let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
598
599 let message = if request_failure {
600 "S3 request failed"
601 } else if request_canceled {
602 "S3 request canceled"
603 } else {
604 "S3 request finished"
605 };
606 debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
607 trace!(detailed_metrics=?metrics, "S3 request completed");
608
609 if let Some(ttfb) = ttfb {
610 metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
611 }
612 metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
613 metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
614 if request_failure {
615 metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
616 } else if request_canceled {
617 metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
618 }
619
620 if let Some(telemetry_callback) = &telemetry_callback {
621 telemetry_callback.on_telemetry(metrics);
622 }
623 })
624 .on_headers(move |headers, response_status| {
625 (on_headers)(headers, response_status);
626 })
627 .on_body(move |range_start, data| {
628 let _guard = span_body.enter();
629
630 if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
631 let latency = start_time.elapsed().as_micros() as f64;
632 let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
633 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
634 }
635 total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
636
637 trace!(start = range_start, length = data.len(), "body part received");
638
639 (on_body)(range_start, data);
640 })
641 .on_finish(move |request_result| {
642 let _guard = span_finish.enter();
643
644 let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
645 let duration = start_time.elapsed();
646
647 metrics::counter!("s3.meta_requests", "op" => op).increment(1);
648 metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
649 if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
651 let latency = duration.as_micros() as f64;
652 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
653 }
654 let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
655 if op == "get_object" {
658 emit_throughput_metric(total_bytes, duration, op);
659 }
660 let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
661 if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
662 metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
663 }
664
665 let status_code = request_result.response_status;
666 let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
667 tracing::Level::DEBUG
668 } else {
669 tracing::Level::WARN
670 };
671
672 let result =
673 if !request_result.is_err() {
674 event!(log_level, ?duration, "meta request finished");
675 Ok(())
676 } else {
677 let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
680 let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
681
682 let request_id = match &request_result.error_response_headers {
686 Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
687 None => None,
688 };
689 let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
690
691 let message = if request_result.is_canceled() {
692 "meta request canceled"
693 } else {
694 "meta request failed"
695 };
696 if let Some(error) = &maybe_err {
697 event!(log_level, ?duration, %request_id, ?error, message);
698 debug!("meta request result: {:?}", request_result);
699 } else {
700 event!(log_level, ?duration, %request_id, ?request_result, message);
701 }
702
703 if request_result.is_canceled() {
704 metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
705 } else {
706 let error_status = if request_result.response_status >= 100 {
708 request_result.response_status
709 } else {
710 -request_result.crt_error.raw_error()
711 };
712 metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
713 }
714
715 Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
717 };
718
719 on_meta_request_result(result);
720 });
721
722 let meta_request = self.s3_client.make_meta_request(options)?;
724 Self::poll_client_metrics(&self.s3_client);
725 Ok(CancellingMetaRequest::wrap(meta_request))
726 }
727
728 fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
735 &self,
736 options: MetaRequestOptions,
737 request_span: Span,
738 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
739 ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
740 let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
741
742 let body: Arc<Mutex<Vec<u8>>> = Default::default();
744 let body_clone = Arc::clone(&body);
745
746 let meta_request = self.meta_request_with_callbacks(
747 options,
748 request_span,
749 |_| {},
750 |_, _| {},
751 move |offset, data| {
752 let mut body = body_clone.lock().unwrap();
753 assert_eq!(offset as usize, body.len());
754 body.extend_from_slice(data);
755 },
756 parse_meta_request_error,
757 move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
758 )?;
759 Ok(S3MetaRequest {
760 receiver: rx.fuse(),
761 meta_request,
762 })
763 }
764
765 fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
772 &self,
773 options: MetaRequestOptions,
774 request_span: Span,
775 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
776 ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
777 let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
778
779 let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
782 let on_result = on_headers.clone();
783
784 let meta_request = self.meta_request_with_callbacks(
785 options,
786 request_span,
787 |_| {},
788 move |headers, status| {
789 if (200..300).contains(&status) {
792 *on_headers.lock().unwrap() = Some(headers.clone());
793 }
794 },
795 |_, _| {},
796 parse_meta_request_error,
797 move |result| {
798 let headers =
800 result.and_then(|_| {
801 on_result.lock().unwrap().take().ok_or_else(|| {
802 S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
803 })
804 });
805 _ = tx.send(headers);
806 },
807 )?;
808 Ok(S3MetaRequest {
809 receiver: rx.fuse(),
810 meta_request,
811 })
812 }
813
814 fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
820 &self,
821 options: MetaRequestOptions,
822 request_span: Span,
823 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
824 ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
825 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
826
827 let meta_request = self.meta_request_with_callbacks(
828 options,
829 request_span,
830 |_| {},
831 |_, _| {},
832 |_, _| {},
833 parse_meta_request_error,
834 move |result| _ = tx.send(result),
835 )?;
836 Ok(S3MetaRequest {
837 receiver: rx.fuse(),
838 meta_request,
839 })
840 }
841
842 fn poll_client_metrics(s3_client: &Client) {
843 let metrics = s3_client.poll_client_metrics();
844 metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
845 metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
846 metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
847 metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
848 metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
849 metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
850 metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
851 .set(metrics.num_auto_ranged_copy_network_io as f64);
852 metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
853 metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
854 .set(metrics.num_requests_stream_queued_waiting as f64);
855 metrics::gauge!("s3.client.num_requests_streaming_response")
856 .set(metrics.num_requests_streaming_response as f64);
857
858 let start = Instant::now();
860 if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
861 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
862 .record(start.elapsed().as_micros() as f64);
863 metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
864 metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
865 metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
866 metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
867 metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
868 metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
869 .set(buffer_pool_stats.primary_num_blocks as f64);
870 metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
871 .set(buffer_pool_stats.secondary_reserved as f64);
872 metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
873 metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
874 }
875 }
876
877 fn next_request_counter(&self) -> u64 {
878 self.next_request_counter.fetch_add(1, Ordering::SeqCst)
879 }
880}
881
/// Failures while extracting response headers from a finished meta request.
#[derive(Debug, Error)]
enum ResponseHeadersError {
    /// The meta request succeeded but no response headers were captured.
    #[error("response headers are missing")]
    MissingHeaders,
}
888
/// The S3 operations this client can issue. Each one maps to a CRT meta request type and,
/// for some operations, to an explicit operation name (see the methods on this type).
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    DeleteObject,
    GetObject,
    GetObjectAttributes,
    HeadBucket,
    HeadObject,
    ListObjects,
    /// Issued with the `PutObject` meta request type.
    PutObject,
    CopyObject,
    /// Issued with the default meta request type under the operation name "PutObject".
    PutObjectSingle,
}
902
903impl S3Operation {
904 fn meta_request_type(&self) -> MetaRequestType {
906 match self {
907 S3Operation::GetObject => MetaRequestType::GetObject,
908 S3Operation::PutObject => MetaRequestType::PutObject,
909 S3Operation::CopyObject => MetaRequestType::CopyObject,
910 _ => MetaRequestType::Default,
911 }
912 }
913
914 fn operation_name(&self) -> Option<&'static str> {
917 match self {
918 S3Operation::DeleteObject => Some("DeleteObject"),
919 S3Operation::GetObject => None,
920 S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
921 S3Operation::HeadBucket => Some("HeadBucket"),
922 S3Operation::HeadObject => Some("HeadObject"),
923 S3Operation::ListObjects => Some("ListObjectsV2"),
924 S3Operation::PutObject => None,
925 S3Operation::CopyObject => None,
926 S3Operation::PutObjectSingle => Some("PutObject"),
927 }
928 }
929}
930
/// A request message under construction, bundled with the endpoint details and
/// per-request configuration that go with it.
#[derive(Debug)]
struct S3Message<'a> {
    /// The underlying CRT request message.
    inner: Message<'a>,
    /// The URI of the endpoint resolved for the target bucket.
    uri: Uri,
    /// Path component of the endpoint URI; prepended to every request path.
    path_prefix: String,
    /// Optional checksum configuration; `None` on a fresh request template.
    checksum_config: Option<ChecksumConfig>,
    /// Optional signing configuration; `None` when no credentials provider is available.
    signing_config: Option<SigningConfig>,
}
943
/// Bytes to percent-encode in query keys and values: the `NON_ALPHANUMERIC` set with
/// `-`, `.`, `_` and `~` removed, so those four characters are left as-is.
const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
/// Bytes to percent-encode in paths: the query set with `/` removed as well, so path
/// separators are left as-is.
const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
947
948fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
949 let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
950 s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
951}
952
/// The part of a request path that follows the `?`, if any.
#[derive(Debug, Default)]
enum QueryFragment<'a, P: AsRef<OsStr>> {
    /// Nothing is appended to the path.
    #[default]
    Empty,
    /// Key/value pairs, written as `?k=v&k=v` with keys and values percent-encoded.
    Query(&'a [(P, P)]),
    /// A single action, written verbatim (not percent-encoded) after the `?`.
    Action(P),
}
960
961impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
962 fn size(&self) -> usize {
966 match self {
967 QueryFragment::Empty => 0,
968 QueryFragment::Query(query) => query
969 .iter()
970 .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
971 .sum::<usize>(),
972 QueryFragment::Action(action) => 1 + action.as_ref().len(),
973 }
974 }
975
976 fn write(&self, path: &mut OsString) {
978 match &self {
979 QueryFragment::Query(query) if !query.is_empty() => {
980 path.push("?");
981 for (i, (key, value)) in query.iter().enumerate() {
982 if i != 0 {
983 path.push("&");
984 }
985 write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
986 path.push("=");
987 write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
988 }
989 }
990 QueryFragment::Action(action) => {
991 path.push("?");
992 path.push(action.as_ref());
993 }
994 _ => {}
995 }
996 }
997}
998
999impl<'a> S3Message<'a> {
1000 fn set_header(
1003 &mut self,
1004 header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1005 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1006 self.inner.set_header(header)
1007 }
1008
1009 fn set_request_path_and_query<P: AsRef<OsStr>>(
1012 &mut self,
1013 path: impl AsRef<OsStr>,
1014 query_or_action: QueryFragment<P>,
1015 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1016 let space_needed = self.path_prefix.len() + query_or_action.size();
1017
1018 let mut full_path = OsString::with_capacity(space_needed);
1019
1020 write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1021 write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1022
1023 query_or_action.write(&mut full_path);
1025 self.inner.set_request_path(full_path)
1026 }
1027
1028 fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1031 self.set_request_path_and_query::<&str>(path, Default::default())
1032 }
1033
1034 fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1036 self.checksum_config = checksum_config;
1037 }
1038
1039 fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1042 self.inner.set_body_stream(input_stream)
1043 }
1044
1045 fn set_content_length_header(
1047 &mut self,
1048 content_length: usize,
1049 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1050 self.inner
1051 .set_header(&Header::new("Content-Length", content_length.to_string()))
1052 }
1053
1054 fn set_checksum_header(
1056 &mut self,
1057 checksum: &UploadChecksum,
1058 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1059 let header = match checksum {
1060 UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1061 UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1062 UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1063 UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1064 UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1065 };
1066 self.inner.set_header(&header)
1067 }
1068
1069 fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1070 let mut options = MetaRequestOptions::new();
1071 if let Some(checksum_config) = self.checksum_config {
1072 options.checksum_config(checksum_config);
1073 }
1074 if let Some(signing_config) = self.signing_config {
1075 options.signing_config(signing_config);
1076 }
1077 options
1078 .message(self.inner)
1079 .endpoint(self.uri)
1080 .request_type(operation.meta_request_type());
1081 if let Some(operation_name) = operation.operation_name() {
1082 options.operation_name(operation_name);
1083 }
1084 options
1085 }
1086}
1087
/// A future that resolves to the result of an in-flight meta request.
///
/// The result is delivered over a oneshot channel; the meta request handle is held alongside the
/// receiver for the lifetime of this future.
#[derive(Debug)]
#[pin_project]
#[must_use]
struct S3MetaRequest<T, E> {
    /// Receiving end of the channel on which the final result of the request is delivered.
    #[pin]
    receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
    /// Handle to the underlying meta request, held in a [`CancellingMetaRequest`] wrapper.
    meta_request: CancellingMetaRequest,
}
1097
1098impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1099 type Output = ObjectClientResult<T, E, S3RequestError>;
1100
1101 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1102 let this = self.project();
1103 this.receiver
1104 .poll(cx)
1105 .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1106 }
1107}
1108
impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
    /// Whether this future has terminated, as reported by the fused result receiver.
    fn is_terminated(&self) -> bool {
        self.receiver.is_terminated()
    }
}
1114
/// Owning wrapper around a [`MetaRequest`] handle.
#[derive(Debug)]
struct CancellingMetaRequest {
    /// The wrapped meta request.
    inner: MetaRequest,
}
1122
impl CancellingMetaRequest {
    /// Take ownership of `meta_request` and wrap it.
    fn wrap(meta_request: MetaRequest) -> Self {
        Self { inner: meta_request }
    }
}
1128
impl Drop for CancellingMetaRequest {
    /// Cancel the wrapped meta request when the wrapper goes out of scope.
    fn drop(&mut self) {
        // Called unconditionally, including for requests that may already have finished.
        self.inner.cancel();
    }
}
1134
impl Deref for CancellingMetaRequest {
    type Target = MetaRequest;

    /// Give shared access to the wrapped [`MetaRequest`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
1142
impl DerefMut for CancellingMetaRequest {
    /// Give exclusive access to the wrapped [`MetaRequest`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
1148
/// Errors that can occur while creating a new client.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum NewClientError {
    /// The configured S3 endpoint is invalid.
    #[error("invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
    /// The AWS credentials are invalid; carries the underlying CRT error.
    #[error("invalid AWS credentials")]
    ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
    /// The client configuration is invalid; the string describes the problem.
    #[error("invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// Any other CRT error.
    #[error("Unknown CRT error")]
    CrtError(#[source] mountpoint_s3_crt::common::error::Error),
}
1166
/// Errors that are not specific to one S3 operation and can be returned by any request.
#[derive(Error, Debug)]
pub enum S3RequestError {
    /// An error internal to the client; carries the underlying cause.
    #[error("Internal S3 client error")]
    InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// A CRT error that was not mapped to a more specific variant.
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The request could not be constructed.
    #[error("Failed to construct request")]
    ConstructionFailure(#[from] ConstructionError),

    /// An error response that was not mapped to a more specific variant; carries the raw result.
    #[error("Unknown response error: {0:?}")]
    ResponseError(MetaRequestResult),

    /// The request was sent to the wrong region; the string is the expected region.
    #[error("Wrong region (expecting {0})")]
    IncorrectRegion(String, ClientErrorMetadata),

    /// The request was rejected as forbidden; the string is the error message.
    #[error("Forbidden: {0}")]
    Forbidden(String, ClientErrorMetadata),

    /// No credentials were available to sign the request.
    #[error("No signing credentials available, see CRT debug logs")]
    NoSigningCredentials,

    /// The request was canceled.
    #[error("Request canceled")]
    RequestCanceled,

    /// The request was throttled.
    #[error("Request throttled")]
    Throttled,

    /// Data was polled for while the read window was empty.
    #[error("Polled for data with empty read window")]
    EmptyReadWindow,
}
1212
1213impl S3RequestError {
1214 fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1215 S3RequestError::ConstructionFailure(inner.into())
1216 }
1217
1218 fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1219 S3RequestError::InternalError(Box::new(inner))
1220 }
1221}
1222
1223impl ProvideErrorMetadata for S3RequestError {
1224 fn meta(&self) -> ClientErrorMetadata {
1225 match self {
1226 Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1227 Self::Forbidden(_, metadata) => metadata.clone(),
1228 Self::Throttled => ClientErrorMetadata {
1229 http_code: Some(503),
1230 error_code: Some("SlowDown".to_string()),
1231 error_message: Some("Please reduce your request rate.".to_string()),
1232 },
1233 Self::IncorrectRegion(_, metadata) => metadata.clone(),
1234 _ => Default::default(),
1235 }
1236 }
1237}
1238
/// Errors that can occur while constructing a request, before it is sent.
#[derive(Error, Debug)]
pub enum ConstructionError {
    /// A CRT error with no more specific classification.
    #[error("Unknown CRT error")]
    CrtError(#[from] mountpoint_s3_crt::common::error::Error),

    /// The S3 endpoint is invalid.
    #[error("Invalid S3 endpoint")]
    InvalidEndpoint(#[from] EndpointError),
}
1249
/// Map an operation name of arbitrary lifetime to an equal `&'static str` usable as a metrics label.
///
/// Returns `"Unknown"` when no name is given. A name that is not in the known list also maps to
/// `"Unknown"`, but trips a debug assertion first so that the list gets updated.
fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
    const UNKNOWN_METRIC_STR: &str = "Unknown";

    /// Every operation name expected as input.
    const KNOWN_OPERATION_NAMES: &[&str] = &[
        "GetObject",
        "HeadObject",
        "ListParts",
        "CreateMultipartUpload",
        "UploadPart",
        "AbortMultipartUpload",
        "CompleteMultipartUpload",
        "UploadPartCopy",
        "CopyObject",
        "PutObject",
        "ListObjectsV2",
        "DeleteObject",
        "GetObjectAttributes",
        "HeadBucket",
        "RenameObject",
    ];

    let operation_name = match operation_name {
        Some(name) => name,
        None => return UNKNOWN_METRIC_STR,
    };

    // Return the table's own `'static` entry rather than the caller's shorter-lived string.
    let static_str = KNOWN_OPERATION_NAMES
        .iter()
        .copied()
        .find(|known| *known == operation_name);

    debug_assert!(
        static_str.is_some(),
        "input set as {operation_name:?} but no matcher, update required",
    );
    match static_str {
        Some(known) => known,
        None => UNKNOWN_METRIC_STR,
    }
}
1296
1297fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1299 let header = headers.get("Content-Range").ok()?;
1300 let value = header.value().to_str()?;
1301
1302 if !value.starts_with("bytes ") {
1305 return None;
1306 }
1307 let (_, value) = value.split_at("bytes ".len());
1308 let (range, _) = value.split_once('/')?;
1309 let (start, end) = range.split_once('-')?;
1310 let start = start.parse::<u64>().ok()?;
1311 let end = end.parse::<u64>().ok()?;
1312
1313 Some(start..end + 1)
1315}
1316
1317fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1319 let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1320 let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1321 let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1322 let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1323 let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1324
1325 Ok(Checksum {
1326 checksum_crc64nvme,
1327 checksum_crc32,
1328 checksum_crc32c,
1329 checksum_sha1,
1330 checksum_sha256,
1331 })
1332}
1333
1334fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1336 fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1338 let headers = request_result.error_response_headers.as_ref()?;
1339 let region_header = headers.get("x-amz-bucket-region").ok()?;
1340 let region = region_header.value().to_owned().into_string().ok()?;
1341 Some(S3RequestError::IncorrectRegion(
1342 region,
1343 ClientErrorMetadata::from_meta_request_result(request_result),
1344 ))
1345 }
1346
1347 fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1349 let Some(body) = request_result.error_response_body.as_ref() else {
1350 return Some(S3RequestError::Forbidden(
1353 "<no message>".to_owned(),
1354 ClientErrorMetadata {
1355 http_code: Some(request_result.response_status),
1356 ..Default::default()
1357 },
1358 ));
1359 };
1360 let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1361 let error_code = error_elem.get_child("Code")?;
1362 let error_code_str = error_code.get_text()?;
1363 if request_result.response_status == 403
1366 || matches!(
1367 error_code_str.deref(),
1368 "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1369 )
1370 {
1371 let message = error_elem
1372 .get_child("Message")
1373 .and_then(|e| e.get_text())
1374 .unwrap_or(error_code_str.clone());
1375 Some(S3RequestError::Forbidden(
1376 message.to_string(),
1377 ClientErrorMetadata {
1378 http_code: Some(request_result.response_status),
1379 error_code: Some(error_code_str.to_string()),
1380 error_message: Some(message.into_owned()),
1381 },
1382 ))
1383 } else {
1384 None
1385 }
1386 }
1387
1388 fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1390 let crt_error_code = request_result.crt_error.raw_error();
1391 if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1392 S3RequestError::NoSigningCredentials
1393 } else {
1394 S3RequestError::CrtError(crt_error_code.into())
1395 }
1396 }
1397
1398 fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1399 let crt_error_code = request_result.crt_error.raw_error();
1400 if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1401 Some(S3RequestError::Throttled)
1402 } else {
1403 None
1404 }
1405 }
1406
1407 fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1409 request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1410 }
1411
1412 match request_result.response_status {
1413 301 => try_parse_redirect(request_result),
1414 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1417 403 => try_parse_forbidden(request_result),
1418 0 => try_parse_throttled(request_result)
1420 .or_else(|| try_parse_canceled_request(request_result))
1421 .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1422 _ => None,
1423 }
1424}
1425
1426fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1429 let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1430 const MEGABYTE: u64 = 1024 * 1024;
1432 let bucket = if bytes < MEGABYTE {
1433 "<1MiB"
1434 } else if bytes <= 16 * MEGABYTE {
1435 "1-16MiB"
1436 } else {
1437 ">16MiB"
1438 };
1439 metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1440}
1441
// `ObjectClient` implementation for the CRT-based client.
//
// Each S3 operation below forwards its arguments unchanged to a method of the same name called
// on `self` (presumably the inherent `S3CrtClient` method defined elsewhere; confirm) and awaits
// the result.
#[cfg_attr(not(docsrs), async_trait)]
impl ObjectClient for S3CrtClient {
    type GetObjectResponse = S3GetObjectResponse;
    type PutObjectRequest = S3PutObjectRequest;
    type ClientError = S3RequestError;

    /// The part size configured for reads.
    fn read_part_size(&self) -> usize {
        self.inner.read_part_size
    }

    /// The part size configured for writes.
    fn write_part_size(&self) -> usize {
        self.inner.write_part_size
    }

    /// The initial read window size, or `None` when backpressure is disabled.
    fn initial_read_window_size(&self) -> Option<usize> {
        if self.inner.enable_backpressure {
            Some(self.inner.initial_read_window_size)
        } else {
            None
        }
    }

    /// Poll the default buffer pool for its usage statistics.
    ///
    /// When statistics are returned, the time the poll took is recorded (in microseconds) in the
    /// `s3.client.buffer_pool.get_usage_latency_us` histogram.
    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
        let start = Instant::now();
        self.inner
            .s3_client
            .poll_default_buffer_pool_usage_stats()
            .inspect(|_| {
                metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
                    .record(start.elapsed().as_micros() as f64);
            })
    }

    /// Delete the object `key` from `bucket`.
    async fn delete_object(
        &self,
        bucket: &str,
        key: &str,
    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
        self.delete_object(bucket, key).await
    }

    /// Copy `source_key` in `source_bucket` to `destination_key` in `destination_bucket`.
    async fn copy_object(
        &self,
        source_bucket: &str,
        source_key: &str,
        destination_bucket: &str,
        destination_key: &str,
        params: &CopyObjectParams,
    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
        self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
            .await
    }

    /// Start fetching the object `key` from `bucket`.
    async fn get_object(
        &self,
        bucket: &str,
        key: &str,
        params: &GetObjectParams,
    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
        self.get_object(bucket, key, params).await
    }

    /// List objects in `bucket` for the given `prefix`, `delimiter`, `max_keys` and optional
    /// `continuation_token`.
    async fn list_objects(
        &self,
        bucket: &str,
        continuation_token: Option<&str>,
        delimiter: &str,
        max_keys: usize,
        prefix: &str,
    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
        self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
            .await
    }

    /// Query the object `key` in `bucket` with a `HeadObject`-style request.
    async fn head_object(
        &self,
        bucket: &str,
        key: &str,
        params: &HeadObjectParams,
    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
        self.head_object(bucket, key, params).await
    }

    /// Begin a put of the object `key` into `bucket`, returning the request handle.
    async fn put_object(
        &self,
        bucket: &str,
        key: &str,
        params: &PutObjectParams,
    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
        self.put_object(bucket, key, params).await
    }

    /// Put `contents` as the object `key` in `bucket` with a single call.
    async fn put_object_single<'a>(
        &self,
        bucket: &str,
        key: &str,
        params: &PutObjectSingleParams,
        contents: impl AsRef<[u8]> + Send + 'a,
    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
        self.put_object_single(bucket, key, params, contents).await
    }

    /// Retrieve the requested `object_attributes` of the object `key` in `bucket`.
    async fn get_object_attributes(
        &self,
        bucket: &str,
        key: &str,
        max_parts: Option<usize>,
        part_number_marker: Option<usize>,
        object_attributes: &[ObjectAttribute],
    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
        self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
            .await
    }

    /// Rename the object `src_key` to `dst_key` within `bucket`.
    async fn rename_object(
        &self,
        bucket: &str,
        src_key: &str,
        dst_key: &str,
        params: &RenameObjectParams,
    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
        self.rename_object(bucket, src_key, dst_key, params).await
    }
}
1570
/// Callback interface for receiving request telemetry.
///
/// Implementations must be `Debug`, `Send` and `Sync`.
pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
    /// Called with the metrics of a request.
    fn on_telemetry(&self, request_metrics: &RequestMetrics);
}
1575
#[cfg(test)]
mod tests {
    use mountpoint_s3_crt::common::error::Error;
    use rusty_fork::rusty_fork_test;
    use std::assert_eq;

    use super::*;
    use test_case::test_case;

    /// Assert that creating a client with `part_size` fails with the part-size configuration error.
    fn client_new_fails_with_invalid_part_size(part_size: usize) {
        let config = S3ClientConfig::default().part_size(part_size);
        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
        // The upper bound in the message depends on the pointer width of the target.
        let message = if cfg!(target_pointer_width = "64") {
            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
        } else {
            format!(
                "invalid configuration: part size must be at between 5MiB and {}GiB",
                usize::MAX / 1024 / 1024 / 1024
            )
        };
        assert_eq!(e.to_string(), message);
    }

    // A 6GiB part size is only representable in `usize` on 64-bit targets.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn client_new_fails_with_greater_part_size() {
        let part_size = 6 * 1024 * 1024 * 1024; // 6GiB, above the maximum
        client_new_fails_with_invalid_part_size(part_size);
    }

    #[test]
    fn client_new_fails_with_smaller_part_size() {
        let part_size = 4 * 1024 * 1024; // 4MiB, below the minimum
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// A configured user agent prefix must come first in the `User-Agent` header.
    #[test]
    fn test_user_agent_with_prefix() {
        let user_agent_prefix = String::from("someprefix");
        let expected_user_agent = "someprefix mountpoint-s3-client/";

        let config = S3ClientConfig {
            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// Build a client from `endpoint_config` and assert the `Host` header of a request template.
    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
        let config = S3ClientConfig {
            endpoint_config,
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");

        let host_header = headers.get("Host").expect("Host header expected");
        let host_header_value = host_header.value();

        assert_eq!(host_header_value.to_string_lossy(), expected_host);
    }

    // These tests mutate the process environment, so each runs in its own forked process.
    rusty_fork_test! {
        #[test]
        fn test_endpoint_favors_parameter_over_env_variable() {
            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);

            // SAFETY: `rusty_fork_test!` runs this test in a separate child process, so no other
            // test thread reads or writes the environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }

            // The explicitly configured endpoint must win over the environment variable.
            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_favors_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: `rusty_fork_test!` runs this test in a separate child process, so no other
            // test thread reads or writes the environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }

            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_with_invalid_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: `rusty_fork_test!` runs this test in a separate child process, so no other
            // test thread reads or writes the environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }

            let config = S3ClientConfig {
                endpoint_config,
                ..Default::default()
            };

            let client = S3CrtClient::new(config);
            match client {
                Ok(_) => panic!("expected an error"),
                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
            }
        }

    }

    /// Without a prefix, the `User-Agent` header starts with the client's own identifier.
    #[test]
    fn test_user_agent_without_prefix() {
        let expected_user_agent = "mountpoint-s3-client/";

        let config: S3ClientConfig = Default::default();

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
    #[test_case("bytes 200-1000/*" => Some(200..1001))]
    #[test_case("bytes 200-1000" => None)]
    #[test_case("bytes */67589" => None)]
    #[test_case("octets 200-1000]" => None)]
    fn parse_content_range(range: &str) -> Option<Range<u64>> {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let header = Header::new("Content-Range", range);
        headers.add_header(&header).unwrap();
        extract_range_header(&headers)
    }

    /// A configured bucket owner must be sent as `x-amz-expected-bucket-owner`.
    #[test]
    fn test_expected_bucket_owner() {
        let expected_bucket_owner = "111122223333";

        let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let expected_bucket_owner_header = headers
            .get("x-amz-expected-bucket-owner")
            .expect("the headers should contain x-amz-expected-bucket-owner");
        let expected_bucket_owner_value = expected_bucket_owner_header.value();

        assert!(
            expected_bucket_owner_value
                .to_string_lossy()
                .starts_with(expected_bucket_owner)
        );
    }

    /// Build a failed request result with the given status and body, and optionally an
    /// `x-amz-bucket-region` response header.
    fn make_result(
        response_status: i32,
        body: impl Into<OsString>,
        bucket_region_header: Option<&str>,
    ) -> MetaRequestResult {
        let error_response_headers = bucket_region_header.map(|h| {
            let mut headers = Headers::new(&Allocator::default()).unwrap();
            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
            headers
        });
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_301_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_access_denied() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "Access Denied");
    }

    #[test]
    fn parse_400_invalid_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
    }

    #[test]
    fn parse_400_expired_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token has expired.");
    }

    /// A 400 whose body is not an access error falls through to the wrong-region check.
    #[test]
    fn parse_400_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_signature_does_not_match() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        );
    }

    /// Any 403 is reported as forbidden, even with an error code that is not recognised.
    #[test]
    fn parse_403_made_up_error() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "This error is made up.");
    }

    /// Build a failed request result that carries only a CRT error (no headers, no body).
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }

    #[test]
    fn parse_no_signing_credential_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::NoSigningCredentials) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    /// A status of 0 with the CRT slow-down error code is reported as throttling.
    #[test]
    fn parse_throttled_error() {
        let error_code = mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Throttled) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    /// Any other CRT error with status 0 is passed through unchanged.
    #[test]
    fn parse_test_other_crt_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CrtError(error)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(error, error_code.into());
    }

    #[test]
    fn test_checksum_sha256() {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
        headers.add_header(&header).unwrap();

        let checksum = parse_checksum(&headers).expect("failed to parse headers");
        assert_eq!(checksum.checksum_crc64nvme, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
        assert_eq!(
            checksum.checksum_sha256,
            Some(value.to_owned()),
            "sha256 header should match"
        );
    }

    #[test]
    fn test_operation_name_to_static_metrics_string() {
        assert_eq!(operation_name_to_static_metrics_string(Some("GetObject")), "GetObject");
        assert_eq!(
            operation_name_to_static_metrics_string(Some("RenameObject")),
            "RenameObject",
        );
        assert_eq!(operation_name_to_static_metrics_string(None), "Unknown");
    }
}