1use std::cmp;
2use std::ffi::{OsStr, OsString};
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Range;
6use std::ops::{Deref, DerefMut};
7use std::os::unix::prelude::OsStrExt;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures::future::{Fuse, FusedFuture};
15use futures::FutureExt;
16pub use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
17use mountpoint_s3_crt::auth::credentials::{CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions};
18use mountpoint_s3_crt::auth::signing_config::SigningConfig;
19use mountpoint_s3_crt::common::allocator::Allocator;
20pub use mountpoint_s3_crt::common::error::Error as CrtError;
21use mountpoint_s3_crt::common::string::AwsString;
22use mountpoint_s3_crt::common::uri::Uri;
23use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
24use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
25pub use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
26use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
27use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
28use mountpoint_s3_crt::io::stream::InputStream;
29use mountpoint_s3_crt::s3::client::{
30 init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions,
31 MetaRequestResult, MetaRequestType, RequestMetrics, RequestType,
32};
33
34use async_trait::async_trait;
35use futures::channel::oneshot;
36use percent_encoding::{percent_encode, AsciiSet, NON_ALPHANUMERIC};
37use pin_project::pin_project;
38use thiserror::Error;
39use tracing::{debug, error, trace, Span};
40
41use crate::checksums::{crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64};
42use crate::endpoint_config::EndpointError;
43use crate::endpoint_config::{self, EndpointConfig};
44use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
45use crate::object_client::*;
46use crate::user_agent::UserAgent;
47
48macro_rules! request_span {
49 ($self:expr, $method:expr, $($field:tt)*) => {{
50 let counter = $self.next_request_counter();
51 let span = tracing::warn_span!(target: "mountpoint_s3_client::s3_crt_client::request", $method, id = counter, $($field)*);
55 span.in_scope(|| tracing::debug!("new request"));
56 span
57 }};
58 ($self:expr, $method:expr) => { request_span!($self, $method,) };
59}
60
61pub(crate) mod copy_object;
62pub(crate) mod delete_object;
63pub(crate) mod get_object;
64
65pub(crate) use get_object::S3GetObjectResponse;
66pub(crate) mod get_object_attributes;
67
68pub(crate) mod head_object;
69pub(crate) mod list_objects;
70
71pub(crate) mod head_bucket;
72pub(crate) mod put_object;
73pub use head_bucket::HeadBucketError;
74pub(crate) use put_object::S3PutObjectRequest;
75
76macro_rules! event {
79 ($level:expr, $($args:tt)*) => {
80 match $level {
81 ::tracing::Level::ERROR => ::tracing::event!(::tracing::Level::ERROR, $($args)*),
82 ::tracing::Level::WARN => ::tracing::event!(::tracing::Level::WARN, $($args)*),
83 ::tracing::Level::INFO => ::tracing::event!(::tracing::Level::INFO, $($args)*),
84 ::tracing::Level::DEBUG => ::tracing::event!(::tracing::Level::DEBUG, $($args)*),
85 ::tracing::Level::TRACE => ::tracing::event!(::tracing::Level::TRACE, $($args)*),
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct S3ClientConfig {
93 auth_config: S3ClientAuthConfig,
94 throughput_target_gbps: f64,
95 read_part_size: usize,
96 write_part_size: usize,
97 endpoint_config: EndpointConfig,
98 user_agent: Option<UserAgent>,
99 request_payer: Option<String>,
100 bucket_owner: Option<String>,
101 max_attempts: Option<NonZeroUsize>,
102 read_backpressure: bool,
103 initial_read_window: usize,
104 network_interface_names: Vec<String>,
105 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
106 event_loop_threads: Option<u16>,
107}
108
109impl Default for S3ClientConfig {
110 fn default() -> Self {
111 const DEFAULT_PART_SIZE: usize = 8 * 1024 * 1024;
112 Self {
113 auth_config: Default::default(),
114 throughput_target_gbps: 10.0,
115 read_part_size: DEFAULT_PART_SIZE,
116 write_part_size: DEFAULT_PART_SIZE,
117 endpoint_config: EndpointConfig::new("us-east-1"),
118 user_agent: None,
119 request_payer: None,
120 bucket_owner: None,
121 max_attempts: None,
122 read_backpressure: false,
123 initial_read_window: DEFAULT_PART_SIZE,
124 network_interface_names: vec![],
125 telemetry_callback: None,
126 event_loop_threads: None,
127 }
128 }
129}
130
131impl S3ClientConfig {
132 pub fn new() -> Self {
133 Self::default()
134 }
135
136 #[must_use = "S3ClientConfig follows a builder pattern"]
138 pub fn auth_config(mut self, auth_config: S3ClientAuthConfig) -> Self {
139 self.auth_config = auth_config;
140 self
141 }
142
143 #[must_use = "S3ClientConfig follows a builder pattern"]
145 pub fn part_size(mut self, part_size: usize) -> Self {
146 self.read_part_size = part_size;
147 self.write_part_size = part_size;
148 self
149 }
150
151 #[must_use = "S3ClientConfig follows a builder pattern"]
153 pub fn read_part_size(mut self, size: usize) -> Self {
154 self.read_part_size = size;
155 self
156 }
157
158 #[must_use = "S3ClientConfig follows a builder pattern"]
160 pub fn write_part_size(mut self, size: usize) -> Self {
161 self.write_part_size = size;
162 self
163 }
164
165 #[must_use = "S3ClientConfig follows a builder pattern"]
167 pub fn throughput_target_gbps(mut self, throughput_target_gbps: f64) -> Self {
168 self.throughput_target_gbps = throughput_target_gbps;
169 self
170 }
171
172 #[must_use = "S3ClientConfig follows a builder pattern"]
174 pub fn endpoint_config(mut self, endpoint_config: EndpointConfig) -> Self {
175 self.endpoint_config = endpoint_config;
176 self
177 }
178
179 #[must_use = "S3ClientConfig follows a builder pattern"]
181 pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
182 self.user_agent = Some(user_agent);
183 self
184 }
185
186 #[must_use = "S3ClientConfig follows a builder pattern"]
188 pub fn request_payer(mut self, request_payer: &str) -> Self {
189 self.request_payer = Some(request_payer.to_owned());
190 self
191 }
192
193 #[must_use = "S3ClientConfig follows a builder pattern"]
195 pub fn bucket_owner(mut self, bucket_owner: &str) -> Self {
196 self.bucket_owner = Some(bucket_owner.to_owned());
197 self
198 }
199
200 #[must_use = "S3ClientConfig follows a builder pattern"]
203 pub fn max_attempts(mut self, max_attempts: NonZeroUsize) -> Self {
204 self.max_attempts = Some(max_attempts);
205 self
206 }
207
208 #[must_use = "S3ClientConfig follows a builder pattern"]
210 pub fn read_backpressure(mut self, read_backpressure: bool) -> Self {
211 self.read_backpressure = read_backpressure;
212 self
213 }
214
215 #[must_use = "S3ClientConfig follows a builder pattern"]
217 pub fn initial_read_window(mut self, initial_read_window: usize) -> Self {
218 self.initial_read_window = initial_read_window;
219 self
220 }
221
222 #[must_use = "S3ClientConfig follows a builder pattern"]
224 pub fn network_interface_names(mut self, network_interface_names: Vec<String>) -> Self {
225 self.network_interface_names = network_interface_names;
226 self
227 }
228
229 #[must_use = "S3ClientConfig follows a builder pattern"]
231 pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
232 self.telemetry_callback = Some(telemetry_callback);
233 self
234 }
235
236 #[must_use = "S3ClientConfig follows a builder pattern"]
238 pub fn event_loop_threads(mut self, event_loop_threads: u16) -> Self {
239 self.event_loop_threads = Some(event_loop_threads);
240 self
241 }
242}
243
244#[derive(Debug, Clone, Default)]
246pub enum S3ClientAuthConfig {
247 #[default]
249 Default,
250 NoSigning,
252 Profile(String),
254 Provider(CredentialsProvider),
256}
257
258#[derive(Debug, Clone)]
268pub struct S3CrtClient {
269 inner: Arc<S3CrtClientInner>,
270}
271
272impl S3CrtClient {
273 pub fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
275 Ok(Self {
276 inner: Arc::new(S3CrtClientInner::new(config)?),
277 })
278 }
279
280 pub fn endpoint_config(&self) -> EndpointConfig {
282 self.inner.endpoint_config.clone()
283 }
284
285 #[doc(hidden)]
286 pub fn event_loop_group(&self) -> EventLoopGroup {
287 self.inner.event_loop_group.clone()
288 }
289}
290
291#[derive(Debug)]
292struct S3CrtClientInner {
293 s3_client: Client,
294 event_loop_group: EventLoopGroup,
295 endpoint_config: EndpointConfig,
296 allocator: Allocator,
297 next_request_counter: AtomicU64,
298 user_agent_header: String,
301 request_payer: Option<String>,
302 read_part_size: usize,
303 write_part_size: usize,
304 enable_backpressure: bool,
305 initial_read_window_size: usize,
306 bucket_owner: Option<String>,
307 credentials_provider: Option<CredentialsProvider>,
308 host_resolver: HostResolver,
309 telemetry_callback: Option<Arc<dyn OnTelemetry>>,
310}
311
312impl S3CrtClientInner {
313 fn new(config: S3ClientConfig) -> Result<Self, NewClientError> {
314 let allocator = Allocator::default();
315
316 let mut event_loop_group = EventLoopGroup::new_default(&allocator, config.event_loop_threads, || {}).unwrap();
317
318 let resolver_options = HostResolverDefaultOptions {
319 max_entries: 8,
320 event_loop_group: &mut event_loop_group,
321 };
322
323 let mut host_resolver = HostResolver::new_default(&allocator, &resolver_options).unwrap();
324
325 let bootstrap_options = ClientBootstrapOptions {
326 event_loop_group: &mut event_loop_group,
327 host_resolver: &mut host_resolver,
328 };
329
330 let mut client_bootstrap = ClientBootstrap::new(&allocator, &bootstrap_options).unwrap();
331
332 let mut client_config = ClientConfig::new();
333
334 let retry_strategy = {
335 let mut retry_strategy_options = StandardRetryOptions::default(&mut event_loop_group);
336 let max_attempts = std::env::var("AWS_MAX_ATTEMPTS")
337 .ok()
338 .and_then(|s| s.parse::<usize>().ok())
339 .or_else(|| config.max_attempts.map(|m| m.get()))
340 .unwrap_or(3);
341 retry_strategy_options.backoff_retry_options.max_retries = max_attempts.saturating_sub(1);
344 retry_strategy_options.backoff_retry_options.backoff_scale_factor = Duration::from_millis(500);
345 retry_strategy_options.backoff_retry_options.jitter_mode = ExponentialBackoffJitterMode::Full;
346 RetryStrategy::standard(&allocator, &retry_strategy_options).unwrap()
347 };
348
349 trace!("constructing client with auth config {:?}", config.auth_config);
350 let credentials_provider = match config.auth_config {
351 S3ClientAuthConfig::Default => {
352 let credentials_chain_default_options = CredentialsProviderChainDefaultOptions {
353 bootstrap: &mut client_bootstrap,
354 };
355 CredentialsProvider::new_chain_default(&allocator, credentials_chain_default_options)
356 .map_err(NewClientError::ProviderFailure)?
357 }
358 S3ClientAuthConfig::NoSigning => {
359 CredentialsProvider::new_anonymous(&allocator).map_err(NewClientError::ProviderFailure)?
360 }
361 S3ClientAuthConfig::Profile(profile_name) => {
362 let credentials_profile_options = CredentialsProviderProfileOptions {
363 bootstrap: &mut client_bootstrap,
364 profile_name_override: &profile_name,
365 };
366 CredentialsProvider::new_profile(&allocator, credentials_profile_options)
367 .map_err(NewClientError::ProviderFailure)?
368 }
369 S3ClientAuthConfig::Provider(provider) => provider,
370 };
371
372 let endpoint_config = config.endpoint_config;
373 client_config.region(endpoint_config.get_region());
374 let signing_config = init_signing_config(
375 endpoint_config.get_region(),
376 credentials_provider.clone(),
377 None,
378 None,
379 None,
380 );
381
382 let endpoint_config = match endpoint_config.get_endpoint() {
383 None => {
384 if let Ok(aws_endpoint_url) = std::env::var("AWS_ENDPOINT_URL") {
386 debug!("using AWS_ENDPOINT_URL {}", aws_endpoint_url);
387 let env_uri = Uri::new_from_str(&allocator, &aws_endpoint_url)
388 .map_err(|e| EndpointError::InvalidUri(endpoint_config::InvalidUriError::CouldNotParse(e)))?;
389 endpoint_config.endpoint(env_uri)
390 } else {
391 endpoint_config
392 }
393 }
394 Some(_) => endpoint_config,
395 };
396
397 client_config.express_support(true);
398 client_config.read_backpressure(config.read_backpressure);
399 client_config.initial_read_window(config.initial_read_window);
400 client_config.signing_config(signing_config);
401
402 client_config
403 .client_bootstrap(client_bootstrap)
404 .retry_strategy(retry_strategy);
405
406 client_config.throughput_target_gbps(config.throughput_target_gbps);
407
408 let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
410 for part_size in [config.read_part_size, config.write_part_size] {
412 if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
413 return Err(NewClientError::InvalidConfiguration(format!(
414 "part size must be at between 5MiB and {}GiB",
415 max_part_size / 1024 / 1024 / 1024
416 )));
417 }
418 }
419
420 if !config.network_interface_names.is_empty() {
421 client_config.network_interface_names(config.network_interface_names);
422 }
423
424 let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
425 let user_agent_header = user_agent.build();
426
427 let s3_client = Client::new(&allocator, client_config).map_err(NewClientError::CrtError)?;
428
429 Ok(Self {
430 allocator,
431 s3_client,
432 event_loop_group,
433 endpoint_config,
434 next_request_counter: AtomicU64::new(0),
435 user_agent_header,
436 request_payer: config.request_payer,
437 read_part_size: config.read_part_size,
438 write_part_size: config.write_part_size,
439 enable_backpressure: config.read_backpressure,
440 initial_read_window_size: config.initial_read_window,
441 bucket_owner: config.bucket_owner,
442 credentials_provider: Some(credentials_provider),
443 host_resolver,
444 telemetry_callback: config.telemetry_callback,
445 })
446 }
447
448 fn new_request_template(&self, method: &str, bucket: &str) -> Result<S3Message, ConstructionError> {
453 let endpoint = self.endpoint_config.resolve_for_bucket(bucket)?;
454 let uri = endpoint.uri()?;
455 trace!(?uri, "resolved endpoint");
456
457 let signing_config = if let Some(credentials_provider) = &self.credentials_provider {
458 let auth_scheme = match endpoint.auth_scheme() {
459 Ok(auth_scheme) => auth_scheme,
460 Err(e) => {
461 error!(error=?e, "no auth scheme for endpoint");
462 return Err(e.into());
463 }
464 };
465 trace!(?auth_scheme, "resolved auth scheme");
466 let algorithm = Some(auth_scheme.scheme_name());
467 let service = Some(auth_scheme.signing_name());
468 let use_double_uri_encode = Some(!auth_scheme.disable_double_encoding());
469 Some(init_signing_config(
470 auth_scheme.signing_region(),
471 credentials_provider.clone(),
472 algorithm,
473 service,
474 use_double_uri_encode,
475 ))
476 } else {
477 None
478 };
479
480 let hostname = uri.host_name().to_str().unwrap();
481 let path_prefix = uri.path().to_os_string().into_string().unwrap();
482 let port = uri.host_port();
483 let hostname_header = if port > 0 {
484 format!("{}:{}", hostname, port)
485 } else {
486 hostname.to_string()
487 };
488
489 let mut message = Message::new_request(&self.allocator)?;
490 message.set_request_method(method)?;
491 message.add_header(&Header::new("Host", hostname_header))?;
492 message.add_header(&Header::new("accept", "application/xml"))?;
493 message.add_header(&Header::new("User-Agent", &self.user_agent_header))?;
494
495 if let Some(ref payer) = self.request_payer {
496 message.add_header(&Header::new("x-amz-request-payer", payer))?;
497 }
498
499 if let Some(ref owner) = self.bucket_owner {
500 message.add_header(&Header::new("x-amz-expected-bucket-owner", owner))?;
501 }
502
503 Ok(S3Message {
504 inner: message,
505 uri,
506 path_prefix,
507 checksum_config: None,
508 signing_config,
509 })
510 }
511
512 #[allow(clippy::too_many_arguments)]
519 fn meta_request_with_callbacks<E: std::error::Error + Send + 'static>(
520 &self,
521 mut options: MetaRequestOptions,
522 request_span: Span,
523 on_request_finish: impl Fn(&RequestMetrics) + Send + 'static,
524 mut on_headers: impl FnMut(&Headers, i32) + Send + 'static,
525 mut on_body: impl FnMut(u64, &[u8]) + Send + 'static,
526 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
527 on_meta_request_result: impl FnOnce(ObjectClientResult<(), E, S3RequestError>) + Send + 'static,
528 ) -> Result<CancellingMetaRequest, S3RequestError> {
529 let span_telemetry = request_span.clone();
530 let span_body = request_span.clone();
531 let span_finish = request_span;
532
533 let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
534 let hostname = endpoint.host_name().to_str().unwrap().to_owned();
535 let host_resolver = self.host_resolver.clone();
536 let telemetry_callback = self.telemetry_callback.clone();
537
538 let start_time = Instant::now();
539 let first_body_part = Arc::new(AtomicBool::new(true));
540 let first_body_part_clone = Arc::clone(&first_body_part);
541 let total_bytes = Arc::new(AtomicU64::new(0));
542 let total_bytes_clone = Arc::clone(&total_bytes);
543
544 options
545 .on_telemetry(move |metrics| {
546 let _guard = span_telemetry.enter();
547
548 on_request_finish(metrics);
549
550 let http_status = metrics.status_code();
551 let request_canceled = metrics.is_canceled();
552 let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
553 let crt_error = Some(metrics.error()).filter(|e| e.is_err());
554 let request_type = request_type_to_metrics_string(metrics.request_type());
555 let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
556 let duration = metrics.total_duration();
557 let ttfb = metrics.time_to_first_byte();
558 let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers));
559
560 let message = if request_failure {
561 "S3 request failed"
562 } else if request_canceled {
563 "S3 request canceled"
564 } else {
565 "S3 request finished"
566 };
567 debug!(%request_type, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message);
568 trace!(detailed_metrics=?metrics, "S3 request completed");
569
570 let op = span_telemetry.metadata().map(|m| m.name()).unwrap_or("unknown");
571 if let Some(ttfb) = ttfb {
572 metrics::histogram!("s3.requests.first_byte_latency_us", "op" => op, "type" => request_type).record(ttfb.as_micros() as f64);
573 }
574 metrics::histogram!("s3.requests.total_latency_us", "op" => op, "type" => request_type).record(duration.as_micros() as f64);
575 metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1);
576 if request_failure {
577 metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1);
578 } else if request_canceled {
579 metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1);
580 }
581
582 if let Some(telemetry_callback) = &telemetry_callback {
583 telemetry_callback.on_telemetry(metrics);
584 }
585 })
586 .on_headers(move |headers, response_status| {
587 (on_headers)(headers, response_status);
588 })
589 .on_body(move |range_start, data| {
590 let _guard = span_body.enter();
591
592 if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
593 let latency = start_time.elapsed().as_micros() as f64;
594 let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
595 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
596 }
597 total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);
598
599 trace!(start = range_start, length = data.len(), "body part received");
600
601 (on_body)(range_start, data);
602 })
603 .on_finish(move |request_result| {
604 let _guard = span_finish.enter();
605
606 let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
607 let duration = start_time.elapsed();
608
609 metrics::counter!("s3.meta_requests", "op" => op).increment(1);
610 metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
611 if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
613 let latency = duration.as_micros() as f64;
614 metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
615 }
616 let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
617 if op == "get_object" {
620 emit_throughput_metric(total_bytes, duration, op);
621 }
622 let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
623 if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
624 metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
625 }
626
627 let status_code = request_result.response_status;
628 let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
629 tracing::Level::DEBUG
630 } else {
631 tracing::Level::WARN
632 };
633
634 let result =
635 if !request_result.is_err() {
636 event!(log_level, ?duration, "meta request finished");
637 Ok(())
638 } else {
639 let error = parse_meta_request_error(&request_result).map(ObjectClientError::ServiceError);
642 let maybe_err = error.or_else(|| try_parse_generic_error(&request_result).map(ObjectClientError::ClientError));
643
644 let request_id = match &request_result.error_response_headers {
648 Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(),
649 None => None,
650 };
651 let request_id = request_id.unwrap_or_else(|| "<unknown>".into());
652
653 let message = if request_result.is_canceled() {
654 "meta request canceled"
655 } else {
656 "meta request failed"
657 };
658 if let Some(error) = &maybe_err {
659 event!(log_level, ?duration, %request_id, ?error, message);
660 debug!("meta request result: {:?}", request_result);
661 } else {
662 event!(log_level, ?duration, %request_id, ?request_result, message);
663 }
664
665 if request_result.is_canceled() {
666 metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
667 } else {
668 let error_status = if request_result.response_status >= 100 {
670 request_result.response_status
671 } else {
672 -request_result.crt_error.raw_error()
673 };
674 metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
675 }
676
677 Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
679 };
680
681 on_meta_request_result(result);
682 });
683
684 let meta_request = self.s3_client.make_meta_request(options)?;
686 Self::poll_client_metrics(&self.s3_client);
687 Ok(CancellingMetaRequest::wrap(meta_request))
688 }
689
690 fn meta_request_with_body_payload<E: std::error::Error + Send + 'static>(
697 &self,
698 options: MetaRequestOptions,
699 request_span: Span,
700 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
701 ) -> Result<S3MetaRequest<Vec<u8>, E>, S3RequestError> {
702 let (tx, rx) = oneshot::channel::<ObjectClientResult<Vec<u8>, E, S3RequestError>>();
703
704 let body: Arc<Mutex<Vec<u8>>> = Default::default();
706 let body_clone = Arc::clone(&body);
707
708 let meta_request = self.meta_request_with_callbacks(
709 options,
710 request_span,
711 |_| {},
712 |_, _| {},
713 move |offset, data| {
714 let mut body = body_clone.lock().unwrap();
715 assert_eq!(offset as usize, body.len());
716 body.extend_from_slice(data);
717 },
718 parse_meta_request_error,
719 move |result| _ = tx.send(result.map(|_| std::mem::take(&mut *body.lock().unwrap()))),
720 )?;
721 Ok(S3MetaRequest {
722 receiver: rx.fuse(),
723 meta_request,
724 })
725 }
726
727 fn meta_request_with_headers_payload<E: std::error::Error + Send + 'static>(
734 &self,
735 options: MetaRequestOptions,
736 request_span: Span,
737 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
738 ) -> Result<S3MetaRequest<Headers, E>, S3RequestError> {
739 let (tx, rx) = oneshot::channel::<ObjectClientResult<Headers, E, S3RequestError>>();
740
741 let on_headers: Arc<Mutex<Option<Headers>>> = Default::default();
744 let on_result = on_headers.clone();
745
746 let meta_request = self.meta_request_with_callbacks(
747 options,
748 request_span,
749 |_| {},
750 move |headers, status| {
751 if (200..300).contains(&status) {
754 *on_headers.lock().unwrap() = Some(headers.clone());
755 }
756 },
757 |_, _| {},
758 parse_meta_request_error,
759 move |result| {
760 let headers =
762 result.and_then(|_| {
763 on_result.lock().unwrap().take().ok_or_else(|| {
764 S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into()
765 })
766 });
767 _ = tx.send(headers);
768 },
769 )?;
770 Ok(S3MetaRequest {
771 receiver: rx.fuse(),
772 meta_request,
773 })
774 }
775
776 fn meta_request_without_payload<E: std::error::Error + Send + 'static>(
782 &self,
783 options: MetaRequestOptions,
784 request_span: Span,
785 parse_meta_request_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
786 ) -> Result<S3MetaRequest<(), E>, S3RequestError> {
787 let (tx, rx) = oneshot::channel::<ObjectClientResult<(), E, S3RequestError>>();
788
789 let meta_request = self.meta_request_with_callbacks(
790 options,
791 request_span,
792 |_| {},
793 |_, _| {},
794 |_, _| {},
795 parse_meta_request_error,
796 move |result| _ = tx.send(result),
797 )?;
798 Ok(S3MetaRequest {
799 receiver: rx.fuse(),
800 meta_request,
801 })
802 }
803
804 fn poll_client_metrics(s3_client: &Client) {
805 let metrics = s3_client.poll_client_metrics();
806 metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
807 metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
808 metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
809 metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
810 metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
811 metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
812 metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
813 .set(metrics.num_auto_ranged_copy_network_io as f64);
814 metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
815 metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
816 .set(metrics.num_requests_stream_queued_waiting as f64);
817 metrics::gauge!("s3.client.num_requests_streaming_response")
818 .set(metrics.num_requests_streaming_response as f64);
819
820 let start = Instant::now();
822 let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
823 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
824 metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
825 metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
826 metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
827 metrics::gauge!("s3.client.buffer_pool.primary_allocated").set(buffer_pool_stats.primary_allocated as f64);
828 metrics::gauge!("s3.client.buffer_pool.primary_reserved").set(buffer_pool_stats.primary_reserved as f64);
829 metrics::gauge!("s3.client.buffer_pool.primary_num_blocks").set(buffer_pool_stats.primary_num_blocks as f64);
830 metrics::gauge!("s3.client.buffer_pool.secondary_reserved").set(buffer_pool_stats.secondary_reserved as f64);
831 metrics::gauge!("s3.client.buffer_pool.secondary_used").set(buffer_pool_stats.secondary_used as f64);
832 metrics::gauge!("s3.client.buffer_pool.forced_used").set(buffer_pool_stats.forced_used as f64);
833 }
834
835 fn next_request_counter(&self) -> u64 {
836 self.next_request_counter.fetch_add(1, Ordering::SeqCst)
837 }
838}
839
840#[derive(Debug, Error)]
842enum ResponseHeadersError {
843 #[error("response headers are missing")]
844 MissingHeaders,
845}
846
847#[derive(Debug, Clone, Copy)]
849enum S3Operation {
850 DeleteObject,
851 GetObject,
852 GetObjectAttributes,
853 HeadBucket,
854 HeadObject,
855 ListObjects,
856 PutObject,
857 CopyObject,
858 PutObjectSingle,
859}
860
861impl S3Operation {
862 fn meta_request_type(&self) -> MetaRequestType {
864 match self {
865 S3Operation::GetObject => MetaRequestType::GetObject,
866 S3Operation::PutObject => MetaRequestType::PutObject,
867 S3Operation::CopyObject => MetaRequestType::CopyObject,
868 _ => MetaRequestType::Default,
869 }
870 }
871
872 fn operation_name(&self) -> Option<&'static str> {
875 match self {
876 S3Operation::DeleteObject => Some("DeleteObject"),
877 S3Operation::GetObject => None,
878 S3Operation::GetObjectAttributes => Some("GetObjectAttributes"),
879 S3Operation::HeadBucket => Some("HeadBucket"),
880 S3Operation::HeadObject => Some("HeadObject"),
881 S3Operation::ListObjects => Some("ListObjectsV2"),
882 S3Operation::PutObject => None,
883 S3Operation::CopyObject => None,
884 S3Operation::PutObjectSingle => Some("PutObject"),
885 }
886 }
887}
888
889#[derive(Debug)]
894struct S3Message<'a> {
895 inner: Message<'a>,
896 uri: Uri,
897 path_prefix: String,
898 checksum_config: Option<ChecksumConfig>,
899 signing_config: Option<SigningConfig>,
900}
901
902impl<'a> S3Message<'a> {
903 fn set_header(
906 &mut self,
907 header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
908 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
909 self.inner.set_header(header)
910 }
911
912 fn set_request_path_and_query<P: AsRef<OsStr>>(
915 &mut self,
916 path: impl AsRef<OsStr>,
917 query: impl AsRef<[(P, P)]>,
918 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
919 const URLENCODE_QUERY_FRAGMENT: &AsciiSet =
921 &NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_').remove(b'~');
922 const URLENCODE_PATH_FRAGMENT: &AsciiSet = &URLENCODE_QUERY_FRAGMENT.remove(b'/');
923
924 fn write_encoded_fragment(s: &mut OsString, piece: impl AsRef<OsStr>, encoding: &'static AsciiSet) {
925 let iter = percent_encode(piece.as_ref().as_bytes(), encoding);
926 s.extend(iter.map(|s| OsStr::from_bytes(s.as_bytes())));
927 }
928
929 let space_needed = self.path_prefix.len()
933 + path.as_ref().len()
934 + query
935 .as_ref()
936 .iter()
937 .map(|(key, value)| key.as_ref().len() + value.as_ref().len() + 2) .sum::<usize>();
939
940 let mut full_path = OsString::with_capacity(space_needed);
941
942 write_encoded_fragment(&mut full_path, &self.path_prefix, URLENCODE_PATH_FRAGMENT);
943 write_encoded_fragment(&mut full_path, &path, URLENCODE_PATH_FRAGMENT);
944
945 if !query.as_ref().is_empty() {
947 full_path.push("?");
948 for (i, (key, value)) in query.as_ref().iter().enumerate() {
949 if i != 0 {
950 full_path.push("&");
951 }
952 write_encoded_fragment(&mut full_path, key, URLENCODE_QUERY_FRAGMENT);
953 full_path.push("=");
954 write_encoded_fragment(&mut full_path, value, URLENCODE_QUERY_FRAGMENT);
955 }
956 }
957
958 self.inner.set_request_path(full_path)
959 }
960
961 fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), mountpoint_s3_crt::common::error::Error> {
964 self.set_request_path_and_query::<&str>(path, &[])
965 }
966
967 fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
969 self.checksum_config = checksum_config;
970 }
971
972 fn set_body_stream(&mut self, input_stream: Option<InputStream<'a>>) -> Option<InputStream<'a>> {
975 self.inner.set_body_stream(input_stream)
976 }
977
978 fn set_content_length_header(
980 &mut self,
981 content_length: usize,
982 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
983 self.inner
984 .set_header(&Header::new("Content-Length", content_length.to_string()))
985 }
986
987 fn set_checksum_header(
989 &mut self,
990 checksum: &UploadChecksum,
991 ) -> Result<(), mountpoint_s3_crt::common::error::Error> {
992 let header = match checksum {
993 UploadChecksum::Crc64nvme(crc64) => Header::new("x-amz-checksum-crc64nvme", crc64nvme_to_base64(crc64)),
994 UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
995 UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
996 UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
997 UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
998 };
999 self.inner.set_header(&header)
1000 }
1001
1002 fn into_options(self, operation: S3Operation) -> MetaRequestOptions<'a> {
1003 let mut options = MetaRequestOptions::new();
1004 if let Some(checksum_config) = self.checksum_config {
1005 options.checksum_config(checksum_config);
1006 }
1007 if let Some(signing_config) = self.signing_config {
1008 options.signing_config(signing_config);
1009 }
1010 options
1011 .message(self.inner)
1012 .endpoint(self.uri)
1013 .request_type(operation.meta_request_type());
1014 if let Some(operation_name) = operation.operation_name() {
1015 options.operation_name(operation_name);
1016 }
1017 options
1018 }
1019}
1020
1021#[derive(Debug)]
1022#[pin_project]
1023#[must_use]
1024struct S3MetaRequest<T, E> {
1025 #[pin]
1027 receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
1028 meta_request: CancellingMetaRequest,
1029}
1030
1031impl<T: Send, E: Send> Future for S3MetaRequest<T, E> {
1032 type Output = ObjectClientResult<T, E, S3RequestError>;
1033
1034 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1035 let this = self.project();
1036 this.receiver
1037 .poll(cx)
1038 .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::internal_failure(err).into())))
1039 }
1040}
1041
1042impl<T: Send, E: Send> FusedFuture for S3MetaRequest<T, E> {
1043 fn is_terminated(&self) -> bool {
1044 self.receiver.is_terminated()
1045 }
1046}
1047
1048#[derive(Debug)]
1052struct CancellingMetaRequest {
1053 inner: MetaRequest,
1054}
1055
1056impl CancellingMetaRequest {
1057 fn wrap(meta_request: MetaRequest) -> Self {
1058 Self { inner: meta_request }
1059 }
1060}
1061
1062impl Drop for CancellingMetaRequest {
1063 fn drop(&mut self) {
1064 self.inner.cancel();
1065 }
1066}
1067
1068impl Deref for CancellingMetaRequest {
1069 type Target = MetaRequest;
1070
1071 fn deref(&self) -> &Self::Target {
1072 &self.inner
1073 }
1074}
1075
1076impl DerefMut for CancellingMetaRequest {
1077 fn deref_mut(&mut self) -> &mut Self::Target {
1078 &mut self.inner
1079 }
1080}
1081
1082#[derive(Error, Debug)]
1084#[non_exhaustive]
1085pub enum NewClientError {
1086 #[error("invalid S3 endpoint")]
1088 InvalidEndpoint(#[from] EndpointError),
1089 #[error("invalid AWS credentials")]
1091 ProviderFailure(#[source] mountpoint_s3_crt::common::error::Error),
1092 #[error("invalid configuration: {0}")]
1094 InvalidConfiguration(String),
1095 #[error("Unknown CRT error")]
1097 CrtError(#[source] mountpoint_s3_crt::common::error::Error),
1098}
1099
1100#[derive(Error, Debug)]
1102pub enum S3RequestError {
1103 #[error("Internal S3 client error")]
1105 InternalError(#[source] Box<dyn std::error::Error + Send + Sync>),
1106
1107 #[error("Unknown CRT error")]
1109 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1110
1111 #[error("Failed to construct request")]
1113 ConstructionFailure(#[from] ConstructionError),
1114
1115 #[error("Unknown response error: {0:?}")]
1117 ResponseError(MetaRequestResult),
1118
1119 #[error("Wrong region (expecting {0})")]
1121 IncorrectRegion(String),
1122
1123 #[error("Forbidden: {0}")]
1125 Forbidden(String, ClientErrorMetadata),
1126
1127 #[error("No signing credentials available, see CRT debug logs")]
1129 NoSigningCredentials,
1130
1131 #[error("Request canceled")]
1133 RequestCanceled,
1134
1135 #[error("Request throttled")]
1137 Throttled,
1138
1139 #[error("Polled for data with empty read window")]
1143 EmptyReadWindow,
1144}
1145
1146impl S3RequestError {
1147 fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
1148 S3RequestError::ConstructionFailure(inner.into())
1149 }
1150
1151 fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
1152 S3RequestError::InternalError(Box::new(inner))
1153 }
1154}
1155
1156impl ProvideErrorMetadata for S3RequestError {
1157 fn meta(&self) -> ClientErrorMetadata {
1158 match self {
1159 Self::ResponseError(request_result) => {
1160 let http_code = if request_result.response_status >= 100 {
1161 Some(request_result.response_status)
1162 } else {
1163 None
1164 };
1165 ClientErrorMetadata {
1166 http_code,
1167 ..Default::default()
1168 }
1169 }
1170 Self::Forbidden(_, metadata) => metadata.clone(),
1171 Self::Throttled => ClientErrorMetadata {
1172 http_code: Some(503),
1173 ..Default::default()
1174 },
1175 _ => Default::default(),
1176 }
1177 }
1178}
1179
1180#[derive(Error, Debug)]
1181pub enum ConstructionError {
1182 #[error("Unknown CRT error")]
1184 CrtError(#[from] mountpoint_s3_crt::common::error::Error),
1185
1186 #[error("Invalid S3 endpoint")]
1188 InvalidEndpoint(#[from] EndpointError),
1189}
1190
1191fn request_type_to_metrics_string(request_type: RequestType) -> &'static str {
1196 match request_type {
1197 RequestType::Unknown => "Default",
1198 RequestType::HeadObject => "HeadObject",
1199 RequestType::GetObject => "GetObject",
1200 RequestType::ListParts => "ListParts",
1201 RequestType::CreateMultipartUpload => "CreateMultipartUpload",
1202 RequestType::UploadPart => "UploadPart",
1203 RequestType::AbortMultipartUpload => "AbortMultipartUpload",
1204 RequestType::CompleteMultipartUpload => "CompleteMultipartUpload",
1205 RequestType::UploadPartCopy => "UploadPartCopy",
1206 RequestType::CopyObject => "CopyObject",
1207 RequestType::PutObject => "PutObject",
1208 }
1209}
1210
1211fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
1213 let header = headers.get("Content-Range").ok()?;
1214 let value = header.value().to_str()?;
1215
1216 if !value.starts_with("bytes ") {
1219 return None;
1220 }
1221 let (_, value) = value.split_at("bytes ".len());
1222 let (range, _) = value.split_once('/')?;
1223 let (start, end) = range.split_once('-')?;
1224 let start = start.parse::<u64>().ok()?;
1225 let end = end.parse::<u64>().ok()?;
1226
1227 Some(start..end + 1)
1229}
1230
1231fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
1233 let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
1234 let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
1235 let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
1236 let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;
1237 let checksum_crc64nvme = headers.get_as_optional_string("x-amz-checksum-crc64nvme")?;
1238
1239 Ok(Checksum {
1240 checksum_crc64nvme,
1241 checksum_crc32,
1242 checksum_crc32c,
1243 checksum_sha1,
1244 checksum_sha256,
1245 })
1246}
1247
1248fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1250 fn try_parse_redirect(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1252 let headers = request_result.error_response_headers.as_ref()?;
1253 let region_header = headers.get("x-amz-bucket-region").ok()?;
1254 let region = region_header.value().to_owned().into_string().ok()?;
1255 Some(S3RequestError::IncorrectRegion(region))
1256 }
1257
1258 fn try_parse_forbidden(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1260 let Some(body) = request_result.error_response_body.as_ref() else {
1261 return Some(S3RequestError::Forbidden(
1264 "<no message>".to_owned(),
1265 ClientErrorMetadata {
1266 http_code: Some(request_result.response_status),
1267 ..Default::default()
1268 },
1269 ));
1270 };
1271 let error_elem = xmltree::Element::parse(body.as_bytes()).ok()?;
1272 let error_code = error_elem.get_child("Code")?;
1273 let error_code_str = error_code.get_text()?;
1274 if request_result.response_status == 403
1277 || matches!(
1278 error_code_str.deref(),
1279 "AccessDenied" | "InvalidToken" | "ExpiredToken" | "SignatureDoesNotMatch"
1280 )
1281 {
1282 let message = error_elem
1283 .get_child("Message")
1284 .and_then(|e| e.get_text())
1285 .unwrap_or(error_code_str.clone());
1286 Some(S3RequestError::Forbidden(
1287 message.to_string(),
1288 ClientErrorMetadata {
1289 http_code: Some(request_result.response_status),
1290 error_code: Some(error_code_str.to_string()),
1291 error_message: Some(message.into_owned()),
1292 },
1293 ))
1294 } else {
1295 None
1296 }
1297 }
1298
1299 fn try_parse_no_credentials_or_generic(request_result: &MetaRequestResult) -> S3RequestError {
1301 let crt_error_code = request_result.crt_error.raw_error();
1302 if crt_error_code == mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32 {
1303 S3RequestError::NoSigningCredentials
1304 } else {
1305 S3RequestError::CrtError(crt_error_code.into())
1306 }
1307 }
1308
1309 fn try_parse_throttled(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1310 let crt_error_code = request_result.crt_error.raw_error();
1311 if crt_error_code == mountpoint_s3_crt::s3::ErrorCode::AWS_ERROR_S3_SLOW_DOWN as i32 {
1312 Some(S3RequestError::Throttled)
1313 } else {
1314 None
1315 }
1316 }
1317
1318 fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
1320 request_result.is_canceled().then_some(S3RequestError::RequestCanceled)
1321 }
1322
1323 match request_result.response_status {
1324 301 => try_parse_redirect(request_result),
1325 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
1328 403 => try_parse_forbidden(request_result),
1329 0 => try_parse_throttled(request_result)
1331 .or_else(|| try_parse_canceled_request(request_result))
1332 .or_else(|| Some(try_parse_no_credentials_or_generic(request_result))),
1333 _ => None,
1334 }
1335}
1336
1337fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
1340 let throughput_mbps = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
1341 const MEGABYTE: u64 = 1024 * 1024;
1343 let bucket = if bytes < MEGABYTE {
1344 "<1MiB"
1345 } else if bytes <= 16 * MEGABYTE {
1346 "1-16MiB"
1347 } else {
1348 ">16MiB"
1349 };
1350 metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
1351}
1352
1353#[cfg_attr(not(docsrs), async_trait)]
1354impl ObjectClient for S3CrtClient {
1355 type GetObjectResponse = S3GetObjectResponse;
1356 type PutObjectRequest = S3PutObjectRequest;
1357 type ClientError = S3RequestError;
1358
1359 fn read_part_size(&self) -> Option<usize> {
1360 Some(self.inner.read_part_size)
1361 }
1362
1363 fn write_part_size(&self) -> Option<usize> {
1364 Some(self.inner.write_part_size)
1369 }
1370
1371 fn initial_read_window_size(&self) -> Option<usize> {
1372 if self.inner.enable_backpressure {
1373 Some(self.inner.initial_read_window_size)
1374 } else {
1375 None
1376 }
1377 }
1378
1379 fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
1380 let start = Instant::now();
1381 let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
1382 metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
1383 Some(crt_buffer_pool_stats)
1384 }
1385
1386 async fn delete_object(
1387 &self,
1388 bucket: &str,
1389 key: &str,
1390 ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> {
1391 self.delete_object(bucket, key).await
1392 }
1393
1394 async fn copy_object(
1395 &self,
1396 source_bucket: &str,
1397 source_key: &str,
1398 destination_bucket: &str,
1399 destination_key: &str,
1400 params: &CopyObjectParams,
1401 ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
1402 self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
1403 .await
1404 }
1405
1406 async fn get_object(
1407 &self,
1408 bucket: &str,
1409 key: &str,
1410 params: &GetObjectParams,
1411 ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
1412 self.get_object(bucket, key, params).await
1413 }
1414
1415 async fn list_objects(
1416 &self,
1417 bucket: &str,
1418 continuation_token: Option<&str>,
1419 delimiter: &str,
1420 max_keys: usize,
1421 prefix: &str,
1422 ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
1423 self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix)
1424 .await
1425 }
1426
1427 async fn head_object(
1428 &self,
1429 bucket: &str,
1430 key: &str,
1431 params: &HeadObjectParams,
1432 ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
1433 self.head_object(bucket, key, params).await
1434 }
1435
1436 async fn put_object(
1437 &self,
1438 bucket: &str,
1439 key: &str,
1440 params: &PutObjectParams,
1441 ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
1442 self.put_object(bucket, key, params).await
1443 }
1444
1445 async fn put_object_single<'a>(
1446 &self,
1447 bucket: &str,
1448 key: &str,
1449 params: &PutObjectSingleParams,
1450 contents: impl AsRef<[u8]> + Send + 'a,
1451 ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
1452 self.put_object_single(bucket, key, params, contents).await
1453 }
1454
1455 async fn get_object_attributes(
1456 &self,
1457 bucket: &str,
1458 key: &str,
1459 max_parts: Option<usize>,
1460 part_number_marker: Option<usize>,
1461 object_attributes: &[ObjectAttribute],
1462 ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> {
1463 self.get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes)
1464 .await
1465 }
1466}
1467
1468pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
1470 fn on_telemetry(&self, request_metrics: &RequestMetrics);
1471}
1472
1473#[cfg(test)]
1474mod tests {
1475 use mountpoint_s3_crt::common::error::Error;
1476 use rusty_fork::rusty_fork_test;
1477 use std::assert_eq;
1478
1479 use super::*;
1480 use test_case::test_case;
1481
1482 fn client_new_fails_with_invalid_part_size(part_size: usize) {
1484 let config = S3ClientConfig::default().part_size(part_size);
1485 let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
1486 let message = if cfg!(target_pointer_width = "64") {
1487 "invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
1488 } else {
1489 format!(
1490 "invalid configuration: part size must be at between 5MiB and {}GiB",
1491 usize::MAX / 1024 / 1024 / 1024
1492 )
1493 };
1494 assert_eq!(e.to_string(), message);
1495 }
1496
1497 #[cfg(target_pointer_width = "64")]
1500 #[test]
1501 fn client_new_fails_with_greater_part_size() {
1502 let part_size = 6 * 1024 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1504 }
1505
1506 #[test]
1507 fn client_new_fails_with_smaller_part_size() {
1508 let part_size = 4 * 1024 * 1024; client_new_fails_with_invalid_part_size(part_size);
1510 }
1511
1512 #[test]
1514 fn test_user_agent_with_prefix() {
1515 let user_agent_prefix = String::from("someprefix");
1516 let expected_user_agent = "someprefix mountpoint-s3-client/";
1517
1518 let config = S3ClientConfig {
1519 user_agent: Some(UserAgent::new(Some(user_agent_prefix))),
1520 ..Default::default()
1521 };
1522
1523 let client = S3CrtClient::new(config).expect("Create test client");
1524
1525 let mut message = client
1526 .inner
1527 .new_request_template("GET", "amzn-s3-demo-bucket")
1528 .expect("new request template expected");
1529
1530 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1531
1532 let user_agent_header = headers
1533 .get("User-Agent")
1534 .expect("User Agent Header expected with given prefix");
1535 let user_agent_header_value = user_agent_header.value();
1536
1537 assert!(user_agent_header_value
1538 .to_string_lossy()
1539 .starts_with(expected_user_agent));
1540 }
1541
1542 fn assert_expected_host(expected_host: &str, endpoint_config: EndpointConfig) {
1543 let config = S3ClientConfig {
1544 endpoint_config,
1545 ..Default::default()
1546 };
1547
1548 let client = S3CrtClient::new(config).expect("create test client");
1549
1550 let mut message = client
1551 .inner
1552 .new_request_template("GET", "")
1553 .expect("new request template expected");
1554
1555 let headers = message.inner.get_headers().expect("expected a block of HTTP headers");
1556
1557 let host_header = headers.get("Host").expect("Host header expected");
1558 let host_header_value = host_header.value();
1559
1560 assert_eq!(host_header_value.to_string_lossy(), expected_host);
1561 }
1562
1563 rusty_fork_test! {
1565 #[test]
1566 fn test_endpoint_favors_parameter_over_env_variable() {
1567 let endpoint_uri = Uri::new_from_str(&Allocator::default(), "https://s3.us-west-2.amazonaws.com").unwrap();
1568 let endpoint_config = EndpointConfig::new("region-place-holder").endpoint(endpoint_uri);
1569 std::env::set_var("AWS_ENDPOINT_URL", "https://s3.us-east-1.amazonaws.com");
1570 assert_expected_host("s3.us-west-2.amazonaws.com", endpoint_config);
1572 }
1573
1574 #[test]
1575 fn test_endpoint_favors_env_variable() {
1576 let endpoint_config = EndpointConfig::new("us-east-1");
1577 std::env::set_var("AWS_ENDPOINT_URL", "https://s3.eu-west-1.amazonaws.com");
1578 assert_expected_host("s3.eu-west-1.amazonaws.com", endpoint_config);
1579 }
1580
1581 #[test]
1582 fn test_endpoint_with_invalid_env_variable() {
1583 let endpoint_config = EndpointConfig::new("us-east-1");
1584 std::env::set_var("AWS_ENDPOINT_URL", "htp:/bad:url");
1585 let config = S3ClientConfig {
1586 endpoint_config,
1587 ..Default::default()
1588 };
1589
1590 let client = S3CrtClient::new(config);
1591 match client {
1592 Ok(_) => panic!("expected an error"),
1593 Err(e) => assert_eq!(e.to_string().to_lowercase(), "invalid s3 endpoint"),
1594 }
1595 }
1596
1597 }
1598
1599 #[test]
1601 fn test_user_agent_without_prefix() {
1602 let expected_user_agent = "mountpoint-s3-client/";
1603
1604 let config: S3ClientConfig = Default::default();
1605
1606 let client = S3CrtClient::new(config).expect("Create test client");
1607
1608 let mut message = client
1609 .inner
1610 .new_request_template("GET", "amzn-s3-demo-bucket")
1611 .expect("new request template expected");
1612
1613 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1614
1615 let user_agent_header = headers
1616 .get("User-Agent")
1617 .expect("User Agent Header expected with given prefix");
1618 let user_agent_header_value = user_agent_header.value();
1619
1620 assert!(user_agent_header_value
1621 .to_string_lossy()
1622 .starts_with(expected_user_agent));
1623 }
1624
1625 #[test_case("bytes 200-1000/67589" => Some(200..1001))]
1626 #[test_case("bytes 200-1000/*" => Some(200..1001))]
1627 #[test_case("bytes 200-1000" => None)]
1628 #[test_case("bytes */67589" => None)]
1629 #[test_case("octets 200-1000]" => None)]
1630 fn parse_content_range(range: &str) -> Option<Range<u64>> {
1631 let mut headers = Headers::new(&Allocator::default()).unwrap();
1632 let header = Header::new("Content-Range", range);
1633 headers.add_header(&header).unwrap();
1634 extract_range_header(&headers)
1635 }
1636
1637 #[test]
1639 fn test_expected_bucket_owner() {
1640 let expected_bucket_owner = "111122223333";
1641
1642 let config: S3ClientConfig = S3ClientConfig::new().bucket_owner("111122223333");
1643
1644 let client = S3CrtClient::new(config).expect("Create test client");
1645
1646 let mut message = client
1647 .inner
1648 .new_request_template("GET", "amzn-s3-demo-bucket")
1649 .expect("new request template expected");
1650
1651 let headers = message.inner.get_headers().expect("Expected a block of HTTP headers");
1652
1653 let expected_bucket_owner_header = headers
1654 .get("x-amz-expected-bucket-owner")
1655 .expect("the headers should contain x-amz-expected-bucket-owner");
1656 let expected_bucket_owner_value = expected_bucket_owner_header.value();
1657
1658 assert!(expected_bucket_owner_value
1659 .to_string_lossy()
1660 .starts_with(expected_bucket_owner));
1661 }
1662
1663 fn make_result(
1664 response_status: i32,
1665 body: impl Into<OsString>,
1666 bucket_region_header: Option<&str>,
1667 ) -> MetaRequestResult {
1668 let error_response_headers = bucket_region_header.map(|h| {
1669 let mut headers = Headers::new(&Allocator::default()).unwrap();
1670 headers.add_header(&Header::new("x-amz-bucket-region", h)).unwrap();
1671 headers
1672 });
1673 MetaRequestResult {
1674 response_status,
1675 crt_error: 1i32.into(),
1676 error_response_headers,
1677 error_response_body: Some(body.into()),
1678 }
1679 }
1680
1681 #[test]
1682 fn parse_301_redirect() {
1683 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PermanentRedirect</Code><Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message><Endpoint>amzn-s3-demo-bucket.s3-us-west-2.amazonaws.com</Endpoint><Bucket>amzn-s3-demo-bucket</Bucket><RequestId>CM0Z9YFABRVSWXDJ</RequestId><HostId>HHmbUixasrJ02DlkOSCvJId897Jm0ERHuE2XMkSn2Oax1J/ad2+AU9nFrODN1ay13cWFgIAYBnI=</HostId></Error>"#;
1684 let result = make_result(301, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1685 let result = try_parse_generic_error(&result);
1686 let Some(S3RequestError::IncorrectRegion(region)) = result else {
1687 panic!("wrong result, got: {:?}", result);
1688 };
1689 assert_eq!(region, "us-west-2");
1690 }
1691
1692 #[test]
1693 fn parse_403_access_denied() {
1694 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AccessDenied</Code><Message>Access Denied</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1695 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1696 let result = try_parse_generic_error(&result);
1697 let Some(S3RequestError::Forbidden(message, _)) = result else {
1698 panic!("wrong result, got: {:?}", result);
1699 };
1700 assert_eq!(message, "Access Denied");
1701 }
1702
1703 #[test]
1704 fn parse_400_invalid_token() {
1705 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidToken</Code><Message>The provided token is malformed or otherwise invalid.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>CBFNVADDAZ8661HK</RequestId><HostId>rb5dpgYeIFxi8p5BzVK8s8wG/nQ4a7C5kMBp/KWIT4bvOUihugpssMTy7xS0mispbz6IIaX8W1g=</HostId></Error>"#;
1706 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1707 let result = try_parse_generic_error(&result);
1708 let Some(S3RequestError::Forbidden(message, _)) = result else {
1709 panic!("wrong result, got: {:?}", result);
1710 };
1711 assert_eq!(message, "The provided token is malformed or otherwise invalid.");
1712 }
1713
1714 #[test]
1715 fn parse_400_expired_token() {
1716 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message><Token-0>THEREALTOKENGOESHERE</Token-0><RequestId>RFXW0E15XSRPJYSW</RequestId><HostId>djitP7S+g43JSzR4pMOJpOO3RYpQUOUsmD4AqhRe3v24+JB/c+vwOEZgI8A35KDUe1cqQ5yKHwg=</HostId></Error>"#;
1717 let result = make_result(400, OsStr::from_bytes(&body[..]), None);
1718 let result = try_parse_generic_error(&result);
1719 let Some(S3RequestError::Forbidden(message, _)) = result else {
1720 panic!("wrong result, got: {:?}", result);
1721 };
1722 assert_eq!(message, "The provided token has expired.");
1723 }
1724
1725 #[test]
1726 fn parse_400_redirect() {
1727 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region \'us-east-1\' is wrong; expecting \'us-west-2\'</Message><Region>us-west-2</Region><RequestId>VR3NH4JF5F39GB66</RequestId><HostId>ZDzYFC1w0E5K34+ZCAnvh9ZiGaAhvx5COyZVYTUnKvSP/694xCiXmJ2AEGZd5T1Epy9vB4EOOjk=</HostId></Error>"#;
1729 let result = make_result(400, OsStr::from_bytes(&body[..]), Some("us-west-2"));
1730 let result = try_parse_generic_error(&result);
1731 let Some(S3RequestError::IncorrectRegion(region)) = result else {
1732 panic!("wrong result, got: {:?}", result);
1733 };
1734 assert_eq!(region, "us-west-2");
1735 }
1736
1737 #[test]
1738 fn parse_403_signature_does_not_match() {
1739 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIASMEXAMPLE0000000</AWSAccessKeyId><StringToSign>EXAMPLE</StringToSign><SignatureProvided>EXAMPLE</SignatureProvided><StringToSignBytes>EXAMPLE</StringToSignBytes><CanonicalRequest>EXAMPLE</CanonicalRequest><CanonicalRequestBytes>EXAMPLE</CanonicalRequestBytes><RequestId>A1F516XX5M8AATSQ</RequestId><HostId>qs9dULIp5ABM7U+H8nGfzKtMYTxvqxIVvOYZ8lEFBDyTF4Fe+876Y4bLptG4mb+PTZFyG4yaUjg=</HostId></Error>"#;
1740 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1741 let result = try_parse_generic_error(&result);
1742 let Some(S3RequestError::Forbidden(message, _)) = result else {
1743 panic!("wrong result, got: {:?}", result);
1744 };
1745 assert_eq!(message, "The request signature we calculated does not match the signature you provided. Check your key and signing method.");
1746 }
1747
1748 #[test]
1749 fn parse_403_made_up_error() {
1750 let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NotARealError</Code><Message>This error is made up.</Message><RequestId>CM0R497NB0WAQ977</RequestId><HostId>w1TqUKGaIuNAIgzqm/L2azuzgEBINxTngWPbV1iH2IvpLsVCCTKHJTh4HsGp4JnggHqVkA+KN1MGqHDw1+WEuA==</HostId></Error>"#;
1752 let result = make_result(403, OsStr::from_bytes(&body[..]), None);
1753 let result = try_parse_generic_error(&result);
1754 let Some(S3RequestError::Forbidden(message, _)) = result else {
1755 panic!("wrong result, got: {:?}", result);
1756 };
1757 assert_eq!(message, "This error is made up.");
1758 }
1759
1760 fn make_crt_error_result(response_status: i32, crt_error: Error) -> MetaRequestResult {
1761 MetaRequestResult {
1762 response_status,
1763 crt_error,
1764 error_response_headers: None,
1765 error_response_body: None,
1766 }
1767 }
1768
1769 #[test]
1770 fn parse_no_signing_credential_error() {
1771 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32;
1772 let result = make_crt_error_result(0, error_code.into());
1773 let result = try_parse_generic_error(&result);
1774 let Some(S3RequestError::NoSigningCredentials) = result else {
1775 panic!("wrong result, got: {:?}", result);
1776 };
1777 }
1778
1779 #[test]
1780 fn parse_test_other_crt_error() {
1781 let error_code = mountpoint_s3_crt::auth::ErrorCode::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32;
1783 let result = make_crt_error_result(0, error_code.into());
1784 let result = try_parse_generic_error(&result);
1785 let Some(S3RequestError::CrtError(error)) = result else {
1786 panic!("wrong result, got: {:?}", result);
1787 };
1788 assert_eq!(error, error_code.into());
1789 }
1790
1791 #[test]
1792 fn test_checksum_sha256() {
1793 let mut headers = Headers::new(&Allocator::default()).unwrap();
1794 let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
1795 let header = Header::new("x-amz-checksum-sha256", value.to_owned());
1796 headers.add_header(&header).unwrap();
1797
1798 let checksum = parse_checksum(&headers).expect("failed to parse headers");
1799 assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
1800 assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
1801 assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
1802 assert_eq!(
1803 checksum.checksum_sha256,
1804 Some(value.to_owned()),
1805 "sha256 header should match"
1806 );
1807 }
1808}