Skip to main content

aws_sdk_s3/
s3_express.rs

1// Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
2/*
3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7/// Supporting code for S3 Express auth
8pub(crate) mod auth {
9    use aws_runtime::auth::sigv4::SigV4Signer;
10    use aws_smithy_runtime_api::client::auth::{AuthScheme, AuthSchemeId, Sign};
11    use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
12    use aws_smithy_runtime_api::client::runtime_components::GetIdentityResolver;
13
14    /// Auth scheme ID for S3 Express.
15    pub(crate) const SCHEME_ID: AuthSchemeId = AuthSchemeId::new("sigv4-s3express");
16
17    /// S3 Express auth scheme.
18    #[derive(Debug, Default)]
19    pub(crate) struct S3ExpressAuthScheme {
20        signer: SigV4Signer,
21    }
22
23    impl S3ExpressAuthScheme {
24        /// Creates a new `S3ExpressAuthScheme`.
25        pub(crate) fn new() -> Self {
26            Default::default()
27        }
28    }
29
30    impl AuthScheme for S3ExpressAuthScheme {
31        fn scheme_id(&self) -> AuthSchemeId {
32            SCHEME_ID
33        }
34
35        fn identity_resolver(&self, identity_resolvers: &dyn GetIdentityResolver) -> Option<SharedIdentityResolver> {
36            identity_resolvers.identity_resolver(self.scheme_id())
37        }
38
39        fn signer(&self) -> &dyn Sign {
40            &self.signer
41        }
42    }
43}
44
45/// Supporting code for S3 Express identity cache
46pub(crate) mod identity_cache {
47    use aws_credential_types::Credentials;
48    use aws_smithy_async::time::SharedTimeSource;
49    use aws_smithy_runtime::expiring_cache::ExpiringCache;
50    use aws_smithy_runtime_api::box_error::BoxError;
51    use aws_smithy_runtime_api::client::identity::Identity;
52    use aws_smithy_types::DateTime;
53    use fastrand::Rng;
54    use hmac::KeyInit;
55    use hmac::{digest::FixedOutput, Hmac, Mac};
56    use lru::LruCache;
57    use sha2::Sha256;
58    use std::fmt;
59    use std::future::Future;
60    use std::hash::Hash;
61    use std::num::NonZeroUsize;
62    use std::sync::Mutex;
63    use std::time::{Duration, SystemTime};
64
65    pub(crate) const DEFAULT_MAX_CACHE_CAPACITY: usize = 100;
66    pub(crate) const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
67
68    #[derive(Clone, Eq, PartialEq, Hash)]
69    pub(crate) struct CacheKey(String);
70
71    /// The caching implementation for S3 Express identity.
72    ///
73    /// While customers can either disable S3 Express itself or provide a custom S3 Express identity
74    /// provider, configuring S3 Express identity cache is not supported. Thus, this is _the_
75    /// implementation of S3 Express identity cache.
76    pub(crate) struct S3ExpressIdentityCache {
77        inner: Mutex<LruCache<CacheKey, ExpiringCache<Identity, BoxError>>>,
78        time_source: SharedTimeSource,
79        buffer_time: Duration,
80        random_bytes: [u8; 64],
81    }
82
83    impl fmt::Debug for S3ExpressIdentityCache {
84        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85            let (size, capacity) = {
86                let cache = self.inner.lock().unwrap();
87                (cache.len(), cache.cap())
88            };
89            write!(
90                f,
91                "S3ExpressIdentityCache {{ time_source: {:?}, buffer_time: {:?} }}, with size/capacity: {}/{}",
92                self.time_source, &self.buffer_time, size, capacity,
93            )
94        }
95    }
96
97    impl S3ExpressIdentityCache {
98        pub(crate) fn new(capacity: usize, time_source: SharedTimeSource, buffer_time: Duration) -> Self {
99            // It'd be nice to use a cryptographically secure random generator but not necessary.
100            // The cache is memory only and randomization here is mostly to obfuscate the key and
101            // make it reasonable length.
102            let mut rng = Rng::default();
103            let mut random_bytes = [0u8; 64];
104            rng.fill(&mut random_bytes);
105            Self {
106                inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
107                time_source,
108                buffer_time,
109                random_bytes,
110            }
111        }
112
113        pub(crate) fn key(&self, bucket_name: &str, creds: &Credentials) -> CacheKey {
114            CacheKey({
115                let mut mac = Hmac::<Sha256>::new_from_slice(self.random_bytes.as_slice()).expect("should be created from random 64 bytes");
116                let input = format!("{}{}", creds.access_key_id(), creds.secret_access_key());
117                mac.update(input.as_ref());
118                let mut inner = hex::encode(mac.finalize_fixed());
119                inner.push_str(bucket_name);
120                inner
121            })
122        }
123
124        pub(crate) async fn get_or_load<F, Fut>(&self, key: CacheKey, loader: F) -> Result<Identity, BoxError>
125        where
126            F: FnOnce() -> Fut,
127            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
128        {
129            let expiring_cache = {
130                let mut inner = self.inner.lock().unwrap();
131                inner.get_or_insert_mut(key, || ExpiringCache::new(self.buffer_time)).clone()
132            };
133
134            let now = self.time_source.now();
135
136            match expiring_cache.yield_or_clear_if_expired(now).await {
137                Some(identity) => {
138                    tracing::debug!(
139                        buffer_time=?self.buffer_time,
140                        cached_expiration=?identity.expiration(),
141                        now=?now,
142                        "loaded identity from cache"
143                    );
144                    Ok(identity)
145                }
146                None => {
147                    let start_time = self.time_source.now();
148                    let identity = expiring_cache.get_or_load(loader).await?;
149                    let expiration = identity.expiration().ok_or("SessionCredentials` always has expiration")?;
150                    let printable = DateTime::from(expiration);
151                    tracing::info!(
152                        new_expiration=%printable,
153                        valid_for=?expiration.duration_since(self.time_source.now()).unwrap_or_default(),
154                        "identity cache miss occurred; added new identity (took {:?})",
155                        self.time_source.now().duration_since(start_time).unwrap_or_default()
156                    );
157                    Ok(identity)
158                }
159            }
160        }
161    }
162
163    #[cfg(test)]
164    mod tests {
165        use super::*;
166        use aws_smithy_async::rt::sleep::TokioSleep;
167        use aws_smithy_async::test_util::ManualTimeSource;
168        use aws_smithy_runtime_api::client::identity::http::Token;
169        use aws_smithy_runtime_api::client::identity::{IdentityFuture, ResolveIdentity, SharedIdentityResolver};
170        use aws_smithy_runtime_api::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
171        use aws_smithy_runtime_api::shared::IntoShared;
172        use aws_smithy_types::config_bag::ConfigBag;
173        use futures_util::stream::FuturesUnordered;
174        use std::sync::Arc;
175        use std::time::{Duration, SystemTime, UNIX_EPOCH};
176        use tracing::info;
177
178        fn epoch_secs(secs: u64) -> SystemTime {
179            SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
180        }
181
182        fn identity_expiring_in(expired_secs: u64) -> Identity {
183            let expiration = Some(epoch_secs(expired_secs));
184            Identity::new(Token::new("test", expiration), expiration)
185        }
186
187        fn test_identity_resolver(load_list: Vec<Result<Identity, BoxError>>) -> SharedIdentityResolver {
188            #[derive(Debug)]
189            struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
190            impl ResolveIdentity for Resolver {
191                fn resolve_identity<'a>(&'a self, _: &'a RuntimeComponents, _config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
192                    let mut list = self.0.lock().unwrap();
193                    if list.len() > 0 {
194                        let next = list.remove(0);
195                        info!("refreshing the identity to {:?}", next);
196                        IdentityFuture::ready(next)
197                    } else {
198                        drop(list);
199                        panic!("no more identities")
200                    }
201                }
202            }
203
204            SharedIdentityResolver::new(Resolver(Mutex::new(load_list)))
205        }
206
207        async fn load(identity_resolver: SharedIdentityResolver, runtime_components: &RuntimeComponents) -> Result<(Identity, SystemTime), BoxError> {
208            let identity = identity_resolver.resolve_identity(&runtime_components, &ConfigBag::base()).await.unwrap();
209            Ok((identity.clone(), identity.expiration().unwrap()))
210        }
211
212        async fn expect_identity<F, Fut>(expired_secs: u64, sut: &S3ExpressIdentityCache, key: CacheKey, loader: F)
213        where
214            F: FnOnce() -> Fut,
215            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
216        {
217            let identity = sut.get_or_load(key, loader).await.unwrap();
218            assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
219        }
220
221        #[tokio::test]
222        async fn reload_expired_test_identity() {
223            let time = ManualTimeSource::new(UNIX_EPOCH);
224            let runtime_components = RuntimeComponentsBuilder::for_tests()
225                .with_time_source(Some(time.clone()))
226                .with_sleep_impl(Some(TokioSleep::new()))
227                .build()
228                .unwrap();
229
230            let sut = S3ExpressIdentityCache::new(1, time.clone().into_shared(), DEFAULT_BUFFER_TIME);
231
232            let identity_resolver = test_identity_resolver(vec![Ok(identity_expiring_in(1000)), Ok(identity_expiring_in(2000))]);
233
234            let key = sut.key("test-bucket--usw2-az1--x-s3", &Credentials::for_tests_with_session_token());
235
236            // First call to the cache, populating a cache entry.
237            expect_identity(1000, &sut, key.clone(), || {
238                let identity_resolver = identity_resolver.clone();
239                let runtime_components = runtime_components.clone();
240                async move { load(identity_resolver, &runtime_components).await }
241            })
242            .await;
243
244            // Testing for a cache hit by advancing time such that the updated time is before the expiration of the first identity
245            // i.e. 500 < 1000.
246            time.set_time(epoch_secs(500));
247
248            expect_identity(1000, &sut, key.clone(), || async move { panic!("new identity should not be loaded") }).await;
249
250            // Testing for a cache miss by advancing time such that the updated time is now after the expiration of the first identity
251            // and before the expiration of the second identity i.e. 1000 < 1500 && 1500 < 2000.
252            time.set_time(epoch_secs(1500));
253
254            expect_identity(2000, &sut, key, || async move { load(identity_resolver, &runtime_components).await }).await;
255        }
256
257        #[test]
258        fn load_contention() {
259            let rt = tokio::runtime::Builder::new_multi_thread()
260                .enable_time()
261                .worker_threads(16)
262                .build()
263                .unwrap();
264
265            let time = ManualTimeSource::new(epoch_secs(0));
266            let runtime_components = RuntimeComponentsBuilder::for_tests()
267                .with_time_source(Some(time.clone()))
268                .with_sleep_impl(Some(TokioSleep::new()))
269                .build()
270                .unwrap();
271
272            let number_of_buckets = 4;
273            let sut = Arc::new(S3ExpressIdentityCache::new(
274                number_of_buckets,
275                time.clone().into_shared(),
276                DEFAULT_BUFFER_TIME,
277            ));
278
279            // Nested for loops below advance time by 200 in total, and each identity has the expiration
280            // such that no matter what order async tasks are executed, it never expires.
281            let safe_expiration = number_of_buckets as u64 * 50 + DEFAULT_BUFFER_TIME.as_secs() + 1;
282            let identity_resolver = test_identity_resolver(vec![
283                Ok(identity_expiring_in(safe_expiration)),
284                Ok(identity_expiring_in(safe_expiration)),
285                Ok(identity_expiring_in(safe_expiration)),
286                Ok(identity_expiring_in(safe_expiration)),
287            ]);
288
289            let mut tasks = Vec::new();
290            for i in 0..number_of_buckets {
291                let key = sut.key(&format!("test-bucket-{i}-usw2-az1--x-s3"), &Credentials::for_tests_with_session_token());
292                for _ in 0..50 {
293                    let sut = sut.clone();
294                    let key = key.clone();
295                    let identity_resolver = identity_resolver.clone();
296                    let time = time.clone();
297                    let runtime_components = runtime_components.clone();
298                    tasks.push(rt.spawn(async move {
299                        let now = time.advance(Duration::from_secs(1));
300                        let identity: Identity = sut
301                            .get_or_load(key, || async move { load(identity_resolver, &runtime_components).await })
302                            .await
303                            .unwrap();
304
305                        assert!(identity.expiration().unwrap() >= now, "{:?} >= {:?}", identity.expiration(), now);
306                    }));
307                }
308            }
309            let tasks = tasks.into_iter().collect::<FuturesUnordered<_>>();
310            for task in tasks {
311                rt.block_on(task).unwrap();
312            }
313        }
314
315        #[tokio::test]
316        async fn identity_fetch_triggered_by_lru_eviction() {
317            let time = ManualTimeSource::new(UNIX_EPOCH);
318            let runtime_components = RuntimeComponentsBuilder::for_tests()
319                .with_time_source(Some(time.clone()))
320                .with_sleep_impl(Some(TokioSleep::new()))
321                .build()
322                .unwrap();
323
324            // Create a cache of size 2.
325            let sut = S3ExpressIdentityCache::new(2, time.into_shared(), DEFAULT_BUFFER_TIME);
326
327            let identity_resolver = test_identity_resolver(vec![
328                Ok(identity_expiring_in(1000)),
329                Ok(identity_expiring_in(2000)),
330                Ok(identity_expiring_in(3000)),
331                Ok(identity_expiring_in(4000)),
332            ]);
333
334            let [key1, key2, key3] =
335                [1, 2, 3].map(|i| sut.key(&format!("test-bucket-{i}--usw2-az1--x-s3"), &Credentials::for_tests_with_session_token()));
336
337            // This should pupulate a cache entry for `key1`.
338            expect_identity(1000, &sut, key1.clone(), || {
339                let identity_resolver = identity_resolver.clone();
340                let runtime_components = runtime_components.clone();
341                async move { load(identity_resolver, &runtime_components).await }
342            })
343            .await;
344            // This immediate next call for `key1` should be a cache hit.
345            expect_identity(1000, &sut, key1.clone(), || async move { panic!("new identity should not be loaded") }).await;
346
347            // This should pupulate a cache entry for `key2`.
348            expect_identity(2000, &sut, key2, || {
349                let identity_resolver = identity_resolver.clone();
350                let runtime_components = runtime_components.clone();
351                async move { load(identity_resolver, &runtime_components).await }
352            })
353            .await;
354
355            // This should pupulate a cache entry for `key3`, but evicting a cache entry for `key1` because the cache is full.
356            expect_identity(3000, &sut, key3.clone(), || {
357                let identity_resolver = identity_resolver.clone();
358                let runtime_components = runtime_components.clone();
359                async move { load(identity_resolver, &runtime_components).await }
360            })
361            .await;
362
363            // Attempt to get an identity for `key1` should end up fetching a new one since its cache entry has been evicted.
364            // This fetch should now evict a cache entry for `key2`.
365            expect_identity(4000, &sut, key1, || async move { load(identity_resolver, &runtime_components).await }).await;
366
367            // A cache entry for `key3` should still exist in the cache.
368            expect_identity(3000, &sut, key3, || async move { panic!("new identity should not be loaded") }).await;
369        }
370    }
371}
372/// Supporting code for S3 Express identity provider
373pub(crate) mod identity_provider {
374    use std::time::{Duration, SystemTime};
375
376    use crate::s3_express::identity_cache::S3ExpressIdentityCache;
377    use crate::types::SessionCredentials;
378    use aws_credential_types::credential_feature::AwsCredentialFeature;
379    use aws_credential_types::provider::error::CredentialsError;
380    use aws_credential_types::Credentials;
381    use aws_smithy_async::time::{SharedTimeSource, TimeSource};
382    use aws_smithy_runtime_api::box_error::BoxError;
383    use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
384    use aws_smithy_runtime_api::client::identity::{Identity, IdentityCacheLocation, IdentityFuture, ResolveCachedIdentity, ResolveIdentity};
385    use aws_smithy_runtime_api::client::interceptors::SharedInterceptor;
386    use aws_smithy_runtime_api::client::runtime_components::{GetIdentityResolver, RuntimeComponents};
387    use aws_smithy_runtime_api::shared::IntoShared;
388    use aws_smithy_types::config_bag::ConfigBag;
389
390    use super::identity_cache::{DEFAULT_BUFFER_TIME, DEFAULT_MAX_CACHE_CAPACITY};
391
392    #[derive(Debug)]
393    pub(crate) struct DefaultS3ExpressIdentityProvider {
394        behavior_version: crate::config::BehaviorVersion,
395        cache: S3ExpressIdentityCache,
396    }
397
398    impl TryFrom<SessionCredentials> for Credentials {
399        type Error = BoxError;
400
401        fn try_from(session_creds: SessionCredentials) -> Result<Self, Self::Error> {
402            Ok(Credentials::new(
403                session_creds.access_key_id,
404                session_creds.secret_access_key,
405                Some(session_creds.session_token),
406                Some(
407                    SystemTime::try_from(session_creds.expiration)
408                        .map_err(|_| CredentialsError::unhandled("credential expiration time cannot be represented by a SystemTime"))?,
409                ),
410                "s3express",
411            ))
412        }
413    }
414
415    impl DefaultS3ExpressIdentityProvider {
416        pub(crate) fn builder() -> Builder {
417            Builder::default()
418        }
419
420        async fn identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> Result<Identity, BoxError> {
421            let bucket_name = self.bucket_name(config_bag)?;
422
423            let sigv4_identity_resolver = runtime_components
424                .identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID)
425                .ok_or("identity resolver for sigv4 should be set for S3")?;
426            let aws_identity = runtime_components
427                .identity_cache()
428                .resolve_cached_identity(sigv4_identity_resolver, runtime_components, config_bag)
429                .await?;
430
431            let credentials = aws_identity
432                .data::<Credentials>()
433                .ok_or("wrong identity type for SigV4. Expected AWS credentials but got `{identity:?}")?;
434
435            let key = self.cache.key(bucket_name, credentials);
436            self.cache
437                .get_or_load(key, || async move {
438                    let creds = self.express_session_credentials(bucket_name, runtime_components, config_bag).await?;
439                    let mut data = Credentials::try_from(creds)?;
440                    data.get_property_mut_or_default::<Vec<AwsCredentialFeature>>()
441                        .push(AwsCredentialFeature::S3ExpressBucket);
442                    let expiry = data.expiry().unwrap();
443                    Ok((Identity::from(data), expiry))
444                })
445                .await
446        }
447
448        fn bucket_name<'a>(&'a self, config_bag: &'a ConfigBag) -> Result<&'a str, BoxError> {
449            let params = config_bag.load::<EndpointResolverParams>().expect("endpoint resolver params must be set");
450            let params = params
451                .get::<crate::config::endpoint::Params>()
452                .expect("`Params` should be wrapped in `EndpointResolverParams`");
453            params.bucket().ok_or("A bucket was not set in endpoint params".into())
454        }
455
456        async fn express_session_credentials<'a>(
457            &'a self,
458            bucket_name: &'a str,
459            runtime_components: &'a RuntimeComponents,
460            config_bag: &'a ConfigBag,
461        ) -> Result<SessionCredentials, BoxError> {
462            let mut config_builder = crate::config::Builder::from_config_bag(config_bag).behavior_version(self.behavior_version);
463
464            // inherits all runtime components from a current S3 operation but clears out
465            // out interceptors configured for that operation
466            let mut rc_builder = runtime_components.to_builder();
467            rc_builder.set_interceptors(std::iter::empty::<SharedInterceptor>());
468            config_builder.runtime_components = rc_builder;
469
470            let client = crate::Client::from_conf(config_builder.build());
471            let response = client.create_session().bucket(bucket_name).send().await?;
472
473            response.credentials.ok_or("no session credentials in response".into())
474        }
475    }
476
477    #[derive(Default)]
478    pub(crate) struct Builder {
479        behavior_version: Option<crate::config::BehaviorVersion>,
480        time_source: Option<SharedTimeSource>,
481        buffer_time: Option<Duration>,
482    }
483
484    impl Builder {
485        pub(crate) fn behavior_version(mut self, behavior_version: crate::config::BehaviorVersion) -> Self {
486            self.set_behavior_version(Some(behavior_version));
487            self
488        }
489        pub(crate) fn set_behavior_version(&mut self, behavior_version: Option<crate::config::BehaviorVersion>) -> &mut Self {
490            self.behavior_version = behavior_version;
491            self
492        }
493        pub(crate) fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
494            self.set_time_source(time_source.into_shared());
495            self
496        }
497        pub(crate) fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
498            self.time_source = Some(time_source.into_shared());
499            self
500        }
501        #[allow(dead_code)]
502        pub(crate) fn buffer_time(mut self, buffer_time: Duration) -> Self {
503            self.set_buffer_time(Some(buffer_time));
504            self
505        }
506        #[allow(dead_code)]
507        pub(crate) fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
508            self.buffer_time = buffer_time;
509            self
510        }
511        pub(crate) fn build(self) -> DefaultS3ExpressIdentityProvider {
512            DefaultS3ExpressIdentityProvider {
513                behavior_version: self.behavior_version.expect("required field `behavior_version` should be set"),
514                cache: S3ExpressIdentityCache::new(
515                    DEFAULT_MAX_CACHE_CAPACITY,
516                    self.time_source.unwrap_or_default(),
517                    self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
518                ),
519            }
520        }
521    }
522
523    impl ResolveIdentity for DefaultS3ExpressIdentityProvider {
524        fn resolve_identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
525            IdentityFuture::new(async move { self.identity(runtime_components, config_bag).await })
526        }
527
528        fn cache_location(&self) -> IdentityCacheLocation {
529            IdentityCacheLocation::IdentityResolver
530        }
531    }
532
533    #[cfg(test)]
534    mod tests {
535        use super::*;
536        use aws_credential_types::credential_feature::AwsCredentialFeature;
537        use aws_credential_types::Credentials;
538
539        // Helper function to create test runtime components with SigV4 identity resolver
540        fn create_test_runtime_components(base_credentials: Credentials) -> aws_smithy_runtime_api::client::runtime_components::RuntimeComponents {
541            use aws_credential_types::provider::SharedCredentialsProvider;
542            use aws_smithy_runtime::client::http::test_util::infallible_client_fn;
543            use aws_smithy_runtime::client::orchestrator::endpoints::StaticUriEndpointResolver;
544            use aws_smithy_runtime::client::retries::strategy::NeverRetryStrategy;
545            use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
546            use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
547            use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
548            use aws_smithy_types::body::SdkBody;
549
550            let sigv4_resolver = SharedIdentityResolver::new(SharedCredentialsProvider::new(base_credentials));
551
552            // Create a simple auth scheme option resolver for testing
553            let auth_option_resolver = StaticAuthSchemeOptionResolver::new(vec![aws_runtime::auth::sigv4::SCHEME_ID]);
554
555            let http_client = infallible_client_fn(|_req| {
556                http::Response::builder()
557                    .status(200)
558                    .body(SdkBody::from(
559                        r#"<?xml version="1.0" encoding="UTF-8"?>
560                        <CreateSessionResult>
561                            <Credentials>
562                                <AccessKeyId>session_access_key</AccessKeyId>
563                                <SecretAccessKey>session_secret_key</SecretAccessKey>
564                                <SessionToken>session_token</SessionToken>
565                                <Expiration>2025-01-01T00:00:00Z</Expiration>
566                            </Credentials>
567                        </CreateSessionResult>"#,
568                    ))
569                    .unwrap()
570            });
571
572            RuntimeComponentsBuilder::for_tests()
573                .with_identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID, sigv4_resolver)
574                .with_http_client(Some(http_client))
575                .with_time_source(Some(aws_smithy_async::time::SystemTimeSource::new()))
576                .with_retry_strategy(Some(NeverRetryStrategy::new()))
577                .with_auth_scheme_option_resolver(Some(auth_option_resolver))
578                .with_endpoint_resolver(Some(StaticUriEndpointResolver::http_localhost(8080)))
579                .build()
580                .unwrap()
581        }
582
583        // Helper function to create config bag with minimal S3 Express bucket parameters
584        fn create_test_config_bag(bucket_name: &str) -> aws_smithy_types::config_bag::ConfigBag {
585            use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
586            use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
587            use aws_smithy_types::config_bag::{ConfigBag, Layer};
588
589            let mut config_bag = ConfigBag::base();
590            let mut layer = Layer::new("test");
591
592            let endpoint_params = EndpointResolverParams::new(crate::config::endpoint::Params::builder().bucket(bucket_name).build().unwrap());
593            layer.store_put(endpoint_params);
594
595            layer.store_put(StalledStreamProtectionConfig::disabled());
596
597            layer.store_put(crate::config::Region::new("us-west-2"));
598
599            config_bag.push_layer(layer);
600
601            config_bag
602        }
603
604        #[test]
605        fn test_session_credentials_conversion() {
606            let session_creds = SessionCredentials::builder()
607                .access_key_id("test_access_key")
608                .secret_access_key("test_secret_key")
609                .session_token("test_session_token")
610                .expiration(aws_smithy_types::DateTime::from_secs(1000))
611                .build()
612                .expect("valid session credentials");
613
614            let credentials = Credentials::try_from(session_creds).expect("conversion should succeed");
615
616            assert_eq!(credentials.access_key_id(), "test_access_key");
617            assert_eq!(credentials.secret_access_key(), "test_secret_key");
618            assert_eq!(credentials.session_token(), Some("test_session_token"));
619        }
620
621        #[tokio::test]
622        async fn test_identity_provider_embeds_s3express_feature() {
623            let bucket_name = "test-bucket--usw2-az1--x-s3";
624
625            // Use helper functions to set up test components
626            let base_credentials = Credentials::for_tests();
627            let runtime_components = create_test_runtime_components(base_credentials);
628            let config_bag = create_test_config_bag(bucket_name);
629
630            // Create the identity provider
631            let provider = DefaultS3ExpressIdentityProvider::builder()
632                .behavior_version(crate::config::BehaviorVersion::latest())
633                .time_source(aws_smithy_async::time::SystemTimeSource::new())
634                .build();
635
636            // Call identity() and verify the S3ExpressBucket feature is present
637            let identity = provider
638                .identity(&runtime_components, &config_bag)
639                .await
640                .expect("identity() should succeed");
641
642            let credentials = identity.data::<Credentials>().expect("Identity should contain Credentials");
643            let features = credentials
644                .get_property::<Vec<AwsCredentialFeature>>()
645                .expect("Credentials should have features");
646            assert!(
647                features.contains(&AwsCredentialFeature::S3ExpressBucket),
648                "S3ExpressBucket feature should be present in Credentials' property field"
649            );
650
651            let identity_layer = identity
652                .property::<aws_smithy_types::config_bag::FrozenLayer>()
653                .expect("Identity should have a property layer");
654            let identity_features: Vec<AwsCredentialFeature> = identity_layer.load::<AwsCredentialFeature>().cloned().collect();
655            assert!(
656                identity_features.contains(&AwsCredentialFeature::S3ExpressBucket),
657                "S3ExpressBucket feature should be present in Identity's property field"
658            );
659        }
660    }
661}
662
663/// Supporting code for S3 Express runtime plugin
664pub(crate) mod runtime_plugin {
665    use std::borrow::Cow;
666
667    use aws_runtime::auth::SigV4SessionTokenNameOverride;
668    use aws_sigv4::http_request::{SignatureLocation, SigningSettings};
669    use aws_smithy_runtime_api::{
670        box_error::BoxError,
671        client::{runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin},
672    };
673    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
674    use aws_types::os_shim_internal::Env;
675
676    mod env {
677        pub(super) const S3_DISABLE_EXPRESS_SESSION_AUTH: &str = "AWS_S3_DISABLE_EXPRESS_SESSION_AUTH";
678    }
679
680    #[derive(Debug)]
681    pub(crate) struct S3ExpressRuntimePlugin {
682        config: FrozenLayer,
683        runtime_components_builder: RuntimeComponentsBuilder,
684    }
685
686    impl S3ExpressRuntimePlugin {
687        // `new` will be called as `additional_client_plugins` within `base_client_runtime_plugins`.
688        // This guarantees that `new` receives a fully constructed service config, with required
689        // runtime components registered with `RuntimeComponents`.
690        pub(crate) fn new(service_config: crate::config::Config) -> Self {
691            Self::new_with(service_config, Env::real())
692        }
693
694        fn new_with(service_config: crate::config::Config, env: Env) -> Self {
695            Self {
696                config: config(service_config.config.load::<crate::config::DisableS3ExpressSessionAuth>().cloned(), env),
697                runtime_components_builder: runtime_components_builder(service_config),
698            }
699        }
700    }
701
702    fn config(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>, env: Env) -> FrozenLayer {
703        let mut layer = Layer::new("S3ExpressRuntimePlugin");
704        if disable_s3_express_session_token.is_none() {
705            match env.get(env::S3_DISABLE_EXPRESS_SESSION_AUTH) {
706                Ok(value) if value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false") => {
707                    let value = value.to_lowercase().parse::<bool>().expect("just checked to be a bool-valued string");
708                    layer.store_or_unset(Some(crate::config::DisableS3ExpressSessionAuth(value)));
709                }
710                Ok(value) => {
711                    tracing::warn!(
712                        "environment variable `{}` ignored since it only accepts either `true` or `false` (case-insensitive), but got `{}`.",
713                        env::S3_DISABLE_EXPRESS_SESSION_AUTH,
714                        value
715                    )
716                }
717                _ => {
718                    // TODO(aws-sdk-rust#1073): Transfer a value of
719                    //  `s3_disable_express_session_auth` from a profile file to `layer`
720                }
721            }
722        }
723
724        let session_token_name_override = SigV4SessionTokenNameOverride::new(|settings: &SigningSettings, cfg: &ConfigBag| {
725            // Not configured for S3 express, use the original session token name override
726            if !crate::s3_express::utils::for_s3_express(cfg) {
727                return Ok(settings.session_token_name_override);
728            }
729
730            let session_token_name_override = Some(match settings.signature_location {
731                SignatureLocation::Headers => "x-amz-s3session-token",
732                SignatureLocation::QueryParams => "X-Amz-S3session-Token",
733                _ => {
734                    return Err(BoxError::from(
735                        "`SignatureLocation` adds a new variant, which needs to be handled in a separate match arm",
736                    ))
737                }
738            });
739            Ok(session_token_name_override)
740        });
741        layer.store_or_unset(Some(session_token_name_override));
742
743        layer.freeze()
744    }
745
746    fn runtime_components_builder(service_config: crate::config::Config) -> RuntimeComponentsBuilder {
747        match (
748            service_config.runtime_components.identity_resolver(&super::auth::SCHEME_ID),
749            service_config.runtime_components.identity_resolver(&aws_runtime::auth::sigv4::SCHEME_ID),
750        ) {
751            (None, Some(_)) => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin").with_identity_resolver(
752                super::auth::SCHEME_ID,
753                super::identity_provider::DefaultS3ExpressIdentityProvider::builder()
754                    .time_source(
755                        service_config
756                            .runtime_components
757                            .time_source()
758                            .expect("should be set in `service_config`"),
759                    )
760                    .behavior_version(service_config.behavior_version.expect("should be set in `service_config`"))
761                    .build(),
762            ),
763            _ => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin"),
764        }
765    }
766
767    impl RuntimePlugin for S3ExpressRuntimePlugin {
768        fn config(&self) -> Option<FrozenLayer> {
769            Some(self.config.clone())
770        }
771
772        fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
773            Cow::Borrowed(&self.runtime_components_builder)
774        }
775    }
776
777    #[cfg(test)]
778    mod tests {
779        use super::*;
780        use aws_credential_types::Credentials;
781        use aws_smithy_runtime_api::client::identity::ResolveIdentity;
782
783        #[test]
784        fn disable_option_set_from_service_client_should_take_the_highest_precedence() {
785            // Disable option is set from service client.
786            let disable_s3_express_session_token = crate::config::DisableS3ExpressSessionAuth(true);
787
788            // An environment variable says the session auth is _not_ disabled,
789            // but it will be overruled by what is in `layer`.
790            let actual = config(
791                Some(disable_s3_express_session_token),
792                Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "false")]),
793            );
794
795            // A config layer from this runtime plugin should not provide
796            // a new `DisableS3ExpressSessionAuth` if the disable option is set from service client.
797            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
798        }
799
800        #[test]
801        fn disable_option_set_from_env_should_take_the_second_highest_precedence() {
802            // Disable option is set from environment variable.
803            let actual = config(None, Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "true")]));
804
805            // The config layer should provide `DisableS3ExpressSessionAuth` from the environment variable.
806            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().unwrap().0);
807        }
808
809        #[should_panic]
810        #[test]
811        fn disable_option_set_from_profile_file_should_take_the_lowest_precedence() {
812            todo!("TODO(aws-sdk-rust#1073): Implement profile file test")
813        }
814
815        #[test]
816        fn disable_option_should_be_unspecified_if_unset() {
817            // Disable option is not set anywhere.
818            let actual = config(None, Env::from_slice(&[]));
819
820            // The config layer should not provide `DisableS3ExpressSessionAuth` when it's not configured.
821            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
822        }
823
824        #[test]
825        fn s3_express_runtime_plugin_should_set_default_identity_resolver() {
826            // Config has SigV4 credentials provider, so S3 Express identity resolver should be set.
827            let config = crate::Config::builder()
828                .behavior_version_latest()
829                .time_source(aws_smithy_async::time::SystemTimeSource::new())
830                .credentials_provider(Credentials::for_tests())
831                .build();
832
833            let actual = runtime_components_builder(config);
834            // The runtime plugin should provide a default S3 Express identity resolver.
835            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_some());
836        }
837
838        #[test]
839        fn s3_express_plugin_should_not_set_default_identity_resolver_without_sigv4_counterpart() {
840            // Config does not have SigV4 credentials provider.
841            let config = crate::Config::builder()
842                .behavior_version_latest()
843                .time_source(aws_smithy_async::time::SystemTimeSource::new())
844                .build();
845
846            let actual = runtime_components_builder(config);
847            // The runtime plugin should not provide S3 Express identity resolver without SigV4 credentials.
848            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_none());
849        }
850
851        #[tokio::test]
852        async fn s3_express_plugin_should_not_set_default_identity_resolver_if_user_provided() {
853            // User provides a custom S3 Express credentials provider.
854            let expected_access_key_id = "expected acccess key ID";
855            let config = crate::Config::builder()
856                .behavior_version_latest()
857                .credentials_provider(Credentials::for_tests())
858                .express_credentials_provider(Credentials::new(
859                    expected_access_key_id,
860                    "secret",
861                    None,
862                    None,
863                    "test express credentials provider",
864                ))
865                .time_source(aws_smithy_async::time::SystemTimeSource::new())
866                .build();
867
868            // The runtime plugin should not override the user-provided identity resolver.
869            let runtime_components_builder = runtime_components_builder(config.clone());
870            assert!(runtime_components_builder
871                .identity_resolver(&crate::s3_express::auth::SCHEME_ID)
872                .is_none());
873
874            // The user-provided identity resolver should be used.
875            let express_identity_resolver = config.runtime_components.identity_resolver(&crate::s3_express::auth::SCHEME_ID).unwrap();
876            let creds = express_identity_resolver
877                .resolve_identity(&RuntimeComponentsBuilder::for_tests().build().unwrap(), &ConfigBag::base())
878                .await
879                .unwrap();
880
881            assert_eq!(expected_access_key_id, creds.data::<Credentials>().unwrap().access_key_id());
882        }
883    }
884}
885
886pub(crate) mod checksum {
887    use crate::http_request_checksum::DefaultRequestChecksumOverride;
888    use aws_smithy_checksums::ChecksumAlgorithm;
889    use aws_smithy_types::config_bag::ConfigBag;
890
891    pub(crate) fn provide_default_checksum_algorithm() -> crate::http_request_checksum::DefaultRequestChecksumOverride {
892        fn _provide_default_checksum_algorithm(original_checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
893            // S3 does not have the `ChecksumAlgorithm::Md5`, therefore customers cannot set it
894            // from outside.
895            if original_checksum != Some(ChecksumAlgorithm::Md5) {
896                return original_checksum;
897            }
898
899            if crate::s3_express::utils::for_s3_express(cfg) {
900                // S3 Express requires setting the default checksum algorithm to CRC-32
901                Some(ChecksumAlgorithm::Crc32)
902            } else {
903                original_checksum
904            }
905        }
906        DefaultRequestChecksumOverride::new(_provide_default_checksum_algorithm)
907    }
908}
909
910pub(crate) mod utils {
911    use aws_smithy_types::{config_bag::ConfigBag, Document};
912
913    pub(crate) fn for_s3_express(cfg: &ConfigBag) -> bool {
914        // logic borrowed from aws_smithy_runtime::client::orchestrator::auth::extract_endpoint_auth_scheme_config
915        let endpoint = cfg
916            .load::<crate::config::endpoint::Endpoint>()
917            .expect("endpoint added to config bag by endpoint orchestrator");
918
919        let typed_schemes = endpoint.auth_schemes();
920        if !typed_schemes.is_empty() {
921            return typed_schemes
922                .iter()
923                .any(|scheme| scheme.name() == crate::s3_express::auth::SCHEME_ID.as_str());
924        }
925
926        let auth_schemes = match endpoint.properties().get("authSchemes") {
927            Some(Document::Array(schemes)) => schemes,
928            _ => return false,
929        };
930        auth_schemes.iter().any(|doc| {
931            let config_scheme_id = doc.as_object().and_then(|object| object.get("name")).and_then(Document::as_string);
932            config_scheme_id == Some(crate::s3_express::auth::SCHEME_ID.as_str())
933        })
934    }
935}