1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31 BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32 MetaRequestType, RequestMetrics, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48 ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
49 S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
54macro_rules! request_span {
55 ($self:expr, $method:expr, $($field:tt)*) => {{
56 let counter = $self.next_request_counter();
57 let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
61 span.in_scope(|| tracing::debug!("new request"));
62 span
63 }};
64 ($self:expr, $method:expr) => { request_span!($self, $method,) };
65}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
84macro_rules! event {
87 ($level:expr, $($args:tt)*) => {
88 match $level {
89 ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
90 ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
91 ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
92 ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
93 ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
100pub struct S3ClientConfig {
101 auth_config: S3ClientAuthConfig,
102 throughput_target_gbps: f64,
103 memory_limit_in_bytes: u64,
104 read_part_size: usize,
105 write_part_size: usize,
106 endpoint_config: EndpointConfig,
107 user_agent: Option<UserAgent>,
108 request_payer: Option<String>,
109 bucket_owner: Option<String>,
110 max_attempts: Option<NonZeroUsize>,
111 read_backpressure: bool,
112 initial_read_window: usize,
113 network_interface_names: Vec<String>,
114 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
115 event_loop_threads: Option<u16>,
116 buffer_pool_factory: Option<CrtBufferPoolFactory>,
117}
118
119impl Default for S3ClientConfig {
120 fn default() -> Self {
121 const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122 Self {
123 auth_config: Default::default(),
124 throughput_target_gbps: 10.0,
125 memory_limit_in_bytes: 0,
126 read_part_size: DEFAULT_PART_SIZE,
127 write_part_size: DEFAULT_PART_SIZE,
128 endpoint_config: EndpointConfig::new("us-east-1"),
129 user_agent: None,
130 request_payer: None,
131 bucket_owner: None,
132 max_attempts: None,
133 read_backpressure: false,
134 initial_read_window: DEFAULT_PART_SIZE,
135 network_interface_names: vec![],
136 telemetry_callback: None,
137 event_loop_threads: None,
138 buffer_pool_factory: None,
139 }
140 }
141}
142
143impl S3ClientConfig {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 #[must_use = "S3ClientConfig follows a builder pattern"]
150 pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151 self.auth_config = auth_config;
152 self
153 }
154
155 #[must_use = "S3ClientConfig follows a builder pattern"]
157 pub fn part_size(mut self, part_size: usize) -> Self {
158 self.read_part_size = part_size;
159 self.write_part_size = part_size;
160 self
161 }
162
163 #[must_use = "S3ClientConfig follows a builder pattern"]
165 pub fn read_part_size(mut self, size: usize) -> Self {
166 self.read_part_size = size;
167 self
168 }
169
170 #[must_use = "S3ClientConfig follows a builder pattern"]
172 pub fn write_part_size(mut self, size: usize) -> Self {
173 self.write_part_size = size;
174 self
175 }
176
177 #[must_use = "S3ClientConfig follows a builder pattern"]
179 pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180 self.throughput_target_gbps = throughput_target_gbps;
181 self
182 }
183
184 #[must_use = "S3ClientConfig follows a builder pattern"]
186 pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187 self.memory_limit_in_bytes = memory_limit_in_bytes;
188 self
189 }
190
191 #[must_use = "S3ClientConfig follows a builder pattern"]
193 pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194 self.endpoint_config = endpoint_config;
195 self
196 }
197
198 #[must_use = "S3ClientConfig follows a builder pattern"]
200 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201 self.user_agent = Some(user_agent);
202 self
203 }
204
205 #[must_use = "S3ClientConfig follows a builder pattern"]
207 pub fn request_payer(mut self, request_payer: &str) -> Self {
208 self.request_payer = Some(request_payer.to_owned());
209 self
210 }
211
212 #[must_use = "S3ClientConfig follows a builder pattern"]
214 pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215 self.bucket_owner = Some(bucket_owner.to_owned());
216 self
217 }
218
219 #[must_use = "S3ClientConfig follows a builder pattern"]
222 pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223 self.max_attempts = Some(max_attempts);
224 self
225 }
226
227 #[must_use = "S3ClientConfig follows a builder pattern"]
229 pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230 self.read_backpressure = read_backpressure;
231 self
232 }
233
234 #[must_use = "S3ClientConfig follows a builder pattern"]
236 pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237 self.initial_read_window = initial_read_window;
238 self
239 }
240
241 #[must_use = "S3ClientConfig follows a builder pattern"]
243 pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244 self.network_interface_names = network_interface_names;
245 self
246 }
247
248 #[must_use = "S3ClientConfig follows a builder pattern"]
250 pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251 self.telemetry_callback = Some(telemetry_callback);
252 self
253 }
254
255 #[must_use = "S3ClientConfig follows a builder pattern"]
257 pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258 self.event_loop_threads = Some(event_loop_threads);
259 self
260 }
261
262 #[must_use = "S3ClientConfig follows a builder pattern"]
264 pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265 self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(move |_| pool.clone()));
266 self
267 }
268
269 #[must_use = "S3ClientConfig follows a builder pattern"]
271 pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272 self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(pool_factory));
273 self
274 }
275}
276
277#[derive(Debug, Clone, Default)]
279pub enum S3ClientAuthConfig {
280 #[default]
282 Default,
283 NoSigning,
285 Profile(String),
287 Provider(CredentialsProvider),
289}
290
291#[derive(Debug, Clone)]
301pub struct S3CrtClient {
302 inner: Arc<S3CrtClientInner>,
303}
304
305impl S3CrtClient {
306 pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308 Ok(Self {
309 inner: Arc::new(S3CrtClientInner::new(config)?),
310 })
311 }
312
313 pub fn endpoint_config(&self) -> EndpointConfig {
315 self.inner.endpoint_config.clone()
316 }
317
318 #[doc(hidden)]
319 pub fn event_loop_group(&self) -> EventLoopGroup {
320 self.inner.event_loop_group.clone()
321 }
322}
323
324#[derive(Debug)]
325struct S3CrtClientInner {
326 s3_client: Client,
327 event_loop_group: EventLoopGroup,
328 endpoint_config: EndpointConfig,
329 allocator: Allocator,
330 next_request_counter: AtomicU64,
331 user_agent_header: String,
334 request_payer: Option<String>,
335 read_part_size: usize,
336 write_part_size: usize,
337 enable_backpressure: bool,
338 initial_read_window_size: usize,
339 bucket_owner: Option<String>,
340 credentials_provider: Option<CredentialsProvider>,
341 host_resolver: HostResolver,
342 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
343}
344
345impl S3CrtClientInner {
346 fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347 let allocator = Allocator::default();
348
349 let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351 let resolver_options = HostResolverDefaultOptions {
352 max_entries: 8,
353 event_loop_group: &mut event_loop_group,
354 };
355
356 let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358 let bootstrap_options = ClientBootstrapOptions {
359 event_loop_group: &mut event_loop_group,
360 host_resolver: &mut host_resolver,
361 };
362
363 let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365 let mut client_config = ClientConfig::new();
366
367 let retry_strategy = {
368 let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369 let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370 .ok()
371 .and_then(|s| s.parse::<usize>().ok())
372 .or_else(|| config.max_attempts.map(|m| m.get()))
373 .unwrap_or(3);
374 retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377 retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378 retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379 RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380 };
381
382 trace!("constructing client with auth config {:?}", config.auth_config);
383 let credentials_provider = match config.auth_config {
384 S3ClientAuthConfig::Default => {
385 let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386 bootstrap: &mut client_bootstrap,
387 };
388 CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389 .map_err(NewClientError::ProviderFailure)?
390 }
391 S3ClientAuthConfig::NoSigning => {
392 CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393 }
394 S3ClientAuthConfig::Profile(profile_name) => {
395 let credentials_profile_options = CredentialsProviderProfileOptions {
396 bootstrap: &mut client_bootstrap,
397 profile_name_override: &profile_name,
398 };
399 CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400 .map_err(NewClientError::ProviderFailure)?
401 }
402 S3ClientAuthConfig::Provider(provider) => provider,
403 };
404
405 let endpoint_config = config.endpoint_config;
406 client_config.region(endpoint_config.get_region());
407 let signing_config = init_signing_config(
408 endpoint_config.get_region(),
409 credentials_provider.clone(),
410 None,
411 None,
412 None,
413 );
414
415 let endpoint_config = match endpoint_config.get_endpoint() {
416 None => {
417 if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419 debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420 let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421 .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422 endpoint_config.endpoint(env_uri)
423 } else {
424 endpoint_config
425 }
426 }
427 Some(_) => endpoint_config,
428 };
429
430 client_config.express_support(true);
431 client_config.read_backpressure(config.read_backpressure);
432 client_config.initial_read_window(config.initial_read_window);
433 client_config.signing_config(signing_config);
434
435 client_config
436 .client_bootstrap(client_bootstrap)
437 .retry_strategy(retry_strategy);
438
439 client_config.throughput_target_gbps(config.throughput_target_gbps);
440 client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442 if let Some(pool_factory) = &config.buffer_pool_factory {
443 client_config.buffer_pool_factory(pool_factory.clone());
444 }
445
446 let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
448 for part_size in [config.read_part_size, config.write_part_size] {
450 if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
451 return Err(NewClientError::InvalidConfiguration(format!(
452 "part size must be at between 5MiB and {}GiB",
453 max_part_size / 1024 / 1024 / 1024
454 )));
455 }
456 }
457
458 if !config.network_interface_names.is_empty() {
459 client_config.network_interface_names(config.network_interface_names);
460 }
461
462 let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
463 let user_agent_header = user_agent.build();
464
465 let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
466
467 Ok(Self {
468 allocator,
469 s3_client,
470 event_loop_group,
471 endpoint_config,
472 next_request_counter: AtomicU64::new(0),
473 user_agent_header,
474 request_payer: config.request_payer,
475 read_part_size: config.read_part_size,
476 write_part_size: config.write_part_size,
477 enable_backpressure: config.read_backpressure,
478 initial_read_window_size: config.initial_read_window,
479 bucket_owner: config.bucket_owner,
480 credentials_provider: Some(credentials_provider),
481 host_resolver,
482 telemetry_callback: config.telemetry_callback,
483 })
484 }
485
486 fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
491 let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
492 let uri = endpoint.uri()?;
493 trace!(?uri, "resolved endpoint");
494
495 let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
496 let auth_scheme = match endpoint.auth_scheme() {
497 Ok(auth_scheme) => auth_scheme,
498 Err(e) => {
499 error!(error=?e, "no auth scheme for endpoint");
500 return Err(e.into());
501 }
502 };
503 trace!(?auth_scheme, "resolved auth scheme");
504 let algorithm = Some(auth_scheme.scheme_name());
505 let service = Some(auth_scheme.signing_name());
506 let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
507 Some(init_signing_config(
508 auth_scheme.signing_region(),
509 credentials_provider.clone(),
510 algorithm,
511 service,
512 use_double_uri_encode,
513 ))
514 } else {
515 None
516 };
517
518 let hostname = uri.host_name().to_str().unwrap();
519 let path_prefix = uri.path().to_os_string().into_string().unwrap();
520 let port = uri.host_port();
521 let hostname_header = if port > 0 {
522 format!("{hostname}:{port}")
523 } else {
524 hostname.to_string()
525 };
526
527 let mut message = Message::new_request(&self.allocator)?;
528 message.set_request_method(method)?;
529 message.add_header(&Header::new("Host", hostname_header))?;
530 message.add_header(&Header::new("accept", "application/xml"))?;
531 message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
532
533 if let Some(ref payer) = self.request_payer {
534 message.add_header(&Header::new("x-amz-request-payer", payer))?;
535 }
536
537 if let Some(ref owner) = self.bucket_owner {
538 message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
539 }
540
541 Ok(S3Message {
542 inner: message,
543 uri,
544 path_prefix,
545 checksum_config: None,
546 signing_config,
547 })
548 }
549
550 #[allow(clippy::too_many_arguments)]
557 fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
558 &self,
559 mut options: MetaRequestOptions,
560 request_span: Span,
561 on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
562 mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
563 mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
564 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
565 on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
566 ) -> Result<CancellingMetaRequest, S3RequestError> {
567 let span_telemetry = request_span.clone();
568 let span_body = request_span.clone();
569 let span_finish = request_span;
570
571 let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
572 let hostname = endpoint.host_name().to_str().unwrap().to_owned();
573 let host_resolver = self.host_resolver.clone();
574 let telemetry_callback = self.telemetry_callback.clone();
575
576 let start_time = Instant::now();
577 let first_body_part = Arc::new(AtomicBool::new(true));
578 let first_body_part_clone = Arc::clone(&first_body_part);
579 let total_bytes = Arc::new(AtomicU64::new(0));
580 let total_bytes_clone = Arc::clone(&total_bytes);
581
582 options
583 .on_telemetry(move |metrics| {
584 let _guard = span_telemetry.enter();
585
586 on_request_finish(metrics);
587
588 let http_status = metrics.status_code();
589 let request_canceled = metrics.is_canceled();
590 let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
591 let crt_error = Some(metrics.error()).filter(|e| e.is_err());
592 let operation_name = operation_name_to_static_metrics_string(metrics.operation_name());
593 let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
594 let duration = metrics.total_duration();
595 let ttfb = metrics.time_to_first_byte();
596 let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
597
598 let message = if request_failure {
599 "S3 request failed"
600 } else if request_canceled {
601 "S3 request canceled"
602 } else {
603 "S3 request finished"
604 };
605 debug!(?operation_name, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
606 trace!(detailed_metrics=?metrics, "S3 request completed");
607
608 if let Some(ttfb) = ttfb {
609 metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => operation_name).record(ttfb.as_micros() as f64);
610 }
611 metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => operation_name).record(duration.as_micros() as f64);
612 metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => operation_name).increment(1);
613 if request_failure {
614 metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => operation_name, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
615 } else if request_canceled {
616 metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => operation_name).increment(1);
617 }
618
619 if let Some(telemetry_callback) = &telemetry_callback {
620 telemetry_callback.on_telemetry(metrics);
621 }
622 })
623 .on_headers(move |headers, response_status| {
624 (on_headers)(headers, response_status);
625 })
626 .on_body(move |range_start, data| {
627 let _guard = span_body.enter();
628
629 if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
630 let latency = start_time.elapsed().as_micros() as f64;
631 let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
632 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
633 }
634 total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
635
636 trace!(start = range_start, length = data.len(), "body part received");
637
638 (on_body)(range_start, data);
639 })
640 .on_finish(move |request_result| {
641 let _guard = span_finish.enter();
642
643 let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
644 let duration = start_time.elapsed();
645
646 metrics::counter!("s3.meta_requests", "op" => op).increment(1);
647 metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
648 if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
650 let latency = duration.as_micros() as f64;
651 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
652 }
653 let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
654 if op == "get_object" {
657 emit_throughput_metric(total_bytes, duration, op);
658 }
659 let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
660 if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
661 metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
662 }
663
664 let status_code = request_result.response_status;
665 let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
666 tracing::Level::DEBUG
667 } else {
668 tracing::Level::WARN
669 };
670
671 let result =
672 if !request_result.is_err() {
673 event!(log_level, ?duration, "meta request finished");
674 Ok(())
675 } else {
676 let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
679 let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
680
681 let request_id = match &request_result.error_response_headers {
685 Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
686 None => None,
687 };
688 let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
689
690 let message = if request_result.is_canceled() {
691 "meta request canceled"
692 } else {
693 "meta request failed"
694 };
695 if let Some(error) = &maybe_err {
696 event!(log_level, ?duration, %request_id, ?error, message);
697 debug!("meta request result: {:?}", request_result);
698 } else {
699 event!(log_level, ?duration, %request_id, ?request_result, message);
700 }
701
702 if request_result.is_canceled() {
703 metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
704 } else {
705 let error_status = if request_result.response_status >= 100 {
707 request_result.response_status
708 } else {
709 -request_result.crt_error.raw_error()
710 };
711 metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
712 }
713
714 Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
716 };
717
718 on_meta_request_result(result);
719 });
720
721 let meta_request = self.s3_client.make_meta_request(options)?;
723 Self::poll_client_metrics(&self.s3_client);
724 Ok(CancellingMetaRequest::wrap(meta_request))
725 }
726
727 fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
734 &self,
735 options: MetaRequestOptions,
736 request_span: Span,
737 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738 ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
739 let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
740
741 let body: Arc<Mutex<Vec<u8>>> = Default::default();
743 let body_clone = Arc::clone(&body);
744
745 let meta_request = self.meta_request_with_callbacks(
746 options,
747 request_span,
748 |_| {},
749 |_, _| {},
750 move |offset, data| {
751 let mut body = body_clone.lock().unwrap();
752 assert_eq!(offset as usize, body.len());
753 body.extend_from_slice(data);
754 },
755 parse_meta_request_error,
756 move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
757 )?;
758 Ok(S3MetaRequest {
759 receiver: rx.fuse(),
760 meta_request,
761 })
762 }
763
764 fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
771 &self,
772 options: MetaRequestOptions,
773 request_span: Span,
774 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
775 ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
776 let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
777
778 let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
781 let on_result = on_headers.clone();
782
783 let meta_request = self.meta_request_with_callbacks(
784 options,
785 request_span,
786 |_| {},
787 move |headers, status| {
788 if (200..300).contains(&status) {
791 *on_headers.lock().unwrap() = Some(headers.clone());
792 }
793 },
794 |_, _| {},
795 parse_meta_request_error,
796 move |result| {
797 let headers =
799 result.and_then(|_| {
800 on_result.lock().unwrap().take().ok_or_else(|| {
801 S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
802 })
803 });
804 _ = tx.send(headers);
805 },
806 )?;
807 Ok(S3MetaRequest {
808 receiver: rx.fuse(),
809 meta_request,
810 })
811 }
812
813 fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
819 &self,
820 options: MetaRequestOptions,
821 request_span: Span,
822 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
823 ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
824 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
825
826 let meta_request = self.meta_request_with_callbacks(
827 options,
828 request_span,
829 |_| {},
830 |_, _| {},
831 |_, _| {},
832 parse_meta_request_error,
833 move |result| _ = tx.send(result),
834 )?;
835 Ok(S3MetaRequest {
836 receiver: rx.fuse(),
837 meta_request,
838 })
839 }
840
841 fn poll_client_metrics(s3_client: &Client) {
842 let metrics = s3_client.poll_client_metrics();
843 metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
844 metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
845 metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
846 metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
847 metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
848 metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
849 metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
850 .set(metrics.num_auto_ranged_copy_network_io as f64);
851 metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
852 metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
853 .set(metrics.num_requests_stream_queued_waiting as f64);
854 metrics::gauge!("s3.client.num_requests_streaming_response")
855 .set(metrics.num_requests_streaming_response as f64);
856
857 let start = Instant::now();
859 if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
860 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
861 .record(start.elapsed().as_micros() as f64);
862 metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
863 metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
864 metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
865 metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
866 metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
867 metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
868 .set(buffer_pool_stats.primary_num_blocks as f64);
869 metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
870 .set(buffer_pool_stats.secondary_reserved as f64);
871 metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
872 metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
873 }
874 }
875
876 fn next_request_counter(&self) -> u64 {
877 self.next_request_counter.fetch_add(1, Ordering::SeqCst)
878 }
879}
880
881#[derive(Debug, Error)]
883enum ResponseHeadersError {
884 #[error("response headers are missing")]
885 MissingHeaders,
886}
887
888#[derive(Debug, Clone, Copy)]
890enum S3Operation {
891 DeleteObject,
892 GetObject,
893 GetObjectAttributes,
894 HeadBucket,
895 HeadObject,
896 ListObjects,
897 PutObject,
898 CopyObject,
899 PutObjectSingle,
900}
901
902impl S3Operation {
903 fn meta_request_type(&self) -> MetaRequestType {
905 match self {
906 S3Operation::GetObject => MetaRequestType::GetObject,
907 S3Operation::PutObject => MetaRequestType::PutObject,
908 S3Operation::CopyObject => MetaRequestType::CopyObject,
909 _ => MetaRequestType::Default,
910 }
911 }
912
913 fn operation_name(&self) -> Option<&'static str> {
916 match self {
917 S3Operation::DeleteObject => Some("DeleteObject"),
918 S3Operation::GetObject => None,
919 S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
920 S3Operation::HeadBucket => Some("HeadBucket"),
921 S3Operation::HeadObject => Some("HeadObject"),
922 S3Operation::ListObjects => Some("ListObjectsV2"),
923 S3Operation::PutObject => None,
924 S3Operation::CopyObject => None,
925 S3Operation::PutObjectSingle => Some("PutObject"),
926 }
927 }
928}
929
930#[derive(Debug)]
935struct S3Message<'a> {
936 inner: Message<'a>,
937 uri: Uri,
938 path_prefix: String,
939 checksum_config: Option<ChecksumConfig>,
940 signing_config: Option<SigningConfig>,
941}
942
943const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
945const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
946
947fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
948 let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
949 s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
950}
951
952#[derive(Debug, Default)]
953enum QueryFragment<'a, P: AsRef<OsStr>> {
954 #[default]
955 Empty,
956 Query(&'a [(P, P)]),
957 Action(P),
958}
959
960impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
961 fn size(&self) -> usize {
965 match self {
966 QueryFragment::Empty => 0,
967 QueryFragment::Query(query) => query
968 .iter()
969 .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
970 .sum::<usize>(),
971 QueryFragment::Action(action) => 1 + action.as_ref().len(),
972 }
973 }
974
975 fn write(&self, path: &mut OsString) {
977 match &self {
978 QueryFragment::Query(query) if !query.is_empty() => {
979 path.push("?");
980 for (i, (key, value)) in query.iter().enumerate() {
981 if i != 0 {
982 path.push("&");
983 }
984 write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
985 path.push("=");
986 write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
987 }
988 }
989 QueryFragment::Action(action) => {
990 path.push("?");
991 path.push(action.as_ref());
992 }
993 _ => {}
994 }
995 }
996}
997
998impl<'a> S3Message<'a> {
999 fn set_header(
1002 &mut self,
1003 header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1004 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1005 self.inner.set_header(header)
1006 }
1007
1008 fn set_request_path_and_query<P: AsRef<OsStr>>(
1011 &mut self,
1012 path: impl AsRef<OsStr>,
1013 query_or_action: QueryFragment<P>,
1014 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1015 let space_needed = self.path_prefix.len() + query_or_action.size();
1016
1017 let mut full_path = OsString::with_capacity(space_needed);
1018
1019 write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1020 write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1021
1022 query_or_action.write(&mut full_path);
1024 self.inner.set_request_path(full_path)
1025 }
1026
1027 fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1030 self.set_request_path_and_query::<&str>(path, Default::default())
1031 }
1032
1033 fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1035 self.checksum_config = checksum_config;
1036 }
1037
1038 fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1041 self.inner.set_body_stream(input_stream)
1042 }
1043
1044 fn set_content_length_header(
1046 &mut self,
1047 content_length: usize,
1048 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1049 self.inner
1050 .set_header(&Header::new("Content-Length", content_length.to_string()))
1051 }
1052
1053 fn set_checksum_header(
1055 &mut self,
1056 checksum: &UploadChecksum,
1057 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1058 let header = match checksum {
1059 UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1060 UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1061 UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1062 UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1063 UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1064 };
1065 self.inner.set_header(&header)
1066 }
1067
1068 fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1069 let mut options = MetaRequestOptions::new();
1070 if let Some(checksum_config) = self.checksum_config {
1071 options.checksum_config(checksum_config);
1072 }
1073 if let Some(signing_config) = self.signing_config {
1074 options.signing_config(signing_config);
1075 }
1076 options
1077 .message(self.inner)
1078 .endpoint(self.uri)
1079 .request_type(operation.meta_request_type());
1080 if let Some(operation_name) = operation.operation_name() {
1081 options.operation_name(operation_name);
1082 }
1083 options
1084 }
1085}
1086
1087#[derive(Debug)]
1088#[pin_project]
1089#[must_use]
1090struct S3MetaRequest<T, E> {
1091 #[pin]
1093 receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1094 meta_request: CancellingMetaRequest,
1095}
1096
1097impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1098 type Output = ObjectClientResult<T, E, S3RequestError>;
1099
1100 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1101 let this = self.project();
1102 this.receiver
1103 .poll(cx)
1104 .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1105 }
1106}
1107
1108impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1109 fn is_terminated(&self) -> bool {
1110 self.receiver.is_terminated()
1111 }
1112}
1113
1114#[derive(Debug)]
1118struct CancellingMetaRequest {
1119 inner: MetaRequest,
1120}
1121
1122impl CancellingMetaRequest {
1123 fn wrap(meta_request: MetaRequest) -> Self {
1124 Self { inner: meta_request }
1125 }
1126}
1127
1128impl Drop for CancellingMetaRequest {
1129 fn drop(&mut self) {
1130 self.inner.cancel();
1131 }
1132}
1133
1134impl Deref for CancellingMetaRequest {
1135 type Target = MetaRequest;
1136
1137 fn deref(&self) -> &Self::Target {
1138 &self.inner
1139 }
1140}
1141
1142impl DerefMut for CancellingMetaRequest {
1143 fn deref_mut(&mut self) -> &mut Self::Target {
1144 &mut self.inner
1145 }
1146}
1147
1148#[derive(Error, Debug)]
1150#[non_exhaustive]
1151pub enum NewClientError {
1152 #[error("invalid S3 endpoint")]
1154 InvalidEndpoint(#[from] EndpointError),
1155 #[error("invalid AWS credentials")]
1157 ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
1158 #[error("invalid configuration: {0}")]
1160 InvalidConfiguration(String),
1161 #[error("Unknown CRT error")]
1163 CrtError(#[source] mountpoint_s3_crt::common::error::Error),
1164}
1165
1166#[derive(Error, Debug)]
1168pub enum S3RequestError {
1169 #[error("Internal S3 client error")]
1171 InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1172
1173 #[error("Unknown CRT error")]
1175 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1176
1177 #[error("Failed to construct request")]
1179 ConstructionFailure(#[from] ConstructionError),
1180
1181 #[error("Unknown response error: {0:?}")]
1183 ResponseError(MetaRequestResult),
1184
1185 #[error("Wrong region (expecting {0})")]
1187 IncorrectRegion(String, ClientErrorMetadata),
1188
1189 #[error("Forbidden: {0}")]
1191 Forbidden(String, ClientErrorMetadata),
1192
1193 #[error("No signing credentials available, see CRT debug logs")]
1195 NoSigningCredentials,
1196
1197 #[error("Request canceled")]
1199 RequestCanceled,
1200
1201 #[error("Request throttled")]
1203 Throttled,
1204
1205 #[error("Polled for data with empty read window")]
1209 EmptyReadWindow,
1210}
1211
1212impl S3RequestError {
1213 fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1214 S3RequestError::ConstructionFailure(inner.into())
1215 }
1216
1217 fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1218 S3RequestError::InternalError(Box::new(inner))
1219 }
1220}
1221
1222impl ProvideErrorMetadata for S3RequestError {
1223 fn meta(&self) -> ClientErrorMetadata {
1224 match self {
1225 Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1226 Self::Forbidden(_, metadata) => metadata.clone(),
1227 Self::Throttled => ClientErrorMetadata {
1228 http_code: Some(503),
1229 error_code: Some("SlowDown".to_string()),
1230 error_message: Some("Please reduce your request rate.".to_string()),
1231 },
1232 Self::IncorrectRegion(_, metadata) => metadata.clone(),
1233 _ => Default::default(),
1234 }
1235 }
1236}
1237
1238#[derive(Error, Debug)]
1239pub enum ConstructionError {
1240 #[error("Unknown CRT error")]
1242 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1243
1244 #[error("Invalid S3 endpoint")]
1246 InvalidEndpoint(#[from] EndpointError),
1247}
1248
1249fn operation_name_to_static_metrics_string(operation_name: Option<&str>) -> &'static str {
1251 const UNKNOWN_METRIC_STR: &str = "Unknown";
1252
1253 let Some(operation_name) = operation_name else {
1254 return UNKNOWN_METRIC_STR;
1255 };
1256
1257 macro_rules! map_to_static_str_for_known_str {
1260 ($input:expr, $($str_literal:literal),* $(,)?) => {
1261 match $input {
1262 $(
1263 $str_literal => Some($str_literal),
1264 )*
1265 _ => None
1266 }
1267 };
1268 }
1269
1270 let static_str = map_to_static_str_for_known_str!(
1271 operation_name,
1272 "GetObject",
1273 "HeadObject",
1274 "ListParts",
1275 "CreateMultipartUpload",
1276 "UploadPart",
1277 "AbortMultipartUpload",
1278 "CompleteMultipartUpload",
1279 "UploadPartCopy",
1280 "CopyObject",
1281 "PutObject",
1282 "ListObjectsV2",
1283 "DeleteObject",
1284 "GetObjectAttributes",
1285 "HeadBucket",
1286 "RenameObject",
1287 );
1288
1289 debug_assert!(
1290 static_str.is_some(),
1291 "input set as {operation_name:?} but no matcher, update required",
1292 );
1293 static_str.unwrap_or(UNKNOWN_METRIC_STR)
1294}
1295
1296fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1298 let header = headers.get("Content-Range").ok()?;
1299 let value = header.value().to_str()?;
1300
1301 if !value.starts_with("bytes ") {
1304 return None;
1305 }
1306 let (_, value) = value.split_at("bytes ".len());
1307 let (range, _) = value.split_once('/')?;
1308 let (start, end) = range.split_once('-')?;
1309 let start = start.parse::<u64>().ok()?;
1310 let end = end.parse::<u64>().ok()?;
1311
1312 Some(start..end + 1)
1314}
1315
1316fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1318 let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1319 let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1320 let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1321 let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1322 let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1323
1324 Ok(Checksum {
1325 checksum_crc64nvme,
1326 checksum_crc32,
1327 checksum_crc32c,
1328 checksum_sha1,
1329 checksum_sha256,
1330 })
1331}
1332
1333fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1335 fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1337 let headers = request_result.error_response_headers.as_ref()?;
1338 let region_header = headers.get("x-amz-bucket-region").ok()?;
1339 let region = region_header.value().to_owned().into_string().ok()?;
1340 Some(S3RequestError::IncorrectRegion(
1341 region,
1342 ClientErrorMetadata::from_meta_request_result(request_result),
1343 ))
1344 }
1345
1346 fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1348 let Some(body) = request_result.error_response_body.as_ref() else {
1349 return Some(S3RequestError::Forbidden(
1352 "<no message>".to_owned(),
1353 ClientErrorMetadata {
1354 http_code: Some(request_result.response_status),
1355 ..Default::default()
1356 },
1357 ));
1358 };
1359 let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1360 let error_code = error_elem.get_child("Code")?;
1361 let error_code_str = error_code.get_text()?;
1362 if request_result.response_status == 403
1365 || matches!(
1366 error_code_str.deref(),
1367 "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1368 )
1369 {
1370 let message = error_elem
1371 .get_child("Message")
1372 .and_then(|e| e.get_text())
1373 .unwrap_or(error_code_str.clone());
1374 Some(S3RequestError::Forbidden(
1375 message.to_string(),
1376 ClientErrorMetadata {
1377 http_code: Some(request_result.response_status),
1378 error_code: Some(error_code_str.to_string()),
1379 error_message: Some(message.into_owned()),
1380 },
1381 ))
1382 } else {
1383 None
1384 }
1385 }
1386
1387 fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1389 let crt_error_code = request_result.crt_error.raw_error();
1390 if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1391 S3RequestError::NoSigningCredentials
1392 } else {
1393 S3RequestError::CrtError(crt_error_code.into())
1394 }
1395 }
1396
1397 fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1398 let crt_error_code = request_result.crt_error.raw_error();
1399 if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1400 Some(S3RequestError::Throttled)
1401 } else {
1402 None
1403 }
1404 }
1405
1406 fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1408 request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1409 }
1410
1411 match request_result.response_status {
1412 301 => try_parse_redirect(request_result),
1413 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1416 403 => try_parse_forbidden(request_result),
1417 0 => try_parse_throttled(request_result)
1419 .or_else(|| try_parse_canceled_request(request_result))
1420 .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1421 _ => None,
1422 }
1423}
1424
1425fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1428 let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1429 const MEGABYTE: u64 = 1024 * 1024;
1431 let bucket = if bytes < MEGABYTE {
1432 "<1MiB"
1433 } else if bytes <= 16 * MEGABYTE {
1434 "1-16MiB"
1435 } else {
1436 ">16MiB"
1437 };
1438 metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1439}
1440
1441#[cfg_attr(not(docsrs), async_trait)]
1442impl ObjectClient for S3CrtClient {
1443 type GetObjectResponse = S3GetObjectResponse;
1444 type PutObjectRequest = S3PutObjectRequest;
1445 type ClientError = S3RequestError;
1446
1447 fn read_part_size(&self) -> usize {
1448 self.inner.read_part_size
1449 }
1450
1451 fn write_part_size(&self) -> usize {
1452 self.inner.write_part_size
1457 }
1458
1459 fn initial_read_window_size(&self) -> Option<usize> {
1460 if self.inner.enable_backpressure {
1461 Some(self.inner.initial_read_window_size)
1462 } else {
1463 None
1464 }
1465 }
1466
1467 fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1468 let start = Instant::now();
1469 self.inner
1470 .s3_client
1471 .poll_default_buffer_pool_usage_stats()
1472 .inspect(|_| {
1473 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1474 .record(start.elapsed().as_micros() as f64);
1475 })
1476 }
1477
1478 async fn delete_object(
1479 &self,
1480 bucket: &str,
1481 key: &str,
1482 ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1483 self.delete_object(bucket, key).await
1484 }
1485
1486 async fn copy_object(
1487 &self,
1488 source_bucket: &str,
1489 source_key: &str,
1490 destination_bucket: &str,
1491 destination_key: &str,
1492 params: &CopyObjectParams,
1493 ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1494 self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1495 .await
1496 }
1497
1498 async fn get_object(
1499 &self,
1500 bucket: &str,
1501 key: &str,
1502 params: &GetObjectParams,
1503 ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1504 self.get_object(bucket, key, params).await
1505 }
1506
1507 async fn list_objects(
1508 &self,
1509 bucket: &str,
1510 continuation_token: Option<&str>,
1511 delimiter: &str,
1512 max_keys: usize,
1513 prefix: &str,
1514 ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1515 self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1516 .await
1517 }
1518
1519 async fn head_object(
1520 &self,
1521 bucket: &str,
1522 key: &str,
1523 params: &HeadObjectParams,
1524 ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1525 self.head_object(bucket, key, params).await
1526 }
1527
1528 async fn put_object(
1529 &self,
1530 bucket: &str,
1531 key: &str,
1532 params: &PutObjectParams,
1533 ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1534 self.put_object(bucket, key, params).await
1535 }
1536
1537 async fn put_object_single<'a>(
1538 &self,
1539 bucket: &str,
1540 key: &str,
1541 params: &PutObjectSingleParams,
1542 contents: impl AsRef<[u8]> + Send + 'a,
1543 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1544 self.put_object_single(bucket, key, params, contents).await
1545 }
1546
1547 async fn get_object_attributes(
1548 &self,
1549 bucket: &str,
1550 key: &str,
1551 max_parts: Option<usize>,
1552 part_number_marker: Option<usize>,
1553 object_attributes: &[ObjectAttribute],
1554 ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1555 self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1556 .await
1557 }
1558
1559 async fn rename_object(
1560 &self,
1561 bucket: &str,
1562 src_key: &str,
1563 dst_key: &str,
1564 params: &RenameObjectParams,
1565 ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1566 self.rename_object(bucket, src_key, dst_key, params).await
1567 }
1568}
1569
1570pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1572 fn on_telemetry(&self, request_metrics: &RequestMetrics);
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577 use mountpoint_s3_crt::common::error::Error;
1578 use rusty_fork::rusty_fork_test;
1579 use std::assert_eq;
1580
1581 use super::*;
1582 use test_case::test_case;
1583
1584 fn client_new_fails_with_invalid_part_size(part_size: usize) {
1586 let config = S3ClientConfig::default().part_size(part_size);
1587 let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
1588 let message = if cfg!(target_pointer_width = "64") {
1589 "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
1590 } else {
1591 format!(
1592 "invalid configuration: part size must be at between 5MiB and {}GiB",
1593 usize::MAX / 1024 / 1024 / 1024
1594 )
1595 };
1596 assert_eq!(e.to_string(), message);
1597 }
1598
1599 #[cfg(target_pointer_width = "64")]
1602 #[test]
1603 fn client_new_fails_with_greater_part_size() {
1604 let part_size = 6 * 1024 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1606 }
1607
1608 #[test]
1609 fn client_new_fails_with_smaller_part_size() {
1610 let part_size = 4 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1612 }
1613
1614 #[test]
1616 fn test_user_agent_with_prefix() {
1617 let user_agent_prefix = String::from("someprefix");
1618 let expected_user_agent = "someprefix mountpoint-s3-client/";
1619
1620 let config = S3ClientConfig {
1621 user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
1622 ..Default::default()
1623 };
1624
1625 let client = S3CrtClient::new(config).expect("Create test client");
1626
1627 let mut message = client
1628 .inner
1629 .new_request_template("GET", "amzn-s3-demo-bucket")
1630 .expect("new request template expected");
1631
1632 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1633
1634 let user_agent_header = headers
1635 .get("User-Agent")
1636 .expect("User Agent Header expected with given prefix");
1637 let user_agent_header_value = user_agent_header.value();
1638
1639 assert!(
1640 user_agent_header_value
1641 .to_string_lossy()
1642 .starts_with(expected_user_agent)
1643 );
1644 }
1645
1646 fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
1647 let config = S3ClientConfig {
1648 endpoint_config,
1649 ..Default::default()
1650 };
1651
1652 let client = S3CrtClient::new(config).expect("create test client");
1653
1654 let mut message = client
1655 .inner
1656 .new_request_template("GET", "")
1657 .expect("new request template expected");
1658
1659 let headers = message.inner.get_headers().expect("expected a block of HTTP headers");
1660
1661 let host_header = headers.get("Host").expect("Host header expected");
1662 let host_header_value = host_header.value();
1663
1664 assert_eq!(host_header_value.to_string_lossy(), expected_host);
1665 }
1666
1667 rusty_fork_test! {
1669 #[test]
1670 fn test_endpoint_favors_parameter_over_env_variable() {
1671 let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
1672 let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);
1673
1674 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }
1676
1677 assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
1679 }
1680
1681 #[test]
1682 fn test_endpoint_favors_env_variable() {
1683 let endpoint_config = EndpointConfig::new("us-east-1");
1684
1685 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }
1687
1688 assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
1689 }
1690
1691 #[test]
1692 fn test_endpoint_with_invalid_env_variable() {
1693 let endpoint_config = EndpointConfig::new("us-east-1");
1694
1695 unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }
1697
1698 let config = S3ClientConfig {
1699 endpoint_config,
1700 ..Default::default()
1701 };
1702
1703 let client = S3CrtClient::new(config);
1704 match client {
1705 Ok(_) => panic!("expected an error"),
1706 Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
1707 }
1708 }
1709
1710 }
1711
1712 #[test]
1714 fn test_user_agent_without_prefix() {
1715 let expected_user_agent = "mountpoint-s3-client/";
1716
1717 let config: S3ClientConfig = Default::default();
1718
1719 let client = S3CrtClient::new(config).expect("Create test client");
1720
1721 let mut message = client
1722 .inner
1723 .new_request_template("GET", "amzn-s3-demo-bucket")
1724 .expect("new request template expected");
1725
1726 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1727
1728 let user_agent_header = headers
1729 .get("User-Agent")
1730 .expect("User Agent Header expected with given prefix");
1731 let user_agent_header_value = user_agent_header.value();
1732
1733 assert!(
1734 user_agent_header_value
1735 .to_string_lossy()
1736 .starts_with(expected_user_agent)
1737 );
1738 }
1739
1740 #[test_case("bytes 200-1000/67589" => Some(200..1001))]
1741 #[test_case("bytes 200-1000/*" => Some(200..1001))]
1742 #[test_case("bytes 200-1000" => None)]
1743 #[test_case("bytes */67589" => None)]
1744 #[test_case("octets 200-1000]" => None)]
1745 fn parse_content_range(range: &str) -> Option<Range<u64>> {
1746 let mut headers = Headers::new(&Allocator::default()).unwrap();
1747 let header = Header::new("Content-Range", range);
1748 headers.add_header(&header).unwrap();
1749 extract_range_header(&headers)
1750 }
1751
1752 #[test]
1754 fn test_expected_bucket_owner() {
1755 let expected_bucket_owner = "111122223333";
1756
1757 let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");
1758
1759 let client = S3CrtClient::new(config).expect("Create test client");
1760
1761 let mut message = client
1762 .inner
1763 .new_request_template("GET", "amzn-s3-demo-bucket")
1764 .expect("new request template expected");
1765
1766 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1767
1768 let expected_bucket_owner_header = headers
1769 .get("x-amz-expected-bucket-owner")
1770 .expect("the headers should contain x-amz-expected-bucket-owner");
1771 let expected_bucket_owner_value = expected_bucket_owner_header.value();
1772
1773 assert!(
1774 expected_bucket_owner_value
1775 .to_string_lossy()
1776 .starts_with(expected_bucket_owner)
1777 );
1778 }
1779
1780 fn make_result(
1781 response_status: i32,
1782 body: impl Into<OsString>,
1783 bucket_region_header: Option<&str>,
1784 ) -> MetaRequestResult {
1785 let error_response_headers = bucket_region_header.map(|h| {
1786 let mut headers = Headers::new(&Allocator::default()).unwrap();
1787 headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
1788 headers
1789 });
1790 MetaRequestResult {
1791 response_status,
1792 crt_error: 1i32.into(),
1793 error_response_headers,
1794 error_response_body: Some(body.into()),
1795 }
1796 }
1797
1798 #[test]
1799 fn parse_301_redirect() {
1800 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
1801 let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1802 let result = try_parse_generic_error(&result);
1803 let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
1804 panic!("wrong result, got: {result:?}");
1805 };
1806 assert_eq!(region, "us-west-2");
1807 }
1808
1809 #[test]
1810 fn parse_403_access_denied() {
1811 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1812 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1813 let result = try_parse_generic_error(&result);
1814 let Some(S3RequestError::Forbidden(message, _)) = result else {
1815 panic!("wrong result, got: {result:?}");
1816 };
1817 assert_eq!(message, "Access Denied");
1818 }
1819
1820 #[test]
1821 fn parse_400_invalid_token() {
1822 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
1823 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1824 let result = try_parse_generic_error(&result);
1825 let Some(S3RequestError::Forbidden(message, _)) = result else {
1826 panic!("wrong result, got: {result:?}");
1827 };
1828 assert_eq!(message, "The provided token is malformed or otherwise invalid.");
1829 }
1830
1831 #[test]
1832 fn parse_400_expired_token() {
1833 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
1834 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1835 let result = try_parse_generic_error(&result);
1836 let Some(S3RequestError::Forbidden(message, _)) = result else {
1837 panic!("wrong result, got: {result:?}");
1838 };
1839 assert_eq!(message, "The provided token has expired.");
1840 }
1841
1842 #[test]
1843 fn parse_400_redirect() {
1844 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
1846 let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1847 let result = try_parse_generic_error(&result);
1848 let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
1849 panic!("wrong result, got: {result:?}");
1850 };
1851 assert_eq!(region, "us-west-2");
1852 }
1853
1854 #[test]
1855 fn parse_403_signature_does_not_match() {
1856 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
1857 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1858 let result = try_parse_generic_error(&result);
1859 let Some(S3RequestError::Forbidden(message, _)) = result else {
1860 panic!("wrong result, got: {result:?}");
1861 };
1862 assert_eq!(
1863 message,
1864 "The request signature we calculated does not match the signature you provided. Check your key and signing method."
1865 );
1866 }
1867
1868 #[test]
1869 fn parse_403_made_up_error() {
1870 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1872 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1873 let result = try_parse_generic_error(&result);
1874 let Some(S3RequestError::Forbidden(message, _)) = result else {
1875 panic!("wrong result, got: {result:?}");
1876 };
1877 assert_eq!(message, "This error is made up.");
1878 }
1879
1880 fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
1881 MetaRequestResult {
1882 response_status,
1883 crt_error,
1884 error_response_headers: None,
1885 error_response_body: None,
1886 }
1887 }
1888
1889 #[test]
1890 fn parse_no_signing_credential_error() {
1891 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
1892 let result = make_crt_error_result(0, error_code.into());
1893 let result = try_parse_generic_error(&result);
1894 let Some(S3RequestError::NoSigningCredentials) = result else {
1895 panic!("wrong result, got: {result:?}");
1896 };
1897 }
1898
1899 #[test]
1900 fn parse_test_other_crt_error() {
1901 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
1903 let result = make_crt_error_result(0, error_code.into());
1904 let result = try_parse_generic_error(&result);
1905 let Some(S3RequestError::CrtError(error)) = result else {
1906 panic!("wrong result, got: {result:?}");
1907 };
1908 assert_eq!(error, error_code.into());
1909 }
1910
1911 #[test]
1912 fn test_checksum_sha256() {
1913 let mut headers = Headers::new(&Allocator::default()).unwrap();
1914 let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
1915 let header = Header::new("x-amz-checksum-sha256", value.to_owned());
1916 headers.add_header(&header).unwrap();
1917
1918 let checksum = parse_checksum(&headers).expect("failed to parse headers");
1919 assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
1920 assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
1921 assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
1922 assert_eq!(
1923 checksum.checksum_sha256,
1924 Some(value.to_owned()),
1925 "sha256 header should match"
1926 );
1927 }
1928
1929 #[test]
1930 fn test_operation_name_to_static_metrics_string() {
1931 assert_eq!(operation_name_to_static_metrics_string(Some("GetObject")), "GetObject");
1932 assert_eq!(
1933 operation_name_to_static_metrics_string(Some("RenameObject")),
1934 "RenameObject",
1935 );
1936 assert_eq!(operation_name_to_static_metrics_string(None), "Unknown");
1937 }
1938}