1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::FutureExt;
15use futures::future::{Fuse, FusedFuture};
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::buffer::Buffer;
30use mountpoint_s3_crt::s3::client::{
31 BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
32 MetaRequestType, RequestMetrics, RequestType, init_signing_config,
33};
34
35use async_trait::async_trait;
36use futures::channel::oneshot;
37use mountpoint_s3_crt::s3::pool::{CrtBufferPoolFactory, MemoryPool, MemoryPoolFactory};
38use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_encode};
39use pin_project::pin_project;
40use thiserror::Error;
41use tracing::{Span, debug, error, trace};
42
43use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
44use crate::endpoint_config::EndpointError;
45use crate::endpoint_config::{self, EndpointConfig};
46use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
47use crate::metrics::{
48 ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_FAILURE,
49 S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
50};
51use crate::object_client::*;
52use crate::user_agent::UserAgent;
53
/// Creates, announces and returns the tracing span for a new S3 request.
///
/// `$self` must provide `next_request_counter()`; the value it returns is recorded as the
/// span's `id` field. `$method` is passed to the span macro as the span name, and any trailing
/// tokens are forwarded as additional span fields. A "new request" debug event is emitted
/// inside the span before it is handed back to the caller.
macro_rules! request_span {
    // No extra fields: delegate to the general form with an empty field list.
    ($self:expr, $method:expr) => { request_span!($self, $method,) };
    ($self:expr, $method:expr, $($field:tt)*) => {{
        // Fetch the id up front, outside of the span macro invocation.
        let request_id = $self.next_request_counter();
        let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = request_id, $($field)*);
        span.in_scope(|| tracing::debug!("new request"));
        span
    }};
}
66
67pub(crate) mod copy_object;
68pub(crate) mod delete_object;
69pub(crate) mod get_object;
70
71pub(crate) use get_object::S3GetObjectResponse;
72pub(crate) mod get_object_attributes;
73
74pub(crate) mod head_object;
75pub(crate) mod list_objects;
76
77pub(crate) mod rename_object;
78
79pub(crate) mod head_bucket;
80pub(crate) mod put_object;
81pub use head_bucket::HeadBucketError;
82pub(crate) use put_object::S3PutObjectRequest;
83
/// Emits a tracing event at a level chosen at runtime.
///
/// Matches on `$level` (a `tracing::Level` value) and forwards the remaining arguments to
/// `tracing::event!` with the corresponding level constant.
macro_rules! event {
    ($level:expr, $($args:tt)*) => {
        match $level {
            ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
            ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
            ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
            ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
            ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
        }
    }
}
97
98#[derive(Debug, Clone)]
100pub struct S3ClientConfig {
101 auth_config: S3ClientAuthConfig,
102 throughput_target_gbps: f64,
103 memory_limit_in_bytes: u64,
104 read_part_size: usize,
105 write_part_size: usize,
106 endpoint_config: EndpointConfig,
107 user_agent: Option<UserAgent>,
108 request_payer: Option<String>,
109 bucket_owner: Option<String>,
110 max_attempts: Option<NonZeroUsize>,
111 read_backpressure: bool,
112 initial_read_window: usize,
113 network_interface_names: Vec<String>,
114 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
115 event_loop_threads: Option<u16>,
116 buffer_pool_factory: Option<CrtBufferPoolFactory>,
117}
118
119impl Default for S3ClientConfig {
120 fn default() -> Self {
121 const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
122 Self {
123 auth_config: Default::default(),
124 throughput_target_gbps: 10.0,
125 memory_limit_in_bytes: 0,
126 read_part_size: DEFAULT_PART_SIZE,
127 write_part_size: DEFAULT_PART_SIZE,
128 endpoint_config: EndpointConfig::new("us-east-1"),
129 user_agent: None,
130 request_payer: None,
131 bucket_owner: None,
132 max_attempts: None,
133 read_backpressure: false,
134 initial_read_window: DEFAULT_PART_SIZE,
135 network_interface_names: vec![],
136 telemetry_callback: None,
137 event_loop_threads: None,
138 buffer_pool_factory: None,
139 }
140 }
141}
142
143impl S3ClientConfig {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 #[must_use = "S3ClientConfig follows a builder pattern"]
150 pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
151 self.auth_config = auth_config;
152 self
153 }
154
155 #[must_use = "S3ClientConfig follows a builder pattern"]
157 pub fn part_size(mut self, part_size: usize) -> Self {
158 self.read_part_size = part_size;
159 self.write_part_size = part_size;
160 self
161 }
162
163 #[must_use = "S3ClientConfig follows a builder pattern"]
165 pub fn read_part_size(mut self, size: usize) -> Self {
166 self.read_part_size = size;
167 self
168 }
169
170 #[must_use = "S3ClientConfig follows a builder pattern"]
172 pub fn write_part_size(mut self, size: usize) -> Self {
173 self.write_part_size = size;
174 self
175 }
176
177 #[must_use = "S3ClientConfig follows a builder pattern"]
179 pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
180 self.throughput_target_gbps = throughput_target_gbps;
181 self
182 }
183
184 #[must_use = "S3ClientConfig follows a builder pattern"]
186 pub fn memory_limit_in_bytes(mut self, memory_limit_in_bytes: u64) -> Self {
187 self.memory_limit_in_bytes = memory_limit_in_bytes;
188 self
189 }
190
191 #[must_use = "S3ClientConfig follows a builder pattern"]
193 pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
194 self.endpoint_config = endpoint_config;
195 self
196 }
197
198 #[must_use = "S3ClientConfig follows a builder pattern"]
200 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
201 self.user_agent = Some(user_agent);
202 self
203 }
204
205 #[must_use = "S3ClientConfig follows a builder pattern"]
207 pub fn request_payer(mut self, request_payer: &str) -> Self {
208 self.request_payer = Some(request_payer.to_owned());
209 self
210 }
211
212 #[must_use = "S3ClientConfig follows a builder pattern"]
214 pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
215 self.bucket_owner = Some(bucket_owner.to_owned());
216 self
217 }
218
219 #[must_use = "S3ClientConfig follows a builder pattern"]
222 pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
223 self.max_attempts = Some(max_attempts);
224 self
225 }
226
227 #[must_use = "S3ClientConfig follows a builder pattern"]
229 pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
230 self.read_backpressure = read_backpressure;
231 self
232 }
233
234 #[must_use = "S3ClientConfig follows a builder pattern"]
236 pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
237 self.initial_read_window = initial_read_window;
238 self
239 }
240
241 #[must_use = "S3ClientConfig follows a builder pattern"]
243 pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
244 self.network_interface_names = network_interface_names;
245 self
246 }
247
248 #[must_use = "S3ClientConfig follows a builder pattern"]
250 pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
251 self.telemetry_callback = Some(telemetry_callback);
252 self
253 }
254
255 #[must_use = "S3ClientConfig follows a builder pattern"]
257 pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
258 self.event_loop_threads = Some(event_loop_threads);
259 self
260 }
261
262 #[must_use = "S3ClientConfig follows a builder pattern"]
264 pub fn memory_pool(mut self, pool: impl MemoryPool) -> Self {
265 self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(move |_| pool.clone()));
266 self
267 }
268
269 #[must_use = "S3ClientConfig follows a builder pattern"]
271 pub fn memory_pool_factory(mut self, pool_factory: impl MemoryPoolFactory) -> Self {
272 self.buffer_pool_factory = Some(CrtBufferPoolFactory::new(pool_factory));
273 self
274 }
275}
276
277#[derive(Debug, Clone, Default)]
279pub enum S3ClientAuthConfig {
280 #[default]
282 Default,
283 NoSigning,
285 Profile(String),
287 Provider(CredentialsProvider),
289}
290
291#[derive(Debug, Clone)]
301pub struct S3CrtClient {
302 inner: Arc<S3CrtClientInner>,
303}
304
305impl S3CrtClient {
306 pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
308 Ok(Self {
309 inner: Arc::new(S3CrtClientInner::new(config)?),
310 })
311 }
312
313 pub fn endpoint_config(&self) -> EndpointConfig {
315 self.inner.endpoint_config.clone()
316 }
317
318 #[doc(hidden)]
319 pub fn event_loop_group(&self) -> EventLoopGroup {
320 self.inner.event_loop_group.clone()
321 }
322}
323
324#[derive(Debug)]
325struct S3CrtClientInner {
326 s3_client: Client,
327 event_loop_group: EventLoopGroup,
328 endpoint_config: EndpointConfig,
329 allocator: Allocator,
330 next_request_counter: AtomicU64,
331 user_agent_header: String,
334 request_payer: Option<String>,
335 read_part_size: usize,
336 write_part_size: usize,
337 enable_backpressure: bool,
338 initial_read_window_size: usize,
339 bucket_owner: Option<String>,
340 credentials_provider: Option<CredentialsProvider>,
341 host_resolver: HostResolver,
342 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
343}
344
345impl S3CrtClientInner {
346 fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
347 let allocator = Allocator::default();
348
349 let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
350
351 let resolver_options = HostResolverDefaultOptions {
352 max_entries: 8,
353 event_loop_group: &mut event_loop_group,
354 };
355
356 let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
357
358 let bootstrap_options = ClientBootstrapOptions {
359 event_loop_group: &mut event_loop_group,
360 host_resolver: &mut host_resolver,
361 };
362
363 let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
364
365 let mut client_config = ClientConfig::new();
366
367 let retry_strategy = {
368 let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
369 let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
370 .ok()
371 .and_then(|s| s.parse::<usize>().ok())
372 .or_else(|| config.max_attempts.map(|m| m.get()))
373 .unwrap_or(3);
374 retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
377 retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
378 retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
379 RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
380 };
381
382 trace!("constructing client with auth config {:?}", config.auth_config);
383 let credentials_provider = match config.auth_config {
384 S3ClientAuthConfig::Default => {
385 let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
386 bootstrap: &mut client_bootstrap,
387 };
388 CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
389 .map_err(NewClientError::ProviderFailure)?
390 }
391 S3ClientAuthConfig::NoSigning => {
392 CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
393 }
394 S3ClientAuthConfig::Profile(profile_name) => {
395 let credentials_profile_options = CredentialsProviderProfileOptions {
396 bootstrap: &mut client_bootstrap,
397 profile_name_override: &profile_name,
398 };
399 CredentialsProvider::new_profile(&allocator, credentials_profile_options)
400 .map_err(NewClientError::ProviderFailure)?
401 }
402 S3ClientAuthConfig::Provider(provider) => provider,
403 };
404
405 let endpoint_config = config.endpoint_config;
406 client_config.region(endpoint_config.get_region());
407 let signing_config = init_signing_config(
408 endpoint_config.get_region(),
409 credentials_provider.clone(),
410 None,
411 None,
412 None,
413 );
414
415 let endpoint_config = match endpoint_config.get_endpoint() {
416 None => {
417 if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
419 debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
420 let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
421 .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
422 endpoint_config.endpoint(env_uri)
423 } else {
424 endpoint_config
425 }
426 }
427 Some(_) => endpoint_config,
428 };
429
430 client_config.express_support(true);
431 client_config.read_backpressure(config.read_backpressure);
432 client_config.initial_read_window(config.initial_read_window);
433 client_config.signing_config(signing_config);
434
435 client_config
436 .client_bootstrap(client_bootstrap)
437 .retry_strategy(retry_strategy);
438
439 client_config.throughput_target_gbps(config.throughput_target_gbps);
440 client_config.memory_limit_in_bytes(config.memory_limit_in_bytes);
441
442 if let Some(pool_factory) = &config.buffer_pool_factory {
443 client_config.buffer_pool_factory(pool_factory.clone());
444 }
445
446 let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
448 for part_size in [config.read_part_size, config.write_part_size] {
450 if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
451 return Err(NewClientError::InvalidConfiguration(format!(
452 "part size must be at between 5MiB and {}GiB",
453 max_part_size / 1024 / 1024 / 1024
454 )));
455 }
456 }
457
458 if !config.network_interface_names.is_empty() {
459 client_config.network_interface_names(config.network_interface_names);
460 }
461
462 let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
463 let user_agent_header = user_agent.build();
464
465 let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
466
467 Ok(Self {
468 allocator,
469 s3_client,
470 event_loop_group,
471 endpoint_config,
472 next_request_counter: AtomicU64::new(0),
473 user_agent_header,
474 request_payer: config.request_payer,
475 read_part_size: config.read_part_size,
476 write_part_size: config.write_part_size,
477 enable_backpressure: config.read_backpressure,
478 initial_read_window_size: config.initial_read_window,
479 bucket_owner: config.bucket_owner,
480 credentials_provider: Some(credentials_provider),
481 host_resolver,
482 telemetry_callback: config.telemetry_callback,
483 })
484 }
485
486 fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message<'_>, ConstructionError> {
491 let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
492 let uri = endpoint.uri()?;
493 trace!(?uri, "resolved endpoint");
494
495 let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
496 let auth_scheme = match endpoint.auth_scheme() {
497 Ok(auth_scheme) => auth_scheme,
498 Err(e) => {
499 error!(error=?e, "no auth scheme for endpoint");
500 return Err(e.into());
501 }
502 };
503 trace!(?auth_scheme, "resolved auth scheme");
504 let algorithm = Some(auth_scheme.scheme_name());
505 let service = Some(auth_scheme.signing_name());
506 let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
507 Some(init_signing_config(
508 auth_scheme.signing_region(),
509 credentials_provider.clone(),
510 algorithm,
511 service,
512 use_double_uri_encode,
513 ))
514 } else {
515 None
516 };
517
518 let hostname = uri.host_name().to_str().unwrap();
519 let path_prefix = uri.path().to_os_string().into_string().unwrap();
520 let port = uri.host_port();
521 let hostname_header = if port > 0 {
522 format!("{hostname}:{port}")
523 } else {
524 hostname.to_string()
525 };
526
527 let mut message = Message::new_request(&self.allocator)?;
528 message.set_request_method(method)?;
529 message.add_header(&Header::new("Host", hostname_header))?;
530 message.add_header(&Header::new("accept", "application/xml"))?;
531 message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
532
533 if let Some(ref payer) = self.request_payer {
534 message.add_header(&Header::new("x-amz-request-payer", payer))?;
535 }
536
537 if let Some(ref owner) = self.bucket_owner {
538 message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
539 }
540
541 Ok(S3Message {
542 inner: message,
543 uri,
544 path_prefix,
545 checksum_config: None,
546 signing_config,
547 })
548 }
549
550 #[allow(clippy::too_many_arguments)]
557 fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
558 &self,
559 mut options: MetaRequestOptions,
560 request_span: Span,
561 on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
562 mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
563 mut on_body: impl FnMut(u64, &Buffer) + Send + 'static,
564 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
565 on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
566 ) -> Result<CancellingMetaRequest, S3RequestError> {
567 let span_telemetry = request_span.clone();
568 let span_body = request_span.clone();
569 let span_finish = request_span;
570
571 let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
572 let hostname = endpoint.host_name().to_str().unwrap().to_owned();
573 let host_resolver = self.host_resolver.clone();
574 let telemetry_callback = self.telemetry_callback.clone();
575
576 let start_time = Instant::now();
577 let first_body_part = Arc::new(AtomicBool::new(true));
578 let first_body_part_clone = Arc::clone(&first_body_part);
579 let total_bytes = Arc::new(AtomicU64::new(0));
580 let total_bytes_clone = Arc::clone(&total_bytes);
581
582 options
583 .on_telemetry(move |metrics| {
584 let _guard = span_telemetry.enter();
585
586 on_request_finish(metrics);
587
588 let http_status = metrics.status_code();
589 let request_canceled = metrics.is_canceled();
590 let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
591 let crt_error = Some(metrics.error()).filter(|e| e.is_err());
592 let request_type = request_type_to_metrics_string(metrics.request_type());
593 let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
594 let duration = metrics.total_duration();
595 let ttfb = metrics.time_to_first_byte();
596 let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
597
598 let message = if request_failure {
599 "S3 request failed"
600 } else if request_canceled {
601 "S3 request canceled"
602 } else {
603 "S3 request finished"
604 };
605 debug!(%request_type, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
606 trace!(detailed_metrics=?metrics, "S3 request completed");
607
608 if let Some(ttfb) = ttfb {
609 metrics::histogram!(S3_REQUEST_FIRST_BYTE_LATENCY, ATTR_S3_REQUEST => request_type).record(ttfb.as_micros() as f64);
610 }
611 metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => request_type).record(duration.as_micros() as f64);
612 metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => request_type).increment(1);
613 if request_failure {
614 metrics::counter!(S3_REQUEST_FAILURE, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
615 } else if request_canceled {
616 metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => request_type).increment(1);
617 }
618
619 if let Some(telemetry_callback) = &telemetry_callback {
620 telemetry_callback.on_telemetry(metrics);
621 }
622 })
623 .on_headers(move |headers, response_status| {
624 (on_headers)(headers, response_status);
625 })
626 .on_body(move |range_start, data| {
627 let _guard = span_body.enter();
628
629 if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
630 let latency = start_time.elapsed().as_micros() as f64;
631 let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
632 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
633 }
634 total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
635
636 trace!(start = range_start, length = data.len(), "body part received");
637
638 (on_body)(range_start, data);
639 })
640 .on_finish(move |request_result| {
641 let _guard = span_finish.enter();
642
643 let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
644 let duration = start_time.elapsed();
645
646 metrics::counter!("s3.meta_requests", "op" => op).increment(1);
647 metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
648 if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
650 let latency = duration.as_micros() as f64;
651 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
652 }
653 let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
654 if op == "get_object" {
657 emit_throughput_metric(total_bytes, duration, op);
658 }
659 let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
660 if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
661 metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
662 }
663
664 let status_code = request_result.response_status;
665 let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
666 tracing::Level::DEBUG
667 } else {
668 tracing::Level::WARN
669 };
670
671 let result =
672 if !request_result.is_err() {
673 event!(log_level, ?duration, "meta request finished");
674 Ok(())
675 } else {
676 let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
679 let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
680
681 let request_id = match &request_result.error_response_headers {
685 Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
686 None => None,
687 };
688 let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
689
690 let message = if request_result.is_canceled() {
691 "meta request canceled"
692 } else {
693 "meta request failed"
694 };
695 if let Some(error) = &maybe_err {
696 event!(log_level, ?duration, %request_id, ?error, message);
697 debug!("meta request result: {:?}", request_result);
698 } else {
699 event!(log_level, ?duration, %request_id, ?request_result, message);
700 }
701
702 if request_result.is_canceled() {
703 metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
704 } else {
705 let error_status = if request_result.response_status >= 100 {
707 request_result.response_status
708 } else {
709 -request_result.crt_error.raw_error()
710 };
711 metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
712 }
713
714 Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
716 };
717
718 on_meta_request_result(result);
719 });
720
721 let meta_request = self.s3_client.make_meta_request(options)?;
723 Self::poll_client_metrics(&self.s3_client);
724 Ok(CancellingMetaRequest::wrap(meta_request))
725 }
726
727 fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
734 &self,
735 options: MetaRequestOptions,
736 request_span: Span,
737 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738 ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
739 let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
740
741 let body: Arc<Mutex<Vec<u8>>> = Default::default();
743 let body_clone = Arc::clone(&body);
744
745 let meta_request = self.meta_request_with_callbacks(
746 options,
747 request_span,
748 |_| {},
749 |_, _| {},
750 move |offset, data| {
751 let mut body = body_clone.lock().unwrap();
752 assert_eq!(offset as usize, body.len());
753 body.extend_from_slice(data);
754 },
755 parse_meta_request_error,
756 move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
757 )?;
758 Ok(S3MetaRequest {
759 receiver: rx.fuse(),
760 meta_request,
761 })
762 }
763
764 fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
771 &self,
772 options: MetaRequestOptions,
773 request_span: Span,
774 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
775 ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
776 let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
777
778 let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
781 let on_result = on_headers.clone();
782
783 let meta_request = self.meta_request_with_callbacks(
784 options,
785 request_span,
786 |_| {},
787 move |headers, status| {
788 if (200..300).contains(&status) {
791 *on_headers.lock().unwrap() = Some(headers.clone());
792 }
793 },
794 |_, _| {},
795 parse_meta_request_error,
796 move |result| {
797 let headers =
799 result.and_then(|_| {
800 on_result.lock().unwrap().take().ok_or_else(|| {
801 S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
802 })
803 });
804 _ = tx.send(headers);
805 },
806 )?;
807 Ok(S3MetaRequest {
808 receiver: rx.fuse(),
809 meta_request,
810 })
811 }
812
813 fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
819 &self,
820 options: MetaRequestOptions,
821 request_span: Span,
822 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
823 ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
824 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
825
826 let meta_request = self.meta_request_with_callbacks(
827 options,
828 request_span,
829 |_| {},
830 |_, _| {},
831 |_, _| {},
832 parse_meta_request_error,
833 move |result| _ = tx.send(result),
834 )?;
835 Ok(S3MetaRequest {
836 receiver: rx.fuse(),
837 meta_request,
838 })
839 }
840
841 fn poll_client_metrics(s3_client: &Client) {
842 let metrics = s3_client.poll_client_metrics();
843 metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
844 metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
845 metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
846 metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
847 metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
848 metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
849 metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
850 .set(metrics.num_auto_ranged_copy_network_io as f64);
851 metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
852 metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
853 .set(metrics.num_requests_stream_queued_waiting as f64);
854 metrics::gauge!("s3.client.num_requests_streaming_response")
855 .set(metrics.num_requests_streaming_response as f64);
856
857 let start = Instant::now();
859 if let Some(buffer_pool_stats) = s3_client.poll_default_buffer_pool_usage_stats() {
860 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
861 .record(start.elapsed().as_micros() as f64);
862 metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
863 metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
864 metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
865 metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
866 metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
867 metrics::gauge!("s3.client.buffer_pool.primary_num_blocks")
868 .set(buffer_pool_stats.primary_num_blocks as f64);
869 metrics::gauge!("s3.client.buffer_pool.secondary_reserved")
870 .set(buffer_pool_stats.secondary_reserved as f64);
871 metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
872 metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
873 }
874 }
875
876 fn next_request_counter(&self) -> u64 {
877 self.next_request_counter.fetch_add(1, Ordering::SeqCst)
878 }
879}
880
881#[derive(Debug, Error)]
883enum ResponseHeadersError {
884 #[error("response headers are missing")]
885 MissingHeaders,
886}
887
/// The S3 operations this client issues.
///
/// Each variant maps to a CRT meta request type and, for some, an explicit operation name.
#[derive(Debug, Clone, Copy)]
enum S3Operation {
    DeleteObject,
    GetObject,
    GetObjectAttributes,
    HeadBucket,
    HeadObject,
    ListObjects,
    PutObject,
    CopyObject,
    /// Reported under the operation name "PutObject" while using the default meta request type.
    PutObjectSingle,
}
901
902impl S3Operation {
903 fn meta_request_type(&self) -> MetaRequestType {
905 match self {
906 S3Operation::GetObject => MetaRequestType::GetObject,
907 S3Operation::PutObject => MetaRequestType::PutObject,
908 S3Operation::CopyObject => MetaRequestType::CopyObject,
909 _ => MetaRequestType::Default,
910 }
911 }
912
913 fn operation_name(&self) -> Option<&'static str> {
916 match self {
917 S3Operation::DeleteObject => Some("DeleteObject"),
918 S3Operation::GetObject => None,
919 S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
920 S3Operation::HeadBucket => Some("HeadBucket"),
921 S3Operation::HeadObject => Some("HeadObject"),
922 S3Operation::ListObjects => Some("ListObjectsV2"),
923 S3Operation::PutObject => None,
924 S3Operation::CopyObject => None,
925 S3Operation::PutObjectSingle => Some("PutObject"),
926 }
927 }
928}
929
930#[derive(Debug)]
935struct S3Message<'a> {
936 inner: Message<'a>,
937 uri: Uri,
938 path_prefix: String,
939 checksum_config: Option<ChecksumConfig>,
940 signing_config: Option<SigningConfig>,
941}
942
943const URLENCODE_QUERY_FRAGMENT: &AsciiSet = &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
945const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
946
947fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
948 let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
949 s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
950}
951
/// The part of a request path that follows the `?`, if any.
#[derive(Debug, Default)]
enum QueryFragment<'a, P: AsRef<OsStr>> {
    /// Nothing is appended to the path.
    #[default]
    Empty,
    /// Key/value pairs, written percent-encoded as `?k1=v1&k2=v2`.
    Query(&'a [(P, P)]),
    /// A bare action, written without encoding as `?action`.
    Action(P),
}
959
960impl<P: AsRef<OsStr>> QueryFragment<'_, P> {
961 fn size(&self) -> usize {
965 match self {
966 QueryFragment::Empty => 0,
967 QueryFragment::Query(query) => query
968 .iter()
969 .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2)
970 .sum::<usize>(),
971 QueryFragment::Action(action) => 1 + action.as_ref().len(),
972 }
973 }
974
975 fn write(&self, path: &mut OsString) {
977 match &self {
978 QueryFragment::Query(query) if !query.is_empty() => {
979 path.push("?");
980 for (i, (key, value)) in query.iter().enumerate() {
981 if i != 0 {
982 path.push("&");
983 }
984 write_encoded_fragment(path, key, URLENCODE_QUERY_FRAGMENT);
985 path.push("=");
986 write_encoded_fragment(path, value, URLENCODE_QUERY_FRAGMENT);
987 }
988 }
989 QueryFragment::Action(action) => {
990 path.push("?");
991 path.push(action.as_ref());
992 }
993 _ => {}
994 }
995 }
996}
997
998impl<'a> S3Message<'a> {
999 fn set_header(
1002 &mut self,
1003 header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
1004 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1005 self.inner.set_header(header)
1006 }
1007
1008 fn set_request_path_and_query<P: AsRef<OsStr>>(
1011 &mut self,
1012 path: impl AsRef<OsStr>,
1013 query_or_action: QueryFragment<P>,
1014 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1015 let space_needed = self.path_prefix.len() + query_or_action.size();
1016
1017 let mut full_path = OsString::with_capacity(space_needed);
1018
1019 write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
1020 write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
1021
1022 query_or_action.write(&mut full_path);
1024 self.inner.set_request_path(full_path)
1025 }
1026
1027 fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1030 self.set_request_path_and_query::<&str>(path, Default::default())
1031 }
1032
1033 fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
1035 self.checksum_config = checksum_config;
1036 }
1037
1038 fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
1041 self.inner.set_body_stream(input_stream)
1042 }
1043
1044 fn set_content_length_header(
1046 &mut self,
1047 content_length: usize,
1048 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1049 self.inner
1050 .set_header(&Header::new("Content-Length", content_length.to_string()))
1051 }
1052
1053 fn set_checksum_header(
1055 &mut self,
1056 checksum: &UploadChecksum,
1057 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
1058 let header = match checksum {
1059 UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
1060 UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
1061 UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
1062 UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
1063 UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
1064 };
1065 self.inner.set_header(&header)
1066 }
1067
1068 fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1069 let mut options = MetaRequestOptions::new();
1070 if let Some(checksum_config) = self.checksum_config {
1071 options.checksum_config(checksum_config);
1072 }
1073 if let Some(signing_config) = self.signing_config {
1074 options.signing_config(signing_config);
1075 }
1076 options
1077 .message(self.inner)
1078 .endpoint(self.uri)
1079 .request_type(operation.meta_request_type());
1080 if let Some(operation_name) = operation.operation_name() {
1081 options.operation_name(operation_name);
1082 }
1083 options
1084 }
1085}
1086
1087#[derive(Debug)]
1088#[pin_project]
1089#[must_use]
1090struct S3MetaRequest<T, E> {
1091 #[pin]
1093 receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1094 meta_request: CancellingMetaRequest,
1095}
1096
1097impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1098 type Output = ObjectClientResult<T, E, S3RequestError>;
1099
1100 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1101 let this = self.project();
1102 this.receiver
1103 .poll(cx)
1104 .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1105 }
1106}
1107
1108impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1109 fn is_terminated(&self) -> bool {
1110 self.receiver.is_terminated()
1111 }
1112}
1113
1114#[derive(Debug)]
1118struct CancellingMetaRequest {
1119 inner: MetaRequest,
1120}
1121
1122impl CancellingMetaRequest {
1123 fn wrap(meta_request: MetaRequest) -> Self {
1124 Self { inner: meta_request }
1125 }
1126}
1127
1128impl Drop for CancellingMetaRequest {
1129 fn drop(&mut self) {
1130 self.inner.cancel();
1131 }
1132}
1133
1134impl Deref for CancellingMetaRequest {
1135 type Target = MetaRequest;
1136
1137 fn deref(&self) -> &Self::Target {
1138 &self.inner
1139 }
1140}
1141
1142impl DerefMut for CancellingMetaRequest {
1143 fn deref_mut(&mut self) -> &mut Self::Target {
1144 &mut self.inner
1145 }
1146}
1147
1148#[derive(Error, Debug)]
1150#[non_exhaustive]
1151pub enum NewClientError {
1152 #[error("invalid S3 endpoint")]
1154 InvalidEndpoint(#[from] EndpointError),
1155 #[error("invalid AWS credentials")]
1157 ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
1158 #[error("invalid configuration: {0}")]
1160 InvalidConfiguration(String),
1161 #[error("Unknown CRT error")]
1163 CrtError(#[source] mountpoint_s3_crt::common::error::Error),
1164}
1165
1166#[derive(Error, Debug)]
1168pub enum S3RequestError {
1169 #[error("Internal S3 client error")]
1171 InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1172
1173 #[error("Unknown CRT error")]
1175 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1176
1177 #[error("Failed to construct request")]
1179 ConstructionFailure(#[from] ConstructionError),
1180
1181 #[error("Unknown response error: {0:?}")]
1183 ResponseError(MetaRequestResult),
1184
1185 #[error("Wrong region (expecting {0})")]
1187 IncorrectRegion(String, ClientErrorMetadata),
1188
1189 #[error("Forbidden: {0}")]
1191 Forbidden(String, ClientErrorMetadata),
1192
1193 #[error("No signing credentials available, see CRT debug logs")]
1195 NoSigningCredentials,
1196
1197 #[error("Request canceled")]
1199 RequestCanceled,
1200
1201 #[error("Request throttled")]
1203 Throttled,
1204
1205 #[error("Polled for data with empty read window")]
1209 EmptyReadWindow,
1210}
1211
1212impl S3RequestError {
1213 fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1214 S3RequestError::ConstructionFailure(inner.into())
1215 }
1216
1217 fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1218 S3RequestError::InternalError(Box::new(inner))
1219 }
1220}
1221
1222impl ProvideErrorMetadata for S3RequestError {
1223 fn meta(&self) -> ClientErrorMetadata {
1224 match self {
1225 Self::ResponseError(request_result) => ClientErrorMetadata::from_meta_request_result(request_result),
1226 Self::Forbidden(_, metadata) => metadata.clone(),
1227 Self::Throttled => ClientErrorMetadata {
1228 http_code: Some(503),
1229 error_code: Some("SlowDown".to_string()),
1230 error_message: Some("Please reduce your request rate.".to_string()),
1231 },
1232 Self::IncorrectRegion(_, metadata) => metadata.clone(),
1233 _ => Default::default(),
1234 }
1235 }
1236}
1237
1238#[derive(Error, Debug)]
1239pub enum ConstructionError {
1240 #[error("Unknown CRT error")]
1242 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1243
1244 #[error("Invalid S3 endpoint")]
1246 InvalidEndpoint(#[from] EndpointError),
1247}
1248
1249fn request_type_to_metrics_string(request_type: RequestType) -> &'static str {
1254 match request_type {
1255 RequestType::Unknown => "Default",
1256 RequestType::HeadObject => "HeadObject",
1257 RequestType::GetObject => "GetObject",
1258 RequestType::ListParts => "ListParts",
1259 RequestType::CreateMultipartUpload => "CreateMultipartUpload",
1260 RequestType::UploadPart => "UploadPart",
1261 RequestType::AbortMultipartUpload => "AbortMultipartUpload",
1262 RequestType::CompleteMultipartUpload => "CompleteMultipartUpload",
1263 RequestType::UploadPartCopy => "UploadPartCopy",
1264 RequestType::CopyObject => "CopyObject",
1265 RequestType::PutObject => "PutObject",
1266 }
1267}
1268
1269fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1271 let header = headers.get("Content-Range").ok()?;
1272 let value = header.value().to_str()?;
1273
1274 if !value.starts_with("bytes ") {
1277 return None;
1278 }
1279 let (_, value) = value.split_at("bytes ".len());
1280 let (range, _) = value.split_once('/')?;
1281 let (start, end) = range.split_once('-')?;
1282 let start = start.parse::<u64>().ok()?;
1283 let end = end.parse::<u64>().ok()?;
1284
1285 Some(start..end + 1)
1287}
1288
1289fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1291 let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1292 let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1293 let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1294 let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1295 let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1296
1297 Ok(Checksum {
1298 checksum_crc64nvme,
1299 checksum_crc32,
1300 checksum_crc32c,
1301 checksum_sha1,
1302 checksum_sha256,
1303 })
1304}
1305
1306fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1308 fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1310 let headers = request_result.error_response_headers.as_ref()?;
1311 let region_header = headers.get("x-amz-bucket-region").ok()?;
1312 let region = region_header.value().to_owned().into_string().ok()?;
1313 Some(S3RequestError::IncorrectRegion(
1314 region,
1315 ClientErrorMetadata::from_meta_request_result(request_result),
1316 ))
1317 }
1318
1319 fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1321 let Some(body) = request_result.error_response_body.as_ref() else {
1322 return Some(S3RequestError::Forbidden(
1325 "<no message>".to_owned(),
1326 ClientErrorMetadata {
1327 http_code: Some(request_result.response_status),
1328 ..Default::default()
1329 },
1330 ));
1331 };
1332 let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1333 let error_code = error_elem.get_child("Code")?;
1334 let error_code_str = error_code.get_text()?;
1335 if request_result.response_status == 403
1338 || matches!(
1339 error_code_str.deref(),
1340 "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1341 )
1342 {
1343 let message = error_elem
1344 .get_child("Message")
1345 .and_then(|e| e.get_text())
1346 .unwrap_or(error_code_str.clone());
1347 Some(S3RequestError::Forbidden(
1348 message.to_string(),
1349 ClientErrorMetadata {
1350 http_code: Some(request_result.response_status),
1351 error_code: Some(error_code_str.to_string()),
1352 error_message: Some(message.into_owned()),
1353 },
1354 ))
1355 } else {
1356 None
1357 }
1358 }
1359
1360 fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1362 let crt_error_code = request_result.crt_error.raw_error();
1363 if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1364 S3RequestError::NoSigningCredentials
1365 } else {
1366 S3RequestError::CrtError(crt_error_code.into())
1367 }
1368 }
1369
1370 fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1371 let crt_error_code = request_result.crt_error.raw_error();
1372 if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1373 Some(S3RequestError::Throttled)
1374 } else {
1375 None
1376 }
1377 }
1378
1379 fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1381 request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1382 }
1383
1384 match request_result.response_status {
1385 301 => try_parse_redirect(request_result),
1386 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1389 403 => try_parse_forbidden(request_result),
1390 0 => try_parse_throttled(request_result)
1392 .or_else(|| try_parse_canceled_request(request_result))
1393 .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1394 _ => None,
1395 }
1396}
1397
1398fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1401 let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1402 const MEGABYTE: u64 = 1024 * 1024;
1404 let bucket = if bytes < MEGABYTE {
1405 "<1MiB"
1406 } else if bytes <= 16 * MEGABYTE {
1407 "1-16MiB"
1408 } else {
1409 ">16MiB"
1410 };
1411 metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1412}
1413
1414#[cfg_attr(not(docsrs), async_trait)]
1415impl ObjectClient for S3CrtClient {
1416 type GetObjectResponse = S3GetObjectResponse;
1417 type PutObjectRequest = S3PutObjectRequest;
1418 type ClientError = S3RequestError;
1419
1420 fn read_part_size(&self) -> usize {
1421 self.inner.read_part_size
1422 }
1423
1424 fn write_part_size(&self) -> usize {
1425 self.inner.write_part_size
1430 }
1431
1432 fn initial_read_window_size(&self) -> Option<usize> {
1433 if self.inner.enable_backpressure {
1434 Some(self.inner.initial_read_window_size)
1435 } else {
1436 None
1437 }
1438 }
1439
1440 fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1441 let start = Instant::now();
1442 self.inner
1443 .s3_client
1444 .poll_default_buffer_pool_usage_stats()
1445 .inspect(|_| {
1446 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us")
1447 .record(start.elapsed().as_micros() as f64);
1448 })
1449 }
1450
1451 async fn delete_object(
1452 &self,
1453 bucket: &str,
1454 key: &str,
1455 ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1456 self.delete_object(bucket, key).await
1457 }
1458
1459 async fn copy_object(
1460 &self,
1461 source_bucket: &str,
1462 source_key: &str,
1463 destination_bucket: &str,
1464 destination_key: &str,
1465 params: &CopyObjectParams,
1466 ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1467 self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1468 .await
1469 }
1470
1471 async fn get_object(
1472 &self,
1473 bucket: &str,
1474 key: &str,
1475 params: &GetObjectParams,
1476 ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1477 self.get_object(bucket, key, params).await
1478 }
1479
1480 async fn list_objects(
1481 &self,
1482 bucket: &str,
1483 continuation_token: Option<&str>,
1484 delimiter: &str,
1485 max_keys: usize,
1486 prefix: &str,
1487 ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1488 self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1489 .await
1490 }
1491
1492 async fn head_object(
1493 &self,
1494 bucket: &str,
1495 key: &str,
1496 params: &HeadObjectParams,
1497 ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1498 self.head_object(bucket, key, params).await
1499 }
1500
1501 async fn put_object(
1502 &self,
1503 bucket: &str,
1504 key: &str,
1505 params: &PutObjectParams,
1506 ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1507 self.put_object(bucket, key, params).await
1508 }
1509
1510 async fn put_object_single<'a>(
1511 &self,
1512 bucket: &str,
1513 key: &str,
1514 params: &PutObjectSingleParams,
1515 contents: impl AsRef<[u8]> + Send + 'a,
1516 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1517 self.put_object_single(bucket, key, params, contents).await
1518 }
1519
1520 async fn get_object_attributes(
1521 &self,
1522 bucket: &str,
1523 key: &str,
1524 max_parts: Option<usize>,
1525 part_number_marker: Option<usize>,
1526 object_attributes: &[ObjectAttribute],
1527 ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1528 self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1529 .await
1530 }
1531
1532 async fn rename_object(
1533 &self,
1534 bucket: &str,
1535 src_key: &str,
1536 dst_key: &str,
1537 params: &RenameObjectParams,
1538 ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError> {
1539 self.rename_object(bucket, src_key, dst_key, params).await
1540 }
1541}
1542
1543pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1545 fn on_telemetry(&self, request_metrics: &RequestMetrics);
1546}
1547
#[cfg(test)]
mod tests {
    use mountpoint_s3_crt::common::error::Error;
    use rusty_fork::rusty_fork_test;
    use std::assert_eq;

    use super::*;
    use test_case::test_case;

    /// Assert that creating a client with `part_size` fails with the part size configuration
    /// error. The upper bound in the message depends on the pointer width of the target.
    fn client_new_fails_with_invalid_part_size(part_size: usize) {
        let config = S3ClientConfig::default().part_size(part_size);
        let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
        let message = if cfg!(target_pointer_width = "64") {
            "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
        } else {
            format!(
                "invalid configuration: part size must be at between 5MiB and {}GiB",
                usize::MAX / 1024 / 1024 / 1024
            )
        };
        assert_eq!(e.to_string(), message);
    }

    /// A part size above the 5GiB limit is rejected. Only meaningful on 64-bit targets, where
    /// such a size fits in a `usize`.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn client_new_fails_with_greater_part_size() {
        // 6GiB
        let part_size = 6 * 1024 * 1024 * 1024;
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// A part size below the 5MiB limit is rejected.
    #[test]
    fn client_new_fails_with_smaller_part_size() {
        // 4MiB
        let part_size = 4 * 1024 * 1024;
        client_new_fails_with_invalid_part_size(part_size);
    }

    /// The configured prefix is placed in front of the default User-Agent value.
    #[test]
    fn test_user_agent_with_prefix() {
        let user_agent_prefix = String::from("someprefix");
        let expected_user_agent = "someprefix mountpoint-s3-client/";

        let config = S3ClientConfig {
            user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// Create a client with `endpoint_config` and assert that the `Host` header of a fresh
    /// request template equals `expected_host`.
    fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
        let config = S3ClientConfig {
            endpoint_config,
            ..Default::default()
        };

        let client = S3CrtClient::new(config).expect("create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("expected a block of HTTP headers");

        let host_header = headers.get("Host").expect("Host header expected");
        let host_header_value = host_header.value();

        assert_eq!(host_header_value.to_string_lossy(), expected_host);
    }

    // These tests mutate the process environment, so they are declared through
    // `rusty_fork_test!` rather than as plain tests.
    rusty_fork_test! {
        #[test]
        fn test_endpoint_favors_parameter_over_env_variable() {
            let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
            let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);

            // SAFETY: presumably sound because rusty_fork runs this test in its own process, so
            // no other test observes the environment change — confirm no other thread reads the
            // environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com"); }

            // The explicitly configured endpoint wins over the environment variable.
            assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_favors_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: presumably sound because rusty_fork runs this test in its own process, so
            // no other test observes the environment change — confirm no other thread reads the
            // environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com"); }

            // The environment variable wins over the endpoint derived from the region.
            assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
        }

        #[test]
        fn test_endpoint_with_invalid_env_variable() {
            let endpoint_config = EndpointConfig::new("us-east-1");

            // SAFETY: presumably sound because rusty_fork runs this test in its own process, so
            // no other test observes the environment change — confirm no other thread reads the
            // environment concurrently.
            unsafe { std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url"); }

            let config = S3ClientConfig {
                endpoint_config,
                ..Default::default()
            };

            let client = S3CrtClient::new(config);
            match client {
                Ok(_) => panic!("expected an error"),
                Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
            }
        }

    }

    /// Without a prefix, the User-Agent starts with the default client identifier.
    #[test]
    fn test_user_agent_without_prefix() {
        let expected_user_agent = "mountpoint-s3-client/";

        let config: S3ClientConfig = Default::default();

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let user_agent_header = headers
            .get("User-Agent")
            .expect("User Agent Header expected with given prefix");
        let user_agent_header_value = user_agent_header.value();

        assert!(
            user_agent_header_value
                .to_string_lossy()
                .starts_with(expected_user_agent)
        );
    }

    /// `Content-Range` values are parsed into half-open ranges; malformed values yield `None`.
    #[test_case("bytes 200-1000/67589" => Some(200..1001))]
    #[test_case("bytes 200-1000/*" => Some(200..1001))]
    #[test_case("bytes 200-1000" => None)]
    #[test_case("bytes */67589" => None)]
    #[test_case("octets 200-1000]" => None)]
    fn parse_content_range(range: &str) -> Option<Range<u64>> {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let header = Header::new("Content-Range", range);
        headers.add_header(&header).unwrap();
        extract_range_header(&headers)
    }

    /// A configured bucket owner is sent in the `x-amz-expected-bucket-owner` header.
    #[test]
    fn test_expected_bucket_owner() {
        let expected_bucket_owner = "111122223333";

        let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");

        let client = S3CrtClient::new(config).expect("Create test client");

        let mut message = client
            .inner
            .new_request_template("GET", "amzn-s3-demo-bucket")
            .expect("new request template expected");

        let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");

        let expected_bucket_owner_header = headers
            .get("x-amz-expected-bucket-owner")
            .expect("the headers should contain x-amz-expected-bucket-owner");
        let expected_bucket_owner_value = expected_bucket_owner_header.value();

        assert!(
            expected_bucket_owner_value
                .to_string_lossy()
                .starts_with(expected_bucket_owner)
        );
    }

    /// Build a failed request result with the given status and body, and optionally an
    /// `x-amz-bucket-region` response header.
    fn make_result(
        response_status: i32,
        body: impl Into<OsString>,
        bucket_region_header: Option<&str>,
    ) -> MetaRequestResult {
        let error_response_headers = bucket_region_header.map(|h| {
            let mut headers = Headers::new(&Allocator::default()).unwrap();
            headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
            headers
        });
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_301_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
        let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_access_denied() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "Access Denied");
    }

    #[test]
    fn parse_400_invalid_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token is malformed or otherwise invalid.");
    }

    #[test]
    fn parse_400_expired_token() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
        let result = make_result(400, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "The provided token has expired.");
    }

    #[test]
    fn parse_400_redirect() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
        // The error code is not access-related, so the region header decides the outcome.
        let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::IncorrectRegion(region, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(region, "us-west-2");
    }

    #[test]
    fn parse_403_signature_does_not_match() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(
            message,
            "The request signature we calculated does not match the signature you provided. Check your key and signing method."
        );
    }

    #[test]
    fn parse_403_made_up_error() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
        // Any 403 is reported as forbidden, even when the error code is not a known one.
        let result = make_result(403, OsStr::from_bytes(&body[..]), None);
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::Forbidden(message, _)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(message, "This error is made up.");
    }

    /// Build a failed request result that carries only a status and a CRT error.
    fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error,
            error_response_headers: None,
            error_response_body: None,
        }
    }

    #[test]
    fn parse_no_signing_credential_error() {
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::NoSigningCredentials) = result else {
            panic!("wrong result, got: {result:?}");
        };
    }

    #[test]
    fn parse_test_other_crt_error() {
        // Any CRT error without a dedicated mapping is passed through unchanged.
        let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
        let result = make_crt_error_result(0, error_code.into());
        let result = try_parse_generic_error(&result);
        let Some(S3RequestError::CrtError(error)) = result else {
            panic!("wrong result, got: {result:?}");
        };
        assert_eq!(error, error_code.into());
    }

    /// Only the checksum whose header is present is populated; all others stay `None`.
    #[test]
    fn test_checksum_sha256() {
        let mut headers = Headers::new(&Allocator::default()).unwrap();
        let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
        let header = Header::new("x-amz-checksum-sha256", value.to_owned());
        headers.add_header(&header).unwrap();

        let checksum = parse_checksum(&headers).expect("failed to parse headers");
        assert_eq!(checksum.checksum_crc64nvme, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
        assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
        assert_eq!(
            checksum.checksum_sha256,
            Some(value.to_owned()),
            "sha256 header should match"
        );
    }
}