1pub(crate) mod auth {
9 use aws_runtime::auth::sigv4::SigV4Signer;
10 use aws_smithy_runtime_api::client::auth::{AuthScheme, AuthSchemeId, Sign};
11 use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
12 use aws_smithy_runtime_api::client::runtime_components::GetIdentityResolver;
13
14 pub(crate) const SCHEME_ID: AuthSchemeId = AuthSchemeId::new("sigv4-s3express");
16
17 #[derive(Debug, Default)]
19 pub(crate) struct S3ExpressAuthScheme {
20 signer: SigV4Signer,
21 }
22
23 impl S3ExpressAuthScheme {
24 pub(crate) fn new() -> Self {
26 Default::default()
27 }
28 }
29
30 impl AuthScheme for S3ExpressAuthScheme {
31 fn scheme_id(&self) -> AuthSchemeId {
32 SCHEME_ID
33 }
34
35 fn identity_resolver(&self, identity_resolvers: &dyn GetIdentityResolver) -> Option<SharedIdentityResolver> {
36 identity_resolvers.identity_resolver(self.scheme_id())
37 }
38
39 fn signer(&self) -> &dyn Sign {
40 &self.signer
41 }
42 }
43}
44
45pub(crate) mod identity_cache {
47 use aws_credential_types::Credentials;
48 use aws_smithy_async::time::SharedTimeSource;
49 use aws_smithy_runtime::expiring_cache::ExpiringCache;
50 use aws_smithy_runtime_api::box_error::BoxError;
51 use aws_smithy_runtime_api::client::identity::Identity;
52 use aws_smithy_types::DateTime;
53 use fastrand::Rng;
54 use hmac::KeyInit;
55 use hmac::{digest::FixedOutput, Hmac, Mac};
56 use lru::LruCache;
57 use sha2::Sha256;
58 use std::fmt;
59 use std::future::Future;
60 use std::hash::Hash;
61 use std::num::NonZeroUsize;
62 use std::sync::Mutex;
63 use std::time::{Duration, SystemTime};
64
65 pub(crate) const DEFAULT_MAX_CACHE_CAPACITY: usize = 100;
66 pub(crate) const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
67
68 #[derive(Clone, Eq, PartialEq, Hash)]
69 pub(crate) struct CacheKey(String);
70
71 pub(crate) struct S3ExpressIdentityCache {
77 inner: Mutex<LruCache<CacheKey, ExpiringCache<Identity, BoxError>>>,
78 time_source: SharedTimeSource,
79 buffer_time: Duration,
80 random_bytes: [u8; 64],
81 }
82
83 impl fmt::Debug for S3ExpressIdentityCache {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 let (size, capacity) = {
86 let cache = self.inner.lock().unwrap();
87 (cache.len(), cache.cap())
88 };
89 write!(
90 f,
91 "S3ExpressIdentityCache {{ time_source: {:?}, buffer_time: {:?} }}, with size/capacity: {}/{}",
92 self.time_source, &self.buffer_time, size, capacity,
93 )
94 }
95 }
96
97 impl S3ExpressIdentityCache {
98 pub(crate) fn new(capacity: usize, time_source: SharedTimeSource, buffer_time: Duration) -> Self {
99 let mut rng = Rng::default();
103 let mut random_bytes = [0u8; 64];
104 rng.fill(&mut random_bytes);
105 Self {
106 inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
107 time_source,
108 buffer_time,
109 random_bytes,
110 }
111 }
112
113 pub(crate) fn key(&self, bucket_name: &str, creds: &Credentials) -> CacheKey {
114 CacheKey({
115 let mut mac = Hmac::<Sha256>::new_from_slice(self.random_bytes.as_slice()).expect("should be created from random 64 bytes");
116 let input = format!("{}{}", creds.access_key_id(), creds.secret_access_key());
117 mac.update(input.as_ref());
118 let mut inner = hex::encode(mac.finalize_fixed());
119 inner.push_str(bucket_name);
120 inner
121 })
122 }
123
124 pub(crate) async fn get_or_load<F, Fut>(&self, key: CacheKey, loader: F) -> Result<Identity, BoxError>
125 where
126 F: FnOnce() -> Fut,
127 Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
128 {
129 let expiring_cache = {
130 let mut inner = self.inner.lock().unwrap();
131 inner.get_or_insert_mut(key, || ExpiringCache::new(self.buffer_time)).clone()
132 };
133
134 let now = self.time_source.now();
135
136 match expiring_cache.yield_or_clear_if_expired(now).await {
137 Some(identity) => {
138 tracing::debug!(
139 buffer_time=?self.buffer_time,
140 cached_expiration=?identity.expiration(),
141 now=?now,
142 "loaded identity from cache"
143 );
144 Ok(identity)
145 }
146 None => {
147 let start_time = self.time_source.now();
148 let identity = expiring_cache.get_or_load(loader).await?;
149 let expiration = identity.expiration().ok_or("SessionCredentials` always has expiration")?;
150 let printable = DateTime::from(expiration);
151 tracing::info!(
152 new_expiration=%printable,
153 valid_for=?expiration.duration_since(self.time_source.now()).unwrap_or_default(),
154 "identity cache miss occurred; added new identity (took {:?})",
155 self.time_source.now().duration_since(start_time).unwrap_or_default()
156 );
157 Ok(identity)
158 }
159 }
160 }
161 }
162
163 #[cfg(test)]
164 mod tests {
165 use super::*;
166 use aws_smithy_async::rt::sleep::TokioSleep;
167 use aws_smithy_async::test_util::ManualTimeSource;
168 use aws_smithy_runtime_api::client::identity::http::Token;
169 use aws_smithy_runtime_api::client::identity::{IdentityFuture, ResolveIdentity, SharedIdentityResolver};
170 use aws_smithy_runtime_api::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
171 use aws_smithy_runtime_api::shared::IntoShared;
172 use aws_smithy_types::config_bag::ConfigBag;
173 use futures_util::stream::FuturesUnordered;
174 use std::sync::Arc;
175 use std::time::{Duration, SystemTime, UNIX_EPOCH};
176 use tracing::info;
177
178 fn epoch_secs(secs: u64) -> SystemTime {
179 SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
180 }
181
182 fn identity_expiring_in(expired_secs: u64) -> Identity {
183 let expiration = Some(epoch_secs(expired_secs));
184 Identity::new(Token::new("test", expiration), expiration)
185 }
186
187 fn test_identity_resolver(load_list: Vec<Result<Identity, BoxError>>) -> SharedIdentityResolver {
188 #[derive(Debug)]
189 struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
190 impl ResolveIdentity for Resolver {
191 fn resolve_identity<'a>(&'a self, _: &'a RuntimeComponents, _config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
192 let mut list = self.0.lock().unwrap();
193 if list.len() > 0 {
194 let next = list.remove(0);
195 info!("refreshing the identity to {:?}", next);
196 IdentityFuture::ready(next)
197 } else {
198 drop(list);
199 panic!("no more identities")
200 }
201 }
202 }
203
204 SharedIdentityResolver::new(Resolver(Mutex::new(load_list)))
205 }
206
207 async fn load(identity_resolver: SharedIdentityResolver, runtime_components: &RuntimeComponents) -> Result<(Identity, SystemTime), BoxError> {
208 let identity = identity_resolver.resolve_identity(&runtime_components, &ConfigBag::base()).await.unwrap();
209 Ok((identity.clone(), identity.expiration().unwrap()))
210 }
211
212 async fn expect_identity<F, Fut>(expired_secs: u64, sut: &S3ExpressIdentityCache, key: CacheKey, loader: F)
213 where
214 F: FnOnce() -> Fut,
215 Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
216 {
217 let identity = sut.get_or_load(key, loader).await.unwrap();
218 assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
219 }
220
221 #[tokio::test]
222 async fn reload_expired_test_identity() {
223 let time = ManualTimeSource::new(UNIX_EPOCH);
224 let runtime_components = RuntimeComponentsBuilder::for_tests()
225 .with_time_source(Some(time.clone()))
226 .with_sleep_impl(Some(TokioSleep::new()))
227 .build()
228 .unwrap();
229
230 let sut = S3ExpressIdentityCache::new(1, time.clone().into_shared(), DEFAULT_BUFFER_TIME);
231
232 let identity_resolver = test_identity_resolver(vec![Ok(identity_expiring_in(1000)), Ok(identity_expiring_in(2000))]);
233
234 let key = sut.key("test-bucket--usw2-az1--x-s3", &Credentials::for_tests_with_session_token());
235
236 expect_identity(1000, &sut, key.clone(), || {
238 let identity_resolver = identity_resolver.clone();
239 let runtime_components = runtime_components.clone();
240 async move { load(identity_resolver, &runtime_components).await }
241 })
242 .await;
243
244 time.set_time(epoch_secs(500));
247
248 expect_identity(1000, &sut, key.clone(), || async move { panic!("new identity should not be loaded") }).await;
249
250 time.set_time(epoch_secs(1500));
253
254 expect_identity(2000, &sut, key, || async move { load(identity_resolver, &runtime_components).await }).await;
255 }
256
257 #[test]
258 fn load_contention() {
259 let rt = tokio::runtime::Builder::new_multi_thread()
260 .enable_time()
261 .worker_threads(16)
262 .build()
263 .unwrap();
264
265 let time = ManualTimeSource::new(epoch_secs(0));
266 let runtime_components = RuntimeComponentsBuilder::for_tests()
267 .with_time_source(Some(time.clone()))
268 .with_sleep_impl(Some(TokioSleep::new()))
269 .build()
270 .unwrap();
271
272 let number_of_buckets = 4;
273 let sut = Arc::new(S3ExpressIdentityCache::new(
274 number_of_buckets,
275 time.clone().into_shared(),
276 DEFAULT_BUFFER_TIME,
277 ));
278
279 let safe_expiration = number_of_buckets as u64 * 50 + DEFAULT_BUFFER_TIME.as_secs() + 1;
282 let identity_resolver = test_identity_resolver(vec![
283 Ok(identity_expiring_in(safe_expiration)),
284 Ok(identity_expiring_in(safe_expiration)),
285 Ok(identity_expiring_in(safe_expiration)),
286 Ok(identity_expiring_in(safe_expiration)),
287 ]);
288
289 let mut tasks = Vec::new();
290 for i in 0..number_of_buckets {
291 let key = sut.key(&format!("test-bucket-{i}-usw2-az1--x-s3"), &Credentials::for_tests_with_session_token());
292 for _ in 0..50 {
293 let sut = sut.clone();
294 let key = key.clone();
295 let identity_resolver = identity_resolver.clone();
296 let time = time.clone();
297 let runtime_components = runtime_components.clone();
298 tasks.push(rt.spawn(async move {
299 let now = time.advance(Duration::from_secs(1));
300 let identity: Identity = sut
301 .get_or_load(key, || async move { load(identity_resolver, &runtime_components).await })
302 .await
303 .unwrap();
304
305 assert!(identity.expiration().unwrap() >= now, "{:?} >= {:?}", identity.expiration(), now);
306 }));
307 }
308 }
309 let tasks = tasks.into_iter().collect::<FuturesUnordered<_>>();
310 for task in tasks {
311 rt.block_on(task).unwrap();
312 }
313 }
314
315 #[tokio::test]
316 async fn identity_fetch_triggered_by_lru_eviction() {
317 let time = ManualTimeSource::new(UNIX_EPOCH);
318 let runtime_components = RuntimeComponentsBuilder::for_tests()
319 .with_time_source(Some(time.clone()))
320 .with_sleep_impl(Some(TokioSleep::new()))
321 .build()
322 .unwrap();
323
324 let sut = S3ExpressIdentityCache::new(2, time.into_shared(), DEFAULT_BUFFER_TIME);
326
327 let identity_resolver = test_identity_resolver(vec![
328 Ok(identity_expiring_in(1000)),
329 Ok(identity_expiring_in(2000)),
330 Ok(identity_expiring_in(3000)),
331 Ok(identity_expiring_in(4000)),
332 ]);
333
334 let [key1, key2, key3] =
335 [1, 2, 3].map(|i| sut.key(&format!("test-bucket-{i}--usw2-az1--x-s3"), &Credentials::for_tests_with_session_token()));
336
337 expect_identity(1000, &sut, key1.clone(), || {
339 let identity_resolver = identity_resolver.clone();
340 let runtime_components = runtime_components.clone();
341 async move { load(identity_resolver, &runtime_components).await }
342 })
343 .await;
344 expect_identity(1000, &sut, key1.clone(), || async move { panic!("new identity should not be loaded") }).await;
346
347 expect_identity(2000, &sut, key2, || {
349 let identity_resolver = identity_resolver.clone();
350 let runtime_components = runtime_components.clone();
351 async move { load(identity_resolver, &runtime_components).await }
352 })
353 .await;
354
355 expect_identity(3000, &sut, key3.clone(), || {
357 let identity_resolver = identity_resolver.clone();
358 let runtime_components = runtime_components.clone();
359 async move { load(identity_resolver, &runtime_components).await }
360 })
361 .await;
362
363 expect_identity(4000, &sut, key1, || async move { load(identity_resolver, &runtime_components).await }).await;
366
367 expect_identity(3000, &sut, key3, || async move { panic!("new identity should not be loaded") }).await;
369 }
370 }
371}
372pub(crate) mod identity_provider {
374 use std::time::{Duration, SystemTime};
375
376 use crate::s3_express::identity_cache::S3ExpressIdentityCache;
377 use crate::types::SessionCredentials;
378 use aws_credential_types::credential_feature::AwsCredentialFeature;
379 use aws_credential_types::provider::error::CredentialsError;
380 use aws_credential_types::Credentials;
381 use aws_smithy_async::time::{SharedTimeSource, TimeSource};
382 use aws_smithy_runtime_api::box_error::BoxError;
383 use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
384 use aws_smithy_runtime_api::client::identity::{Identity, IdentityCacheLocation, IdentityFuture, ResolveCachedIdentity, ResolveIdentity};
385 use aws_smithy_runtime_api::client::interceptors::SharedInterceptor;
386 use aws_smithy_runtime_api::client::runtime_components::{GetIdentityResolver, RuntimeComponents};
387 use aws_smithy_runtime_api::shared::IntoShared;
388 use aws_smithy_types::config_bag::ConfigBag;
389
390 use super::identity_cache::{DEFAULT_BUFFER_TIME, DEFAULT_MAX_CACHE_CAPACITY};
391
392 #[derive(Debug)]
393 pub(crate) struct DefaultS3ExpressIdentityProvider {
394 behavior_version: crate::config::BehaviorVersion,
395 cache: S3ExpressIdentityCache,
396 }
397
398 impl TryFrom<SessionCredentials> for Credentials {
399 type Error = BoxError;
400
401 fn try_from(session_creds: SessionCredentials) -> Result<Self, Self::Error> {
402 Ok(Credentials::new(
403 session_creds.access_key_id,
404 session_creds.secret_access_key,
405 Some(session_creds.session_token),
406 Some(
407 SystemTime::try_from(session_creds.expiration)
408 .map_err(|_| CredentialsError::unhandled("credential expiration time cannot be represented by a SystemTime"))?,
409 ),
410 "s3express",
411 ))
412 }
413 }
414
415 impl DefaultS3ExpressIdentityProvider {
416 pub(crate) fn builder() -> Builder {
417 Builder::default()
418 }
419
420 async fn identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> Result<Identity, BoxError> {
421 let bucket_name = self.bucket_name(config_bag)?;
422
423 let sigv4_identity_resolver = runtime_components
424 .identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID)
425 .ok_or("identity resolver for sigv4 should be set for S3")?;
426 let aws_identity = runtime_components
427 .identity_cache()
428 .resolve_cached_identity(sigv4_identity_resolver, runtime_components, config_bag)
429 .await?;
430
431 let credentials = aws_identity
432 .data::<Credentials>()
433 .ok_or("wrong identity type for SigV4. Expected AWS credentials but got `{identity:?}")?;
434
435 let key = self.cache.key(bucket_name, credentials);
436 self.cache
437 .get_or_load(key, || async move {
438 let creds = self.express_session_credentials(bucket_name, runtime_components, config_bag).await?;
439 let mut data = Credentials::try_from(creds)?;
440 data.get_property_mut_or_default::<Vec<AwsCredentialFeature>>()
441 .push(AwsCredentialFeature::S3ExpressBucket);
442 let expiry = data.expiry().unwrap();
443 Ok((Identity::from(data), expiry))
444 })
445 .await
446 }
447
448 fn bucket_name<'a>(&'a self, config_bag: &'a ConfigBag) -> Result<&'a str, BoxError> {
449 let params = config_bag.load::<EndpointResolverParams>().expect("endpoint resolver params must be set");
450 let params = params
451 .get::<crate::config::endpoint::Params>()
452 .expect("`Params` should be wrapped in `EndpointResolverParams`");
453 params.bucket().ok_or("A bucket was not set in endpoint params".into())
454 }
455
456 async fn express_session_credentials<'a>(
457 &'a self,
458 bucket_name: &'a str,
459 runtime_components: &'a RuntimeComponents,
460 config_bag: &'a ConfigBag,
461 ) -> Result<SessionCredentials, BoxError> {
462 let mut config_builder = crate::config::Builder::from_config_bag(config_bag).behavior_version(self.behavior_version);
463
464 let mut rc_builder = runtime_components.to_builder();
467 rc_builder.set_interceptors(std::iter::empty::<SharedInterceptor>());
468 config_builder.runtime_components = rc_builder;
469
470 let client = crate::Client::from_conf(config_builder.build());
471 let response = client.create_session().bucket(bucket_name).send().await?;
472
473 response.credentials.ok_or("no session credentials in response".into())
474 }
475 }
476
477 #[derive(Default)]
478 pub(crate) struct Builder {
479 behavior_version: Option<crate::config::BehaviorVersion>,
480 time_source: Option<SharedTimeSource>,
481 buffer_time: Option<Duration>,
482 }
483
484 impl Builder {
485 pub(crate) fn behavior_version(mut self, behavior_version: crate::config::BehaviorVersion) -> Self {
486 self.set_behavior_version(Some(behavior_version));
487 self
488 }
489 pub(crate) fn set_behavior_version(&mut self, behavior_version: Option<crate::config::BehaviorVersion>) -> &mut Self {
490 self.behavior_version = behavior_version;
491 self
492 }
493 pub(crate) fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
494 self.set_time_source(time_source.into_shared());
495 self
496 }
497 pub(crate) fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
498 self.time_source = Some(time_source.into_shared());
499 self
500 }
501 #[allow(dead_code)]
502 pub(crate) fn buffer_time(mut self, buffer_time: Duration) -> Self {
503 self.set_buffer_time(Some(buffer_time));
504 self
505 }
506 #[allow(dead_code)]
507 pub(crate) fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
508 self.buffer_time = buffer_time;
509 self
510 }
511 pub(crate) fn build(self) -> DefaultS3ExpressIdentityProvider {
512 DefaultS3ExpressIdentityProvider {
513 behavior_version: self.behavior_version.expect("required field `behavior_version` should be set"),
514 cache: S3ExpressIdentityCache::new(
515 DEFAULT_MAX_CACHE_CAPACITY,
516 self.time_source.unwrap_or_default(),
517 self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
518 ),
519 }
520 }
521 }
522
523 impl ResolveIdentity for DefaultS3ExpressIdentityProvider {
524 fn resolve_identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
525 IdentityFuture::new(async move { self.identity(runtime_components, config_bag).await })
526 }
527
528 fn cache_location(&self) -> IdentityCacheLocation {
529 IdentityCacheLocation::IdentityResolver
530 }
531 }
532
533 #[cfg(test)]
534 mod tests {
535 use super::*;
536 use aws_credential_types::credential_feature::AwsCredentialFeature;
537 use aws_credential_types::Credentials;
538
539 fn create_test_runtime_components(base_credentials: Credentials) -> aws_smithy_runtime_api::client::runtime_components::RuntimeComponents {
541 use aws_credential_types::provider::SharedCredentialsProvider;
542 use aws_smithy_runtime::client::http::test_util::infallible_client_fn;
543 use aws_smithy_runtime::client::orchestrator::endpoints::StaticUriEndpointResolver;
544 use aws_smithy_runtime::client::retries::strategy::NeverRetryStrategy;
545 use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
546 use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
547 use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
548 use aws_smithy_types::body::SdkBody;
549
550 let sigv4_resolver = SharedIdentityResolver::new(SharedCredentialsProvider::new(base_credentials));
551
552 let auth_option_resolver = StaticAuthSchemeOptionResolver::new(vec![aws_runtime::auth::sigv4::SCHEME_ID]);
554
555 let http_client = infallible_client_fn(|_req| {
556 http::Response::builder()
557 .status(200)
558 .body(SdkBody::from(
559 r#"<?xml version="1.0" encoding="UTF-8"?>
560 <CreateSessionResult>
561 <Credentials>
562 <AccessKeyId>session_access_key</AccessKeyId>
563 <SecretAccessKey>session_secret_key</SecretAccessKey>
564 <SessionToken>session_token</SessionToken>
565 <Expiration>2025-01-01T00:00:00Z</Expiration>
566 </Credentials>
567 </CreateSessionResult>"#,
568 ))
569 .unwrap()
570 });
571
572 RuntimeComponentsBuilder::for_tests()
573 .with_identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID, sigv4_resolver)
574 .with_http_client(Some(http_client))
575 .with_time_source(Some(aws_smithy_async::time::SystemTimeSource::new()))
576 .with_retry_strategy(Some(NeverRetryStrategy::new()))
577 .with_auth_scheme_option_resolver(Some(auth_option_resolver))
578 .with_endpoint_resolver(Some(StaticUriEndpointResolver::http_localhost(8080)))
579 .build()
580 .unwrap()
581 }
582
583 fn create_test_config_bag(bucket_name: &str) -> aws_smithy_types::config_bag::ConfigBag {
585 use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
586 use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
587 use aws_smithy_types::config_bag::{ConfigBag, Layer};
588
589 let mut config_bag = ConfigBag::base();
590 let mut layer = Layer::new("test");
591
592 let endpoint_params = EndpointResolverParams::new(crate::config::endpoint::Params::builder().bucket(bucket_name).build().unwrap());
593 layer.store_put(endpoint_params);
594
595 layer.store_put(StalledStreamProtectionConfig::disabled());
596
597 layer.store_put(crate::config::Region::new("us-west-2"));
598
599 config_bag.push_layer(layer);
600
601 config_bag
602 }
603
604 #[test]
605 fn test_session_credentials_conversion() {
606 let session_creds = SessionCredentials::builder()
607 .access_key_id("test_access_key")
608 .secret_access_key("test_secret_key")
609 .session_token("test_session_token")
610 .expiration(aws_smithy_types::DateTime::from_secs(1000))
611 .build()
612 .expect("valid session credentials");
613
614 let credentials = Credentials::try_from(session_creds).expect("conversion should succeed");
615
616 assert_eq!(credentials.access_key_id(), "test_access_key");
617 assert_eq!(credentials.secret_access_key(), "test_secret_key");
618 assert_eq!(credentials.session_token(), Some("test_session_token"));
619 }
620
621 #[tokio::test]
622 async fn test_identity_provider_embeds_s3express_feature() {
623 let bucket_name = "test-bucket--usw2-az1--x-s3";
624
625 let base_credentials = Credentials::for_tests();
627 let runtime_components = create_test_runtime_components(base_credentials);
628 let config_bag = create_test_config_bag(bucket_name);
629
630 let provider = DefaultS3ExpressIdentityProvider::builder()
632 .behavior_version(crate::config::BehaviorVersion::latest())
633 .time_source(aws_smithy_async::time::SystemTimeSource::new())
634 .build();
635
636 let identity = provider
638 .identity(&runtime_components, &config_bag)
639 .await
640 .expect("identity() should succeed");
641
642 let credentials = identity.data::<Credentials>().expect("Identity should contain Credentials");
643 let features = credentials
644 .get_property::<Vec<AwsCredentialFeature>>()
645 .expect("Credentials should have features");
646 assert!(
647 features.contains(&AwsCredentialFeature::S3ExpressBucket),
648 "S3ExpressBucket feature should be present in Credentials' property field"
649 );
650
651 let identity_layer = identity
652 .property::<aws_smithy_types::config_bag::FrozenLayer>()
653 .expect("Identity should have a property layer");
654 let identity_features: Vec<AwsCredentialFeature> = identity_layer.load::<AwsCredentialFeature>().cloned().collect();
655 assert!(
656 identity_features.contains(&AwsCredentialFeature::S3ExpressBucket),
657 "S3ExpressBucket feature should be present in Identity's property field"
658 );
659 }
660 }
661}
662
663pub(crate) mod runtime_plugin {
665 use std::borrow::Cow;
666
667 use aws_runtime::auth::SigV4SessionTokenNameOverride;
668 use aws_sigv4::http_request::{SignatureLocation, SigningSettings};
669 use aws_smithy_runtime_api::{
670 box_error::BoxError,
671 client::{runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin},
672 };
673 use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
674 use aws_types::os_shim_internal::Env;
675
676 mod env {
677 pub(super) const S3_DISABLE_EXPRESS_SESSION_AUTH: &str = "AWS_S3_DISABLE_EXPRESS_SESSION_AUTH";
678 }
679
680 #[derive(Debug)]
681 pub(crate) struct S3ExpressRuntimePlugin {
682 config: FrozenLayer,
683 runtime_components_builder: RuntimeComponentsBuilder,
684 }
685
686 impl S3ExpressRuntimePlugin {
687 pub(crate) fn new(service_config: crate::config::Config) -> Self {
691 Self::new_with(service_config, Env::real())
692 }
693
694 fn new_with(service_config: crate::config::Config, env: Env) -> Self {
695 Self {
696 config: config(service_config.config.load::<crate::config::DisableS3ExpressSessionAuth>().cloned(), env),
697 runtime_components_builder: runtime_components_builder(service_config),
698 }
699 }
700 }
701
702 fn config(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>, env: Env) -> FrozenLayer {
703 let mut layer = Layer::new("S3ExpressRuntimePlugin");
704 if disable_s3_express_session_token.is_none() {
705 match env.get(env::S3_DISABLE_EXPRESS_SESSION_AUTH) {
706 Ok(value) if value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false") => {
707 let value = value.to_lowercase().parse::<bool>().expect("just checked to be a bool-valued string");
708 layer.store_or_unset(Some(crate::config::DisableS3ExpressSessionAuth(value)));
709 }
710 Ok(value) => {
711 tracing::warn!(
712 "environment variable `{}` ignored since it only accepts either `true` or `false` (case-insensitive), but got `{}`.",
713 env::S3_DISABLE_EXPRESS_SESSION_AUTH,
714 value
715 )
716 }
717 _ => {
718 }
721 }
722 }
723
724 let session_token_name_override = SigV4SessionTokenNameOverride::new(|settings: &SigningSettings, cfg: &ConfigBag| {
725 if !crate::s3_express::utils::for_s3_express(cfg) {
727 return Ok(settings.session_token_name_override);
728 }
729
730 let session_token_name_override = Some(match settings.signature_location {
731 SignatureLocation::Headers => "x-amz-s3session-token",
732 SignatureLocation::QueryParams => "X-Amz-S3session-Token",
733 _ => {
734 return Err(BoxError::from(
735 "`SignatureLocation` adds a new variant, which needs to be handled in a separate match arm",
736 ))
737 }
738 });
739 Ok(session_token_name_override)
740 });
741 layer.store_or_unset(Some(session_token_name_override));
742
743 layer.freeze()
744 }
745
746 fn runtime_components_builder(service_config: crate::config::Config) -> RuntimeComponentsBuilder {
747 match (
748 service_config.runtime_components.identity_resolver(&super::auth::SCHEME_ID),
749 service_config.runtime_components.identity_resolver(&aws_runtime::auth::sigv4::SCHEME_ID),
750 ) {
751 (None, Some(_)) => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin").with_identity_resolver(
752 super::auth::SCHEME_ID,
753 super::identity_provider::DefaultS3ExpressIdentityProvider::builder()
754 .time_source(
755 service_config
756 .runtime_components
757 .time_source()
758 .expect("should be set in `service_config`"),
759 )
760 .behavior_version(service_config.behavior_version.expect("should be set in `service_config`"))
761 .build(),
762 ),
763 _ => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin"),
764 }
765 }
766
767 impl RuntimePlugin for S3ExpressRuntimePlugin {
768 fn config(&self) -> Option<FrozenLayer> {
769 Some(self.config.clone())
770 }
771
772 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
773 Cow::Borrowed(&self.runtime_components_builder)
774 }
775 }
776
777 #[cfg(test)]
778 mod tests {
779 use super::*;
780 use aws_credential_types::Credentials;
781 use aws_smithy_runtime_api::client::identity::ResolveIdentity;
782
783 #[test]
784 fn disable_option_set_from_service_client_should_take_the_highest_precedence() {
785 let disable_s3_express_session_token = crate::config::DisableS3ExpressSessionAuth(true);
787
788 let actual = config(
791 Some(disable_s3_express_session_token),
792 Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "false")]),
793 );
794
795 assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
798 }
799
800 #[test]
801 fn disable_option_set_from_env_should_take_the_second_highest_precedence() {
802 let actual = config(None, Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "true")]));
804
805 assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().unwrap().0);
807 }
808
809 #[should_panic]
810 #[test]
811 fn disable_option_set_from_profile_file_should_take_the_lowest_precedence() {
812 todo!("TODO(aws-sdk-rust#1073): Implement profile file test")
813 }
814
815 #[test]
816 fn disable_option_should_be_unspecified_if_unset() {
817 let actual = config(None, Env::from_slice(&[]));
819
820 assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
822 }
823
824 #[test]
825 fn s3_express_runtime_plugin_should_set_default_identity_resolver() {
826 let config = crate::Config::builder()
828 .behavior_version_latest()
829 .time_source(aws_smithy_async::time::SystemTimeSource::new())
830 .credentials_provider(Credentials::for_tests())
831 .build();
832
833 let actual = runtime_components_builder(config);
834 assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_some());
836 }
837
838 #[test]
839 fn s3_express_plugin_should_not_set_default_identity_resolver_without_sigv4_counterpart() {
840 let config = crate::Config::builder()
842 .behavior_version_latest()
843 .time_source(aws_smithy_async::time::SystemTimeSource::new())
844 .build();
845
846 let actual = runtime_components_builder(config);
847 assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_none());
849 }
850
851 #[tokio::test]
852 async fn s3_express_plugin_should_not_set_default_identity_resolver_if_user_provided() {
853 let expected_access_key_id = "expected acccess key ID";
855 let config = crate::Config::builder()
856 .behavior_version_latest()
857 .credentials_provider(Credentials::for_tests())
858 .express_credentials_provider(Credentials::new(
859 expected_access_key_id,
860 "secret",
861 None,
862 None,
863 "test express credentials provider",
864 ))
865 .time_source(aws_smithy_async::time::SystemTimeSource::new())
866 .build();
867
868 let runtime_components_builder = runtime_components_builder(config.clone());
870 assert!(runtime_components_builder
871 .identity_resolver(&crate::s3_express::auth::SCHEME_ID)
872 .is_none());
873
874 let express_identity_resolver = config.runtime_components.identity_resolver(&crate::s3_express::auth::SCHEME_ID).unwrap();
876 let creds = express_identity_resolver
877 .resolve_identity(&RuntimeComponentsBuilder::for_tests().build().unwrap(), &ConfigBag::base())
878 .await
879 .unwrap();
880
881 assert_eq!(expected_access_key_id, creds.data::<Credentials>().unwrap().access_key_id());
882 }
883 }
884}
885
886pub(crate) mod checksum {
887 use crate::http_request_checksum::DefaultRequestChecksumOverride;
888 use aws_smithy_checksums::ChecksumAlgorithm;
889 use aws_smithy_types::config_bag::ConfigBag;
890
891 pub(crate) fn provide_default_checksum_algorithm() -> crate::http_request_checksum::DefaultRequestChecksumOverride {
892 fn _provide_default_checksum_algorithm(original_checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
893 if original_checksum != Some(ChecksumAlgorithm::Md5) {
896 return original_checksum;
897 }
898
899 if crate::s3_express::utils::for_s3_express(cfg) {
900 Some(ChecksumAlgorithm::Crc32)
902 } else {
903 original_checksum
904 }
905 }
906 DefaultRequestChecksumOverride::new(_provide_default_checksum_algorithm)
907 }
908}
909
910pub(crate) mod utils {
911 use aws_smithy_types::{config_bag::ConfigBag, Document};
912
913 pub(crate) fn for_s3_express(cfg: &ConfigBag) -> bool {
914 let endpoint = cfg
916 .load::<crate::config::endpoint::Endpoint>()
917 .expect("endpoint added to config bag by endpoint orchestrator");
918
919 let typed_schemes = endpoint.auth_schemes();
920 if !typed_schemes.is_empty() {
921 return typed_schemes
922 .iter()
923 .any(|scheme| scheme.name() == crate::s3_express::auth::SCHEME_ID.as_str());
924 }
925
926 let auth_schemes = match endpoint.properties().get("authSchemes") {
927 Some(Document::Array(schemes)) => schemes,
928 _ => return false,
929 };
930 auth_schemes.iter().any(|doc| {
931 let config_scheme_id = doc.as_object().and_then(|object| object.get("name")).and_then(Document::as_string);
932 config_scheme_id == Some(crate::s3_express::auth::SCHEME_ID.as_str())
933 })
934 }
935}