aws_sdk_s3/
s3_express.rs

1// Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
2/*
3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7/// Supporting code for S3 Express auth
8pub(crate) mod auth {
9    use aws_runtime::auth::sigv4::SigV4Signer;
10    use aws_smithy_runtime_api::client::auth::{AuthScheme, AuthSchemeId, Sign};
11    use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
12    use aws_smithy_runtime_api::client::runtime_components::GetIdentityResolver;
13
14    /// Auth scheme ID for S3 Express.
15    pub(crate) const SCHEME_ID: AuthSchemeId = AuthSchemeId::new("sigv4-s3express");
16
17    /// S3 Express auth scheme.
18    #[derive(Debug, Default)]
19    pub(crate) struct S3ExpressAuthScheme {
20        signer: SigV4Signer,
21    }
22
23    impl S3ExpressAuthScheme {
24        /// Creates a new `S3ExpressAuthScheme`.
25        pub(crate) fn new() -> Self {
26            Default::default()
27        }
28    }
29
30    impl AuthScheme for S3ExpressAuthScheme {
31        fn scheme_id(&self) -> AuthSchemeId {
32            SCHEME_ID
33        }
34
35        fn identity_resolver(&self, identity_resolvers: &dyn GetIdentityResolver) -> Option<SharedIdentityResolver> {
36            identity_resolvers.identity_resolver(self.scheme_id())
37        }
38
39        fn signer(&self) -> &dyn Sign {
40            &self.signer
41        }
42    }
43}
44
45/// Supporting code for S3 Express identity cache
46pub(crate) mod identity_cache {
47    use aws_credential_types::Credentials;
48    use aws_smithy_async::time::SharedTimeSource;
49    use aws_smithy_runtime::expiring_cache::ExpiringCache;
50    use aws_smithy_runtime_api::box_error::BoxError;
51    use aws_smithy_runtime_api::client::identity::Identity;
52    use aws_smithy_types::DateTime;
53    use fastrand::Rng;
54    use hmac::{digest::FixedOutput, Hmac, Mac};
55    use lru::LruCache;
56    use sha2::Sha256;
57    use std::fmt;
58    use std::future::Future;
59    use std::hash::Hash;
60    use std::num::NonZeroUsize;
61    use std::sync::Mutex;
62    use std::time::{Duration, SystemTime};
63
64    pub(crate) const DEFAULT_MAX_CACHE_CAPACITY: usize = 100;
65    pub(crate) const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
66
67    #[derive(Clone, Eq, PartialEq, Hash)]
68    pub(crate) struct CacheKey(String);
69
70    /// The caching implementation for S3 Express identity.
71    ///
72    /// While customers can either disable S3 Express itself or provide a custom S3 Express identity
73    /// provider, configuring S3 Express identity cache is not supported. Thus, this is _the_
74    /// implementation of S3 Express identity cache.
75    pub(crate) struct S3ExpressIdentityCache {
76        inner: Mutex<LruCache<CacheKey, ExpiringCache<Identity, BoxError>>>,
77        time_source: SharedTimeSource,
78        buffer_time: Duration,
79        random_bytes: [u8; 64],
80    }
81
82    impl fmt::Debug for S3ExpressIdentityCache {
83        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84            let (size, capacity) = {
85                let cache = self.inner.lock().unwrap();
86                (cache.len(), cache.cap())
87            };
88            write!(
89                f,
90                "S3ExpressIdentityCache {{ time_source: {:?}, buffer_time: {:?} }}, with size/capacity: {}/{}",
91                self.time_source, &self.buffer_time, size, capacity,
92            )
93        }
94    }
95
96    impl S3ExpressIdentityCache {
97        pub(crate) fn new(capacity: usize, time_source: SharedTimeSource, buffer_time: Duration) -> Self {
98            // It'd be nice to use a cryptographically secure random generator but not necessary.
99            // The cache is memory only and randomization here is mostly to obfuscate the key and
100            // make it reasonable length.
101            let mut rng = Rng::default();
102            let mut random_bytes = [0u8; 64];
103            rng.fill(&mut random_bytes);
104            Self {
105                inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
106                time_source,
107                buffer_time,
108                random_bytes,
109            }
110        }
111
112        pub(crate) fn key(&self, bucket_name: &str, creds: &Credentials) -> CacheKey {
113            CacheKey({
114                let mut mac = Hmac::<Sha256>::new_from_slice(self.random_bytes.as_slice()).expect("should be created from random 64 bytes");
115                let input = format!("{}{}", creds.access_key_id(), creds.secret_access_key());
116                mac.update(input.as_ref());
117                let mut inner = hex::encode(mac.finalize_fixed());
118                inner.push_str(bucket_name);
119                inner
120            })
121        }
122
123        pub(crate) async fn get_or_load<F, Fut>(&self, key: CacheKey, loader: F) -> Result<Identity, BoxError>
124        where
125            F: FnOnce() -> Fut,
126            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
127        {
128            let expiring_cache = {
129                let mut inner = self.inner.lock().unwrap();
130                inner.get_or_insert_mut(key, || ExpiringCache::new(self.buffer_time)).clone()
131            };
132
133            let now = self.time_source.now();
134
135            match expiring_cache.yield_or_clear_if_expired(now).await {
136                Some(identity) => {
137                    tracing::debug!(
138                        buffer_time=?self.buffer_time,
139                        cached_expiration=?identity.expiration(),
140                        now=?now,
141                        "loaded identity from cache"
142                    );
143                    Ok(identity)
144                }
145                None => {
146                    let start_time = self.time_source.now();
147                    let identity = expiring_cache.get_or_load(loader).await?;
148                    let expiration = identity.expiration().ok_or("SessionCredentials` always has expiration")?;
149                    let printable = DateTime::from(expiration);
150                    tracing::info!(
151                        new_expiration=%printable,
152                        valid_for=?expiration.duration_since(self.time_source.now()).unwrap_or_default(),
153                        "identity cache miss occurred; added new identity (took {:?})",
154                        self.time_source.now().duration_since(start_time).unwrap_or_default()
155                    );
156                    Ok(identity)
157                }
158            }
159        }
160    }
161
162    #[cfg(test)]
163    mod tests {
164        use super::*;
165        use aws_smithy_async::rt::sleep::TokioSleep;
166        use aws_smithy_async::test_util::ManualTimeSource;
167        use aws_smithy_runtime_api::client::identity::http::Token;
168        use aws_smithy_runtime_api::client::identity::{IdentityFuture, ResolveIdentity, SharedIdentityResolver};
169        use aws_smithy_runtime_api::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
170        use aws_smithy_runtime_api::shared::IntoShared;
171        use aws_smithy_types::config_bag::ConfigBag;
172        use futures_util::stream::FuturesUnordered;
173        use std::sync::Arc;
174        use std::time::{Duration, SystemTime, UNIX_EPOCH};
175        use tracing::info;
176
177        fn epoch_secs(secs: u64) -> SystemTime {
178            SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
179        }
180
181        fn identity_expiring_in(expired_secs: u64) -> Identity {
182            let expiration = Some(epoch_secs(expired_secs));
183            Identity::new(Token::new("test", expiration), expiration)
184        }
185
186        fn test_identity_resolver(load_list: Vec<Result<Identity, BoxError>>) -> SharedIdentityResolver {
187            #[derive(Debug)]
188            struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
189            impl ResolveIdentity for Resolver {
190                fn resolve_identity<'a>(&'a self, _: &'a RuntimeComponents, _config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
191                    let mut list = self.0.lock().unwrap();
192                    if list.len() > 0 {
193                        let next = list.remove(0);
194                        info!("refreshing the identity to {:?}", next);
195                        IdentityFuture::ready(next)
196                    } else {
197                        drop(list);
198                        panic!("no more identities")
199                    }
200                }
201            }
202
203            SharedIdentityResolver::new(Resolver(Mutex::new(load_list)))
204        }
205
206        async fn load(identity_resolver: SharedIdentityResolver, runtime_components: &RuntimeComponents) -> Result<(Identity, SystemTime), BoxError> {
207            let identity = identity_resolver.resolve_identity(&runtime_components, &ConfigBag::base()).await.unwrap();
208            Ok((identity.clone(), identity.expiration().unwrap()))
209        }
210
211        async fn expect_identity<F, Fut>(expired_secs: u64, sut: &S3ExpressIdentityCache, key: CacheKey, loader: F)
212        where
213            F: FnOnce() -> Fut,
214            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
215        {
216            let identity = sut.get_or_load(key, loader).await.unwrap();
217            assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
218        }
219
220        #[tokio::test]
221        async fn reload_expired_test_identity() {
222            let time = ManualTimeSource::new(UNIX_EPOCH);
223            let runtime_components = RuntimeComponentsBuilder::for_tests()
224                .with_time_source(Some(time.clone()))
225                .with_sleep_impl(Some(TokioSleep::new()))
226                .build()
227                .unwrap();
228
229            let sut = S3ExpressIdentityCache::new(1, time.clone().into_shared(), DEFAULT_BUFFER_TIME);
230
231            let identity_resolver = test_identity_resolver(vec![Ok(identity_expiring_in(1000)), Ok(identity_expiring_in(2000))]);
232
233            let key = sut.key("test-bucket--usw2-az1--x-s3", &Credentials::for_tests_with_session_token());
234
235            // First call to the cache, populating a cache entry.
236            expect_identity(1000, &sut, key.clone(), || {
237                let identity_resolver = identity_resolver.clone();
238                let runtime_components = runtime_components.clone();
239                async move { load(identity_resolver, &runtime_components).await }
240            })
241            .await;
242
243            // Testing for a cache hit by advancing time such that the updated time is before the expiration of the first identity
244            // i.e. 500 < 1000.
245            time.set_time(epoch_secs(500));
246
247            expect_identity(1000, &sut, key.clone(), || async move { panic!("new identity should not be loaded") }).await;
248
249            // Testing for a cache miss by advancing time such that the updated time is now after the expiration of the first identity
250            // and before the expiration of the second identity i.e. 1000 < 1500 && 1500 < 2000.
251            time.set_time(epoch_secs(1500));
252
253            expect_identity(2000, &sut, key, || async move { load(identity_resolver, &runtime_components).await }).await;
254        }
255
256        #[test]
257        fn load_contention() {
258            let rt = tokio::runtime::Builder::new_multi_thread()
259                .enable_time()
260                .worker_threads(16)
261                .build()
262                .unwrap();
263
264            let time = ManualTimeSource::new(epoch_secs(0));
265            let runtime_components = RuntimeComponentsBuilder::for_tests()
266                .with_time_source(Some(time.clone()))
267                .with_sleep_impl(Some(TokioSleep::new()))
268                .build()
269                .unwrap();
270
271            let number_of_buckets = 4;
272            let sut = Arc::new(S3ExpressIdentityCache::new(
273                number_of_buckets,
274                time.clone().into_shared(),
275                DEFAULT_BUFFER_TIME,
276            ));
277
278            // Nested for loops below advance time by 200 in total, and each identity has the expiration
279            // such that no matter what order async tasks are executed, it never expires.
280            let safe_expiration = number_of_buckets as u64 * 50 + DEFAULT_BUFFER_TIME.as_secs() + 1;
281            let identity_resolver = test_identity_resolver(vec![
282                Ok(identity_expiring_in(safe_expiration)),
283                Ok(identity_expiring_in(safe_expiration)),
284                Ok(identity_expiring_in(safe_expiration)),
285                Ok(identity_expiring_in(safe_expiration)),
286            ]);
287
288            let mut tasks = Vec::new();
289            for i in 0..number_of_buckets {
290                let key = sut.key(&format!("test-bucket-{i}-usw2-az1--x-s3"), &Credentials::for_tests_with_session_token());
291                for _ in 0..50 {
292                    let sut = sut.clone();
293                    let key = key.clone();
294                    let identity_resolver = identity_resolver.clone();
295                    let time = time.clone();
296                    let runtime_components = runtime_components.clone();
297                    tasks.push(rt.spawn(async move {
298                        let now = time.advance(Duration::from_secs(1));
299                        let identity: Identity = sut
300                            .get_or_load(key, || async move { load(identity_resolver, &runtime_components).await })
301                            .await
302                            .unwrap();
303
304                        assert!(identity.expiration().unwrap() >= now, "{:?} >= {:?}", identity.expiration(), now);
305                    }));
306                }
307            }
308            let tasks = tasks.into_iter().collect::<FuturesUnordered<_>>();
309            for task in tasks {
310                rt.block_on(task).unwrap();
311            }
312        }
313
314        #[tokio::test]
315        async fn identity_fetch_triggered_by_lru_eviction() {
316            let time = ManualTimeSource::new(UNIX_EPOCH);
317            let runtime_components = RuntimeComponentsBuilder::for_tests()
318                .with_time_source(Some(time.clone()))
319                .with_sleep_impl(Some(TokioSleep::new()))
320                .build()
321                .unwrap();
322
323            // Create a cache of size 2.
324            let sut = S3ExpressIdentityCache::new(2, time.into_shared(), DEFAULT_BUFFER_TIME);
325
326            let identity_resolver = test_identity_resolver(vec![
327                Ok(identity_expiring_in(1000)),
328                Ok(identity_expiring_in(2000)),
329                Ok(identity_expiring_in(3000)),
330                Ok(identity_expiring_in(4000)),
331            ]);
332
333            let [key1, key2, key3] =
334                [1, 2, 3].map(|i| sut.key(&format!("test-bucket-{i}--usw2-az1--x-s3"), &Credentials::for_tests_with_session_token()));
335
336            // This should pupulate a cache entry for `key1`.
337            expect_identity(1000, &sut, key1.clone(), || {
338                let identity_resolver = identity_resolver.clone();
339                let runtime_components = runtime_components.clone();
340                async move { load(identity_resolver, &runtime_components).await }
341            })
342            .await;
343            // This immediate next call for `key1` should be a cache hit.
344            expect_identity(1000, &sut, key1.clone(), || async move { panic!("new identity should not be loaded") }).await;
345
346            // This should pupulate a cache entry for `key2`.
347            expect_identity(2000, &sut, key2, || {
348                let identity_resolver = identity_resolver.clone();
349                let runtime_components = runtime_components.clone();
350                async move { load(identity_resolver, &runtime_components).await }
351            })
352            .await;
353
354            // This should pupulate a cache entry for `key3`, but evicting a cache entry for `key1` because the cache is full.
355            expect_identity(3000, &sut, key3.clone(), || {
356                let identity_resolver = identity_resolver.clone();
357                let runtime_components = runtime_components.clone();
358                async move { load(identity_resolver, &runtime_components).await }
359            })
360            .await;
361
362            // Attempt to get an identity for `key1` should end up fetching a new one since its cache entry has been evicted.
363            // This fetch should now evict a cache entry for `key2`.
364            expect_identity(4000, &sut, key1, || async move { load(identity_resolver, &runtime_components).await }).await;
365
366            // A cache entry for `key3` should still exist in the cache.
367            expect_identity(3000, &sut, key3, || async move { panic!("new identity should not be loaded") }).await;
368        }
369    }
370}
371/// Supporting code for S3 Express identity provider
372pub(crate) mod identity_provider {
373    use std::time::{Duration, SystemTime};
374
375    use crate::s3_express::identity_cache::S3ExpressIdentityCache;
376    use crate::types::SessionCredentials;
377    use aws_credential_types::provider::error::CredentialsError;
378    use aws_credential_types::Credentials;
379    use aws_smithy_async::time::{SharedTimeSource, TimeSource};
380    use aws_smithy_runtime_api::box_error::BoxError;
381    use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
382    use aws_smithy_runtime_api::client::identity::{Identity, IdentityCacheLocation, IdentityFuture, ResolveCachedIdentity, ResolveIdentity};
383    use aws_smithy_runtime_api::client::interceptors::SharedInterceptor;
384    use aws_smithy_runtime_api::client::runtime_components::{GetIdentityResolver, RuntimeComponents};
385    use aws_smithy_runtime_api::shared::IntoShared;
386    use aws_smithy_types::config_bag::ConfigBag;
387
388    use super::identity_cache::{DEFAULT_BUFFER_TIME, DEFAULT_MAX_CACHE_CAPACITY};
389
390    #[derive(Debug)]
391    pub(crate) struct DefaultS3ExpressIdentityProvider {
392        behavior_version: crate::config::BehaviorVersion,
393        cache: S3ExpressIdentityCache,
394    }
395
396    impl TryFrom<SessionCredentials> for Credentials {
397        type Error = BoxError;
398
399        fn try_from(session_creds: SessionCredentials) -> Result<Self, Self::Error> {
400            Ok(Credentials::new(
401                session_creds.access_key_id,
402                session_creds.secret_access_key,
403                Some(session_creds.session_token),
404                Some(
405                    SystemTime::try_from(session_creds.expiration)
406                        .map_err(|_| CredentialsError::unhandled("credential expiration time cannot be represented by a SystemTime"))?,
407                ),
408                "s3express",
409            ))
410        }
411    }
412
413    impl DefaultS3ExpressIdentityProvider {
414        pub(crate) fn builder() -> Builder {
415            Builder::default()
416        }
417
418        async fn identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> Result<Identity, BoxError> {
419            let bucket_name = self.bucket_name(config_bag)?;
420
421            let sigv4_identity_resolver = runtime_components
422                .identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID)
423                .ok_or("identity resolver for sigv4 should be set for S3")?;
424            let aws_identity = runtime_components
425                .identity_cache()
426                .resolve_cached_identity(sigv4_identity_resolver, runtime_components, config_bag)
427                .await?;
428
429            let credentials = aws_identity
430                .data::<Credentials>()
431                .ok_or("wrong identity type for SigV4. Expected AWS credentials but got `{identity:?}")?;
432
433            let key = self.cache.key(bucket_name, credentials);
434            self.cache
435                .get_or_load(key, || async move {
436                    let creds = self.express_session_credentials(bucket_name, runtime_components, config_bag).await?;
437                    let data = Credentials::try_from(creds)?;
438                    Ok((Identity::new(data.clone(), data.expiry()), data.expiry().unwrap()))
439                })
440                .await
441        }
442
443        fn bucket_name<'a>(&'a self, config_bag: &'a ConfigBag) -> Result<&'a str, BoxError> {
444            let params = config_bag.load::<EndpointResolverParams>().expect("endpoint resolver params must be set");
445            let params = params
446                .get::<crate::config::endpoint::Params>()
447                .expect("`Params` should be wrapped in `EndpointResolverParams`");
448            params.bucket().ok_or("A bucket was not set in endpoint params".into())
449        }
450
451        async fn express_session_credentials<'a>(
452            &'a self,
453            bucket_name: &'a str,
454            runtime_components: &'a RuntimeComponents,
455            config_bag: &'a ConfigBag,
456        ) -> Result<SessionCredentials, BoxError> {
457            let mut config_builder = crate::config::Builder::from_config_bag(config_bag).behavior_version(self.behavior_version);
458
459            // inherits all runtime components from a current S3 operation but clears out
460            // out interceptors configured for that operation
461            let mut rc_builder = runtime_components.to_builder();
462            rc_builder.set_interceptors(std::iter::empty::<SharedInterceptor>());
463            config_builder.runtime_components = rc_builder;
464
465            let client = crate::Client::from_conf(config_builder.build());
466            let response = client.create_session().bucket(bucket_name).send().await?;
467
468            response.credentials.ok_or("no session credentials in response".into())
469        }
470    }
471
472    #[derive(Default)]
473    pub(crate) struct Builder {
474        behavior_version: Option<crate::config::BehaviorVersion>,
475        time_source: Option<SharedTimeSource>,
476        buffer_time: Option<Duration>,
477    }
478
479    impl Builder {
480        pub(crate) fn behavior_version(mut self, behavior_version: crate::config::BehaviorVersion) -> Self {
481            self.set_behavior_version(Some(behavior_version));
482            self
483        }
484        pub(crate) fn set_behavior_version(&mut self, behavior_version: Option<crate::config::BehaviorVersion>) -> &mut Self {
485            self.behavior_version = behavior_version;
486            self
487        }
488        pub(crate) fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
489            self.set_time_source(time_source.into_shared());
490            self
491        }
492        pub(crate) fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
493            self.time_source = Some(time_source.into_shared());
494            self
495        }
496        #[allow(dead_code)]
497        pub(crate) fn buffer_time(mut self, buffer_time: Duration) -> Self {
498            self.set_buffer_time(Some(buffer_time));
499            self
500        }
501        #[allow(dead_code)]
502        pub(crate) fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
503            self.buffer_time = buffer_time;
504            self
505        }
506        pub(crate) fn build(self) -> DefaultS3ExpressIdentityProvider {
507            DefaultS3ExpressIdentityProvider {
508                behavior_version: self.behavior_version.expect("required field `behavior_version` should be set"),
509                cache: S3ExpressIdentityCache::new(
510                    DEFAULT_MAX_CACHE_CAPACITY,
511                    self.time_source.unwrap_or_default(),
512                    self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
513                ),
514            }
515        }
516    }
517
518    impl ResolveIdentity for DefaultS3ExpressIdentityProvider {
519        fn resolve_identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
520            IdentityFuture::new(async move { self.identity(runtime_components, config_bag).await })
521        }
522
523        fn cache_location(&self) -> IdentityCacheLocation {
524            IdentityCacheLocation::IdentityResolver
525        }
526    }
527}
528
529/// Supporting code for S3 Express runtime plugin
530pub(crate) mod runtime_plugin {
531    use std::borrow::Cow;
532
533    use aws_runtime::auth::SigV4SessionTokenNameOverride;
534    use aws_sigv4::http_request::{SignatureLocation, SigningSettings};
535    use aws_smithy_runtime_api::{
536        box_error::BoxError,
537        client::{runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin},
538    };
539    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
540    use aws_types::os_shim_internal::Env;
541
542    mod env {
543        pub(super) const S3_DISABLE_EXPRESS_SESSION_AUTH: &str = "AWS_S3_DISABLE_EXPRESS_SESSION_AUTH";
544    }
545
546    #[derive(Debug)]
547    pub(crate) struct S3ExpressRuntimePlugin {
548        config: FrozenLayer,
549        runtime_components_builder: RuntimeComponentsBuilder,
550    }
551
552    impl S3ExpressRuntimePlugin {
553        // `new` will be called as `additional_client_plugins` within `base_client_runtime_plugins`.
554        // This guarantees that `new` receives a fully constructed service config, with required
555        // runtime components registered with `RuntimeComponents`.
556        pub(crate) fn new(service_config: crate::config::Config) -> Self {
557            Self::new_with(service_config, Env::real())
558        }
559
560        fn new_with(service_config: crate::config::Config, env: Env) -> Self {
561            Self {
562                config: config(service_config.config.load::<crate::config::DisableS3ExpressSessionAuth>().cloned(), env),
563                runtime_components_builder: runtime_components_builder(service_config),
564            }
565        }
566    }
567
568    fn config(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>, env: Env) -> FrozenLayer {
569        let mut layer = Layer::new("S3ExpressRuntimePlugin");
570        if disable_s3_express_session_token.is_none() {
571            match env.get(env::S3_DISABLE_EXPRESS_SESSION_AUTH) {
572                Ok(value) if value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false") => {
573                    let value = value.to_lowercase().parse::<bool>().expect("just checked to be a bool-valued string");
574                    layer.store_or_unset(Some(crate::config::DisableS3ExpressSessionAuth(value)));
575                }
576                Ok(value) => {
577                    tracing::warn!(
578                        "environment variable `{}` ignored since it only accepts either `true` or `false` (case-insensitive), but got `{}`.",
579                        env::S3_DISABLE_EXPRESS_SESSION_AUTH,
580                        value
581                    )
582                }
583                _ => {
584                    // TODO(aws-sdk-rust#1073): Transfer a value of
585                    //  `s3_disable_express_session_auth` from a profile file to `layer`
586                }
587            }
588        }
589
590        let session_token_name_override = SigV4SessionTokenNameOverride::new(|settings: &SigningSettings, cfg: &ConfigBag| {
591            // Not configured for S3 express, use the original session token name override
592            if !crate::s3_express::utils::for_s3_express(cfg) {
593                return Ok(settings.session_token_name_override);
594            }
595
596            let session_token_name_override = Some(match settings.signature_location {
597                SignatureLocation::Headers => "x-amz-s3session-token",
598                SignatureLocation::QueryParams => "X-Amz-S3session-Token",
599                _ => {
600                    return Err(BoxError::from(
601                        "`SignatureLocation` adds a new variant, which needs to be handled in a separate match arm",
602                    ))
603                }
604            });
605            Ok(session_token_name_override)
606        });
607        layer.store_or_unset(Some(session_token_name_override));
608
609        layer.freeze()
610    }
611
612    fn runtime_components_builder(service_config: crate::config::Config) -> RuntimeComponentsBuilder {
613        match (
614            service_config.runtime_components.identity_resolver(&super::auth::SCHEME_ID),
615            service_config.runtime_components.identity_resolver(&aws_runtime::auth::sigv4::SCHEME_ID),
616        ) {
617            (None, Some(_)) => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin").with_identity_resolver(
618                super::auth::SCHEME_ID,
619                super::identity_provider::DefaultS3ExpressIdentityProvider::builder()
620                    .time_source(
621                        service_config
622                            .runtime_components
623                            .time_source()
624                            .expect("should be set in `service_config`"),
625                    )
626                    .behavior_version(service_config.behavior_version.expect("should be set in `service_config`"))
627                    .build(),
628            ),
629            _ => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin"),
630        }
631    }
632
633    impl RuntimePlugin for S3ExpressRuntimePlugin {
634        fn config(&self) -> Option<FrozenLayer> {
635            Some(self.config.clone())
636        }
637
638        fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
639            Cow::Borrowed(&self.runtime_components_builder)
640        }
641    }
642
643    #[cfg(test)]
644    mod tests {
645        use super::*;
646        use aws_credential_types::Credentials;
647        use aws_smithy_runtime_api::client::identity::ResolveIdentity;
648
649        #[test]
650        fn disable_option_set_from_service_client_should_take_the_highest_precedence() {
651            // Disable option is set from service client.
652            let disable_s3_express_session_token = crate::config::DisableS3ExpressSessionAuth(true);
653
654            // An environment variable says the session auth is _not_ disabled, but it will be
655            // overruled by what is in `layer`.
656            let actual = config(
657                Some(disable_s3_express_session_token),
658                Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "false")]),
659            );
660
661            // A config layer from this runtime plugin should not provide a new `DisableS3ExpressSessionAuth`
662            // if the disable option is set from service client.
663            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
664        }
665
666        #[test]
667        fn disable_option_set_from_env_should_take_the_second_highest_precedence() {
668            // An environment variable says session auth is disabled
669            let actual = config(None, Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "true")]));
670
671            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().unwrap().0);
672        }
673
674        #[should_panic]
675        #[test]
676        fn disable_option_set_from_profile_file_should_take_the_lowest_precedence() {
677            // TODO(aws-sdk-rust#1073): Implement a test that mimics only setting
678            //  `s3_disable_express_session_auth` in a profile file
679            todo!()
680        }
681
682        #[test]
683        fn disable_option_should_be_unspecified_if_unset() {
684            let actual = config(None, Env::from_slice(&[]));
685
686            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
687        }
688
689        #[test]
690        fn s3_express_runtime_plugin_should_set_default_identity_resolver() {
691            let config = crate::Config::builder()
692                .behavior_version_latest()
693                .time_source(aws_smithy_async::time::SystemTimeSource::new())
694                .credentials_provider(Credentials::for_tests())
695                .build();
696
697            let actual = runtime_components_builder(config);
698            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_some());
699        }
700
701        #[test]
702        fn s3_express_plugin_should_not_set_default_identity_resolver_without_sigv4_counterpart() {
703            let config = crate::Config::builder()
704                .behavior_version_latest()
705                .time_source(aws_smithy_async::time::SystemTimeSource::new())
706                .build();
707
708            let actual = runtime_components_builder(config);
709            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_none());
710        }
711
712        #[tokio::test]
713        async fn s3_express_plugin_should_not_set_default_identity_resolver_if_user_provided() {
714            let expected_access_key_id = "expected acccess key ID";
715            let config = crate::Config::builder()
716                .behavior_version_latest()
717                .credentials_provider(Credentials::for_tests())
718                .express_credentials_provider(Credentials::new(
719                    expected_access_key_id,
720                    "secret",
721                    None,
722                    None,
723                    "test express credentials provider",
724                ))
725                .time_source(aws_smithy_async::time::SystemTimeSource::new())
726                .build();
727
728            // `RuntimeComponentsBuilder` from `S3ExpressRuntimePlugin` should not provide an S3Express identity resolver.
729            let runtime_components_builder = runtime_components_builder(config.clone());
730            assert!(runtime_components_builder
731                .identity_resolver(&crate::s3_express::auth::SCHEME_ID)
732                .is_none());
733
734            // Get the S3Express identity resolver from the service config.
735            let express_identity_resolver = config.runtime_components.identity_resolver(&crate::s3_express::auth::SCHEME_ID).unwrap();
736            let creds = express_identity_resolver
737                .resolve_identity(&RuntimeComponentsBuilder::for_tests().build().unwrap(), &ConfigBag::base())
738                .await
739                .unwrap();
740
741            // Verify credentials are the one generated by the S3Express identity resolver user provided.
742            assert_eq!(expected_access_key_id, creds.data::<Credentials>().unwrap().access_key_id());
743        }
744    }
745}
746
747pub(crate) mod checksum {
748    use crate::http_request_checksum::DefaultRequestChecksumOverride;
749    use aws_smithy_checksums::ChecksumAlgorithm;
750    use aws_smithy_types::config_bag::ConfigBag;
751
752    pub(crate) fn provide_default_checksum_algorithm() -> crate::http_request_checksum::DefaultRequestChecksumOverride {
753        fn _provide_default_checksum_algorithm(original_checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
754            // S3 does not have the `ChecksumAlgorithm::Md5`, therefore customers cannot set it
755            // from outside.
756            if original_checksum != Some(ChecksumAlgorithm::Md5) {
757                return original_checksum;
758            }
759
760            if crate::s3_express::utils::for_s3_express(cfg) {
761                // S3 Express requires setting the default checksum algorithm to CRC-32
762                Some(ChecksumAlgorithm::Crc32)
763            } else {
764                original_checksum
765            }
766        }
767        DefaultRequestChecksumOverride::new(_provide_default_checksum_algorithm)
768    }
769}
770
771pub(crate) mod utils {
772    use aws_smithy_types::{config_bag::ConfigBag, Document};
773
774    pub(crate) fn for_s3_express(cfg: &ConfigBag) -> bool {
775        // logic borrowed from aws_smithy_runtime::client::orchestrator::auth::extract_endpoint_auth_scheme_config
776        let endpoint = cfg
777            .load::<crate::config::endpoint::Endpoint>()
778            .expect("endpoint added to config bag by endpoint orchestrator");
779
780        let auth_schemes = match endpoint.properties().get("authSchemes") {
781            Some(Document::Array(schemes)) => schemes,
782            _ => return false,
783        };
784        auth_schemes.iter().any(|doc| {
785            let config_scheme_id = doc.as_object().and_then(|object| object.get("name")).and_then(Document::as_string);
786            config_scheme_id == Some(crate::s3_express::auth::SCHEME_ID.as_str())
787        })
788    }
789}