aws_sdk_s3/
s3_express.rs

1// Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
2/*
3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7/// Supporting code for S3 Express auth
8pub(crate) mod auth {
9    use aws_runtime::auth::sigv4::SigV4Signer;
10    use aws_smithy_runtime_api::client::auth::{AuthScheme, AuthSchemeId, Sign};
11    use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
12    use aws_smithy_runtime_api::client::runtime_components::GetIdentityResolver;
13
14    /// Auth scheme ID for S3 Express.
15    pub(crate) const SCHEME_ID: AuthSchemeId = AuthSchemeId::new("sigv4-s3express");
16
17    /// S3 Express auth scheme.
18    #[derive(Debug, Default)]
19    pub(crate) struct S3ExpressAuthScheme {
20        signer: SigV4Signer,
21    }
22
23    impl S3ExpressAuthScheme {
24        /// Creates a new `S3ExpressAuthScheme`.
25        pub(crate) fn new() -> Self {
26            Default::default()
27        }
28    }
29
30    impl AuthScheme for S3ExpressAuthScheme {
31        fn scheme_id(&self) -> AuthSchemeId {
32            SCHEME_ID
33        }
34
35        fn identity_resolver(&self, identity_resolvers: &dyn GetIdentityResolver) -> Option<SharedIdentityResolver> {
36            identity_resolvers.identity_resolver(self.scheme_id())
37        }
38
39        fn signer(&self) -> &dyn Sign {
40            &self.signer
41        }
42    }
43}
44
45/// Supporting code for S3 Express identity cache
46pub(crate) mod identity_cache {
47    use aws_credential_types::Credentials;
48    use aws_smithy_async::time::SharedTimeSource;
49    use aws_smithy_runtime::expiring_cache::ExpiringCache;
50    use aws_smithy_runtime_api::box_error::BoxError;
51    use aws_smithy_runtime_api::client::identity::Identity;
52    use aws_smithy_types::DateTime;
53    use fastrand::Rng;
54    use hmac::{digest::FixedOutput, Hmac, Mac};
55    use lru::LruCache;
56    use sha2::Sha256;
57    use std::fmt;
58    use std::future::Future;
59    use std::hash::Hash;
60    use std::num::NonZeroUsize;
61    use std::sync::Mutex;
62    use std::time::{Duration, SystemTime};
63
    /// Capacity passed to `S3ExpressIdentityCache::new` by the default S3 Express identity
    /// provider's builder, i.e. the maximum number of cache entries it keeps.
64    pub(crate) const DEFAULT_MAX_CACHE_CAPACITY: usize = 100;
    /// Buffer time used by the default S3 Express identity provider's builder when none is set.
    ///
    /// The value is forwarded to each per-key `ExpiringCache`.
    // NOTE(review): presumably the margin before expiration at which an entry is treated as
    // expired; confirm against `ExpiringCache`.
65    pub(crate) const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
66
    /// Opaque key identifying one entry of the S3 Express identity cache.
    ///
    /// Keys are produced by `S3ExpressIdentityCache::key`: a hex-encoded HMAC-SHA256 digest
    /// of the caller's credentials followed by the bucket name.
67    #[derive(Clone, Eq, PartialEq, Hash)]
68    pub(crate) struct CacheKey(String);
69
70    /// The caching implementation for S3 Express identity.
71    ///
72    /// While customers can either disable S3 Express itself or provide a custom S3 Express identity
73    /// provider, configuring S3 Express identity cache is not supported. Thus, this is _the_
74    /// implementation of S3 Express identity cache.
75    pub(crate) struct S3ExpressIdentityCache {
76        inner: Mutex<LruCache<CacheKey, ExpiringCache<Identity, BoxError>>>,
77        time_source: SharedTimeSource,
78        buffer_time: Duration,
79        random_bytes: [u8; 64],
80    }
81
82    impl fmt::Debug for S3ExpressIdentityCache {
83        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84            let (size, capacity) = {
85                let cache = self.inner.lock().unwrap();
86                (cache.len(), cache.cap())
87            };
88            write!(
89                f,
90                "S3ExpressIdentityCache {{ time_source: {:?}, buffer_time: {:?} }}, with size/capacity: {}/{}",
91                self.time_source, &self.buffer_time, size, capacity,
92            )
93        }
94    }
95
96    impl S3ExpressIdentityCache {
97        pub(crate) fn new(capacity: usize, time_source: SharedTimeSource, buffer_time: Duration) -> Self {
98            // It'd be nice to use a cryptographically secure random generator but not necessary.
99            // The cache is memory only and randomization here is mostly to obfuscate the key and
100            // make it reasonable length.
101            let mut rng = Rng::default();
102            let mut random_bytes = [0u8; 64];
103            rng.fill(&mut random_bytes);
104            Self {
105                inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
106                time_source,
107                buffer_time,
108                random_bytes,
109            }
110        }
111
112        pub(crate) fn key(&self, bucket_name: &str, creds: &Credentials) -> CacheKey {
113            CacheKey({
114                let mut mac = Hmac::<Sha256>::new_from_slice(self.random_bytes.as_slice()).expect("should be created from random 64 bytes");
115                let input = format!("{}{}", creds.access_key_id(), creds.secret_access_key());
116                mac.update(input.as_ref());
117                let mut inner = hex::encode(mac.finalize_fixed());
118                inner.push_str(bucket_name);
119                inner
120            })
121        }
122
123        pub(crate) async fn get_or_load<F, Fut>(&self, key: CacheKey, loader: F) -> Result<Identity, BoxError>
124        where
125            F: FnOnce() -> Fut,
126            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
127        {
128            let expiring_cache = {
129                let mut inner = self.inner.lock().unwrap();
130                inner.get_or_insert_mut(key, || ExpiringCache::new(self.buffer_time)).clone()
131            };
132
133            let now = self.time_source.now();
134
135            match expiring_cache.yield_or_clear_if_expired(now).await {
136                Some(identity) => {
137                    tracing::debug!(
138                        buffer_time=?self.buffer_time,
139                        cached_expiration=?identity.expiration(),
140                        now=?now,
141                        "loaded identity from cache"
142                    );
143                    Ok(identity)
144                }
145                None => {
146                    let start_time = self.time_source.now();
147                    let identity = expiring_cache.get_or_load(loader).await?;
148                    let expiration = identity.expiration().ok_or("SessionCredentials` always has expiration")?;
149                    let printable = DateTime::from(expiration);
150                    tracing::info!(
151                        new_expiration=%printable,
152                        valid_for=?expiration.duration_since(self.time_source.now()).unwrap_or_default(),
153                        "identity cache miss occurred; added new identity (took {:?})",
154                        self.time_source.now().duration_since(start_time).unwrap_or_default()
155                    );
156                    Ok(identity)
157                }
158            }
159        }
160    }
161
162    #[cfg(test)]
163    mod tests {
164        use super::*;
165        use aws_smithy_async::rt::sleep::TokioSleep;
166        use aws_smithy_async::test_util::ManualTimeSource;
167        use aws_smithy_runtime_api::client::identity::http::Token;
168        use aws_smithy_runtime_api::client::identity::{IdentityFuture, ResolveIdentity, SharedIdentityResolver};
169        use aws_smithy_runtime_api::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
170        use aws_smithy_runtime_api::shared::IntoShared;
171        use aws_smithy_types::config_bag::ConfigBag;
172        use futures_util::stream::FuturesUnordered;
173        use std::sync::Arc;
174        use std::time::{Duration, SystemTime, UNIX_EPOCH};
175        use tracing::info;
176
177        fn epoch_secs(secs: u64) -> SystemTime {
178            SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
179        }
180
181        fn identity_expiring_in(expired_secs: u64) -> Identity {
182            let expiration = Some(epoch_secs(expired_secs));
183            Identity::new(Token::new("test", expiration), expiration)
184        }
185
186        fn test_identity_resolver(load_list: Vec<Result<Identity, BoxError>>) -> SharedIdentityResolver {
187            #[derive(Debug)]
188            struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
189            impl ResolveIdentity for Resolver {
190                fn resolve_identity<'a>(&'a self, _: &'a RuntimeComponents, _config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
191                    let mut list = self.0.lock().unwrap();
192                    if list.len() > 0 {
193                        let next = list.remove(0);
194                        info!("refreshing the identity to {:?}", next);
195                        IdentityFuture::ready(next)
196                    } else {
197                        drop(list);
198                        panic!("no more identities")
199                    }
200                }
201            }
202
203            SharedIdentityResolver::new(Resolver(Mutex::new(load_list)))
204        }
205
206        async fn load(identity_resolver: SharedIdentityResolver, runtime_components: &RuntimeComponents) -> Result<(Identity, SystemTime), BoxError> {
207            let identity = identity_resolver.resolve_identity(&runtime_components, &ConfigBag::base()).await.unwrap();
208            Ok((identity.clone(), identity.expiration().unwrap()))
209        }
210
211        async fn expect_identity<F, Fut>(expired_secs: u64, sut: &S3ExpressIdentityCache, key: CacheKey, loader: F)
212        where
213            F: FnOnce() -> Fut,
214            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
215        {
216            let identity = sut.get_or_load(key, loader).await.unwrap();
217            assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
218        }
219
220        #[tokio::test]
221        async fn reload_expired_test_identity() {
222            let time = ManualTimeSource::new(UNIX_EPOCH);
223            let runtime_components = RuntimeComponentsBuilder::for_tests()
224                .with_time_source(Some(time.clone()))
225                .with_sleep_impl(Some(TokioSleep::new()))
226                .build()
227                .unwrap();
228
229            let sut = S3ExpressIdentityCache::new(1, time.clone().into_shared(), DEFAULT_BUFFER_TIME);
230
231            let identity_resolver = test_identity_resolver(vec![Ok(identity_expiring_in(1000)), Ok(identity_expiring_in(2000))]);
232
233            let key = sut.key("test-bucket--usw2-az1--x-s3", &Credentials::for_tests_with_session_token());
234
235            // First call to the cache, populating a cache entry.
236            expect_identity(1000, &sut, key.clone(), || {
237                let identity_resolver = identity_resolver.clone();
238                let runtime_components = runtime_components.clone();
239                async move { load(identity_resolver, &runtime_components).await }
240            })
241            .await;
242
243            // Testing for a cache hit by advancing time such that the updated time is before the expiration of the first identity
244            // i.e. 500 < 1000.
245            time.set_time(epoch_secs(500));
246
247            expect_identity(1000, &sut, key.clone(), || async move { panic!("new identity should not be loaded") }).await;
248
249            // Testing for a cache miss by advancing time such that the updated time is now after the expiration of the first identity
250            // and before the expiration of the second identity i.e. 1000 < 1500 && 1500 < 2000.
251            time.set_time(epoch_secs(1500));
252
253            expect_identity(2000, &sut, key, || async move { load(identity_resolver, &runtime_components).await }).await;
254        }
255
256        #[test]
257        fn load_contention() {
258            let rt = tokio::runtime::Builder::new_multi_thread()
259                .enable_time()
260                .worker_threads(16)
261                .build()
262                .unwrap();
263
264            let time = ManualTimeSource::new(epoch_secs(0));
265            let runtime_components = RuntimeComponentsBuilder::for_tests()
266                .with_time_source(Some(time.clone()))
267                .with_sleep_impl(Some(TokioSleep::new()))
268                .build()
269                .unwrap();
270
271            let number_of_buckets = 4;
272            let sut = Arc::new(S3ExpressIdentityCache::new(
273                number_of_buckets,
274                time.clone().into_shared(),
275                DEFAULT_BUFFER_TIME,
276            ));
277
278            // Nested for loops below advance time by 200 in total, and each identity has the expiration
279            // such that no matter what order async tasks are executed, it never expires.
280            let safe_expiration = number_of_buckets as u64 * 50 + DEFAULT_BUFFER_TIME.as_secs() + 1;
281            let identity_resolver = test_identity_resolver(vec![
282                Ok(identity_expiring_in(safe_expiration)),
283                Ok(identity_expiring_in(safe_expiration)),
284                Ok(identity_expiring_in(safe_expiration)),
285                Ok(identity_expiring_in(safe_expiration)),
286            ]);
287
288            let mut tasks = Vec::new();
289            for i in 0..number_of_buckets {
290                let key = sut.key(&format!("test-bucket-{i}-usw2-az1--x-s3"), &Credentials::for_tests_with_session_token());
291                for _ in 0..50 {
292                    let sut = sut.clone();
293                    let key = key.clone();
294                    let identity_resolver = identity_resolver.clone();
295                    let time = time.clone();
296                    let runtime_components = runtime_components.clone();
297                    tasks.push(rt.spawn(async move {
298                        let now = time.advance(Duration::from_secs(1));
299                        let identity: Identity = sut
300                            .get_or_load(key, || async move { load(identity_resolver, &runtime_components).await })
301                            .await
302                            .unwrap();
303
304                        assert!(identity.expiration().unwrap() >= now, "{:?} >= {:?}", identity.expiration(), now);
305                    }));
306                }
307            }
308            let tasks = tasks.into_iter().collect::<FuturesUnordered<_>>();
309            for task in tasks {
310                rt.block_on(task).unwrap();
311            }
312        }
313
314        #[tokio::test]
315        async fn identity_fetch_triggered_by_lru_eviction() {
316            let time = ManualTimeSource::new(UNIX_EPOCH);
317            let runtime_components = RuntimeComponentsBuilder::for_tests()
318                .with_time_source(Some(time.clone()))
319                .with_sleep_impl(Some(TokioSleep::new()))
320                .build()
321                .unwrap();
322
323            // Create a cache of size 2.
324            let sut = S3ExpressIdentityCache::new(2, time.into_shared(), DEFAULT_BUFFER_TIME);
325
326            let identity_resolver = test_identity_resolver(vec![
327                Ok(identity_expiring_in(1000)),
328                Ok(identity_expiring_in(2000)),
329                Ok(identity_expiring_in(3000)),
330                Ok(identity_expiring_in(4000)),
331            ]);
332
333            let [key1, key2, key3] =
334                [1, 2, 3].map(|i| sut.key(&format!("test-bucket-{i}--usw2-az1--x-s3"), &Credentials::for_tests_with_session_token()));
335
336            // This should pupulate a cache entry for `key1`.
337            expect_identity(1000, &sut, key1.clone(), || {
338                let identity_resolver = identity_resolver.clone();
339                let runtime_components = runtime_components.clone();
340                async move { load(identity_resolver, &runtime_components).await }
341            })
342            .await;
343            // This immediate next call for `key1` should be a cache hit.
344            expect_identity(1000, &sut, key1.clone(), || async move { panic!("new identity should not be loaded") }).await;
345
346            // This should pupulate a cache entry for `key2`.
347            expect_identity(2000, &sut, key2, || {
348                let identity_resolver = identity_resolver.clone();
349                let runtime_components = runtime_components.clone();
350                async move { load(identity_resolver, &runtime_components).await }
351            })
352            .await;
353
354            // This should pupulate a cache entry for `key3`, but evicting a cache entry for `key1` because the cache is full.
355            expect_identity(3000, &sut, key3.clone(), || {
356                let identity_resolver = identity_resolver.clone();
357                let runtime_components = runtime_components.clone();
358                async move { load(identity_resolver, &runtime_components).await }
359            })
360            .await;
361
362            // Attempt to get an identity for `key1` should end up fetching a new one since its cache entry has been evicted.
363            // This fetch should now evict a cache entry for `key2`.
364            expect_identity(4000, &sut, key1, || async move { load(identity_resolver, &runtime_components).await }).await;
365
366            // A cache entry for `key3` should still exist in the cache.
367            expect_identity(3000, &sut, key3, || async move { panic!("new identity should not be loaded") }).await;
368        }
369    }
370}
371/// Supporting code for S3 Express identity provider
372pub(crate) mod identity_provider {
373    use std::time::{Duration, SystemTime};
374
375    use crate::s3_express::identity_cache::S3ExpressIdentityCache;
376    use crate::types::SessionCredentials;
377    use aws_credential_types::credential_feature::AwsCredentialFeature;
378    use aws_credential_types::provider::error::CredentialsError;
379    use aws_credential_types::Credentials;
380    use aws_smithy_async::time::{SharedTimeSource, TimeSource};
381    use aws_smithy_runtime_api::box_error::BoxError;
382    use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
383    use aws_smithy_runtime_api::client::identity::{Identity, IdentityCacheLocation, IdentityFuture, ResolveCachedIdentity, ResolveIdentity};
384    use aws_smithy_runtime_api::client::interceptors::SharedInterceptor;
385    use aws_smithy_runtime_api::client::runtime_components::{GetIdentityResolver, RuntimeComponents};
386    use aws_smithy_runtime_api::shared::IntoShared;
387    use aws_smithy_types::config_bag::ConfigBag;
388
389    use super::identity_cache::{DEFAULT_BUFFER_TIME, DEFAULT_MAX_CACHE_CAPACITY};
390
    /// Default identity provider for S3 Express.
    ///
    /// Obtains session credentials via the `CreateSession` operation and caches the resulting
    /// identities in an `S3ExpressIdentityCache`.
391    #[derive(Debug)]
392    pub(crate) struct DefaultS3ExpressIdentityProvider {
        // Behavior version applied to the client that issues `CreateSession` requests.
393        behavior_version: crate::config::BehaviorVersion,
        // Cache of identities keyed by base credentials and bucket name.
394        cache: S3ExpressIdentityCache,
395    }
396
397    impl TryFrom<SessionCredentials> for Credentials {
398        type Error = BoxError;
399
400        fn try_from(session_creds: SessionCredentials) -> Result<Self, Self::Error> {
401            Ok(Credentials::new(
402                session_creds.access_key_id,
403                session_creds.secret_access_key,
404                Some(session_creds.session_token),
405                Some(
406                    SystemTime::try_from(session_creds.expiration)
407                        .map_err(|_| CredentialsError::unhandled("credential expiration time cannot be represented by a SystemTime"))?,
408                ),
409                "s3express",
410            ))
411        }
412    }
413
414    impl DefaultS3ExpressIdentityProvider {
415        pub(crate) fn builder() -> Builder {
416            Builder::default()
417        }
418
419        async fn identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> Result<Identity, BoxError> {
420            let bucket_name = self.bucket_name(config_bag)?;
421
422            let sigv4_identity_resolver = runtime_components
423                .identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID)
424                .ok_or("identity resolver for sigv4 should be set for S3")?;
425            let aws_identity = runtime_components
426                .identity_cache()
427                .resolve_cached_identity(sigv4_identity_resolver, runtime_components, config_bag)
428                .await?;
429
430            let credentials = aws_identity
431                .data::<Credentials>()
432                .ok_or("wrong identity type for SigV4. Expected AWS credentials but got `{identity:?}")?;
433
434            let key = self.cache.key(bucket_name, credentials);
435            self.cache
436                .get_or_load(key, || async move {
437                    let creds = self.express_session_credentials(bucket_name, runtime_components, config_bag).await?;
438                    let mut data = Credentials::try_from(creds)?;
439                    data.get_property_mut_or_default::<Vec<AwsCredentialFeature>>()
440                        .push(AwsCredentialFeature::S3ExpressBucket);
441                    let expiry = data.expiry().unwrap();
442                    Ok((Identity::from(data), expiry))
443                })
444                .await
445        }
446
447        fn bucket_name<'a>(&'a self, config_bag: &'a ConfigBag) -> Result<&'a str, BoxError> {
448            let params = config_bag.load::<EndpointResolverParams>().expect("endpoint resolver params must be set");
449            let params = params
450                .get::<crate::config::endpoint::Params>()
451                .expect("`Params` should be wrapped in `EndpointResolverParams`");
452            params.bucket().ok_or("A bucket was not set in endpoint params".into())
453        }
454
455        async fn express_session_credentials<'a>(
456            &'a self,
457            bucket_name: &'a str,
458            runtime_components: &'a RuntimeComponents,
459            config_bag: &'a ConfigBag,
460        ) -> Result<SessionCredentials, BoxError> {
461            let mut config_builder = crate::config::Builder::from_config_bag(config_bag).behavior_version(self.behavior_version);
462
463            // inherits all runtime components from a current S3 operation but clears out
464            // out interceptors configured for that operation
465            let mut rc_builder = runtime_components.to_builder();
466            rc_builder.set_interceptors(std::iter::empty::<SharedInterceptor>());
467            config_builder.runtime_components = rc_builder;
468
469            let client = crate::Client::from_conf(config_builder.build());
470            let response = client.create_session().bucket(bucket_name).send().await?;
471
472            response.credentials.ok_or("no session credentials in response".into())
473        }
474    }
475
476    #[derive(Default)]
477    pub(crate) struct Builder {
478        behavior_version: Option<crate::config::BehaviorVersion>,
479        time_source: Option<SharedTimeSource>,
480        buffer_time: Option<Duration>,
481    }
482
483    impl Builder {
484        pub(crate) fn behavior_version(mut self, behavior_version: crate::config::BehaviorVersion) -> Self {
485            self.set_behavior_version(Some(behavior_version));
486            self
487        }
488        pub(crate) fn set_behavior_version(&mut self, behavior_version: Option<crate::config::BehaviorVersion>) -> &mut Self {
489            self.behavior_version = behavior_version;
490            self
491        }
492        pub(crate) fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
493            self.set_time_source(time_source.into_shared());
494            self
495        }
496        pub(crate) fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
497            self.time_source = Some(time_source.into_shared());
498            self
499        }
500        #[allow(dead_code)]
501        pub(crate) fn buffer_time(mut self, buffer_time: Duration) -> Self {
502            self.set_buffer_time(Some(buffer_time));
503            self
504        }
505        #[allow(dead_code)]
506        pub(crate) fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
507            self.buffer_time = buffer_time;
508            self
509        }
510        pub(crate) fn build(self) -> DefaultS3ExpressIdentityProvider {
511            DefaultS3ExpressIdentityProvider {
512                behavior_version: self.behavior_version.expect("required field `behavior_version` should be set"),
513                cache: S3ExpressIdentityCache::new(
514                    DEFAULT_MAX_CACHE_CAPACITY,
515                    self.time_source.unwrap_or_default(),
516                    self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
517                ),
518            }
519        }
520    }
521
522    impl ResolveIdentity for DefaultS3ExpressIdentityProvider {
523        fn resolve_identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
524            IdentityFuture::new(async move { self.identity(runtime_components, config_bag).await })
525        }
526
527        fn cache_location(&self) -> IdentityCacheLocation {
528            IdentityCacheLocation::IdentityResolver
529        }
530    }
531
532    #[cfg(test)]
533    mod tests {
534        use super::*;
535        use aws_credential_types::credential_feature::AwsCredentialFeature;
536        use aws_credential_types::Credentials;
537
538        // Helper function to create test runtime components with SigV4 identity resolver
539        fn create_test_runtime_components(base_credentials: Credentials) -> aws_smithy_runtime_api::client::runtime_components::RuntimeComponents {
540            use aws_credential_types::provider::SharedCredentialsProvider;
541            use aws_smithy_runtime::client::http::test_util::infallible_client_fn;
542            use aws_smithy_runtime::client::orchestrator::endpoints::StaticUriEndpointResolver;
543            use aws_smithy_runtime::client::retries::strategy::NeverRetryStrategy;
544            use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
545            use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
546            use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
547            use aws_smithy_types::body::SdkBody;
548
549            let sigv4_resolver = SharedIdentityResolver::new(SharedCredentialsProvider::new(base_credentials));
550
551            // Create a simple auth scheme option resolver for testing
552            let auth_option_resolver = StaticAuthSchemeOptionResolver::new(vec![aws_runtime::auth::sigv4::SCHEME_ID]);
553
554            let http_client = infallible_client_fn(|_req| {
555                http::Response::builder()
556                    .status(200)
557                    .body(SdkBody::from(
558                        r#"<?xml version="1.0" encoding="UTF-8"?>
559                        <CreateSessionResult>
560                            <Credentials>
561                                <AccessKeyId>session_access_key</AccessKeyId>
562                                <SecretAccessKey>session_secret_key</SecretAccessKey>
563                                <SessionToken>session_token</SessionToken>
564                                <Expiration>2025-01-01T00:00:00Z</Expiration>
565                            </Credentials>
566                        </CreateSessionResult>"#,
567                    ))
568                    .unwrap()
569            });
570
571            RuntimeComponentsBuilder::for_tests()
572                .with_identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID, sigv4_resolver)
573                .with_http_client(Some(http_client))
574                .with_time_source(Some(aws_smithy_async::time::SystemTimeSource::new()))
575                .with_retry_strategy(Some(NeverRetryStrategy::new()))
576                .with_auth_scheme_option_resolver(Some(auth_option_resolver))
577                .with_endpoint_resolver(Some(StaticUriEndpointResolver::http_localhost(8080)))
578                .build()
579                .unwrap()
580        }
581
582        // Helper function to create config bag with minimal S3 Express bucket parameters
583        fn create_test_config_bag(bucket_name: &str) -> aws_smithy_types::config_bag::ConfigBag {
584            use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
585            use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
586            use aws_smithy_types::config_bag::{ConfigBag, Layer};
587
588            let mut config_bag = ConfigBag::base();
589            let mut layer = Layer::new("test");
590
591            let endpoint_params = EndpointResolverParams::new(crate::config::endpoint::Params::builder().bucket(bucket_name).build().unwrap());
592            layer.store_put(endpoint_params);
593
594            layer.store_put(StalledStreamProtectionConfig::disabled());
595
596            layer.store_put(crate::config::Region::new("us-west-2"));
597
598            config_bag.push_layer(layer);
599
600            config_bag
601        }
602
603        #[test]
604        fn test_session_credentials_conversion() {
605            let session_creds = SessionCredentials::builder()
606                .access_key_id("test_access_key")
607                .secret_access_key("test_secret_key")
608                .session_token("test_session_token")
609                .expiration(aws_smithy_types::DateTime::from_secs(1000))
610                .build()
611                .expect("valid session credentials");
612
613            let credentials = Credentials::try_from(session_creds).expect("conversion should succeed");
614
615            assert_eq!(credentials.access_key_id(), "test_access_key");
616            assert_eq!(credentials.secret_access_key(), "test_secret_key");
617            assert_eq!(credentials.session_token(), Some("test_session_token"));
618        }
619
620        #[tokio::test]
621        async fn test_identity_provider_embeds_s3express_feature() {
622            let bucket_name = "test-bucket--usw2-az1--x-s3";
623
624            // Use helper functions to set up test components
625            let base_credentials = Credentials::for_tests();
626            let runtime_components = create_test_runtime_components(base_credentials);
627            let config_bag = create_test_config_bag(bucket_name);
628
629            // Create the identity provider
630            let provider = DefaultS3ExpressIdentityProvider::builder()
631                .behavior_version(crate::config::BehaviorVersion::latest())
632                .time_source(aws_smithy_async::time::SystemTimeSource::new())
633                .build();
634
635            // Call identity() and verify the S3ExpressBucket feature is present
636            let identity = provider
637                .identity(&runtime_components, &config_bag)
638                .await
639                .expect("identity() should succeed");
640
641            let credentials = identity.data::<Credentials>().expect("Identity should contain Credentials");
642            let features = credentials
643                .get_property::<Vec<AwsCredentialFeature>>()
644                .expect("Credentials should have features");
645            assert!(
646                features.contains(&AwsCredentialFeature::S3ExpressBucket),
647                "S3ExpressBucket feature should be present in Credentials' property field"
648            );
649
650            let identity_layer = identity
651                .property::<aws_smithy_types::config_bag::FrozenLayer>()
652                .expect("Identity should have a property layer");
653            let identity_features: Vec<AwsCredentialFeature> = identity_layer.load::<AwsCredentialFeature>().cloned().collect();
654            assert!(
655                identity_features.contains(&AwsCredentialFeature::S3ExpressBucket),
656                "S3ExpressBucket feature should be present in Identity's property field"
657            );
658        }
659    }
660}
661
662/// Supporting code for S3 Express runtime plugin
663pub(crate) mod runtime_plugin {
664    use std::borrow::Cow;
665
666    use aws_runtime::auth::SigV4SessionTokenNameOverride;
667    use aws_sigv4::http_request::{SignatureLocation, SigningSettings};
668    use aws_smithy_runtime_api::{
669        box_error::BoxError,
670        client::{runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin},
671    };
672    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
673    use aws_types::os_shim_internal::Env;
674
675    mod env {
676        pub(super) const S3_DISABLE_EXPRESS_SESSION_AUTH: &str = "AWS_S3_DISABLE_EXPRESS_SESSION_AUTH";
677    }
678
679    #[derive(Debug)]
680    pub(crate) struct S3ExpressRuntimePlugin {
681        config: FrozenLayer,
682        runtime_components_builder: RuntimeComponentsBuilder,
683    }
684
685    impl S3ExpressRuntimePlugin {
686        // `new` will be called as `additional_client_plugins` within `base_client_runtime_plugins`.
687        // This guarantees that `new` receives a fully constructed service config, with required
688        // runtime components registered with `RuntimeComponents`.
689        pub(crate) fn new(service_config: crate::config::Config) -> Self {
690            Self::new_with(service_config, Env::real())
691        }
692
693        fn new_with(service_config: crate::config::Config, env: Env) -> Self {
694            Self {
695                config: config(service_config.config.load::<crate::config::DisableS3ExpressSessionAuth>().cloned(), env),
696                runtime_components_builder: runtime_components_builder(service_config),
697            }
698        }
699    }
700
701    fn config(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>, env: Env) -> FrozenLayer {
702        let mut layer = Layer::new("S3ExpressRuntimePlugin");
703        if disable_s3_express_session_token.is_none() {
704            match env.get(env::S3_DISABLE_EXPRESS_SESSION_AUTH) {
705                Ok(value) if value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false") => {
706                    let value = value.to_lowercase().parse::<bool>().expect("just checked to be a bool-valued string");
707                    layer.store_or_unset(Some(crate::config::DisableS3ExpressSessionAuth(value)));
708                }
709                Ok(value) => {
710                    tracing::warn!(
711                        "environment variable `{}` ignored since it only accepts either `true` or `false` (case-insensitive), but got `{}`.",
712                        env::S3_DISABLE_EXPRESS_SESSION_AUTH,
713                        value
714                    )
715                }
716                _ => {
717                    // TODO(aws-sdk-rust#1073): Transfer a value of
718                    //  `s3_disable_express_session_auth` from a profile file to `layer`
719                }
720            }
721        }
722
723        let session_token_name_override = SigV4SessionTokenNameOverride::new(|settings: &SigningSettings, cfg: &ConfigBag| {
724            // Not configured for S3 express, use the original session token name override
725            if !crate::s3_express::utils::for_s3_express(cfg) {
726                return Ok(settings.session_token_name_override);
727            }
728
729            let session_token_name_override = Some(match settings.signature_location {
730                SignatureLocation::Headers => "x-amz-s3session-token",
731                SignatureLocation::QueryParams => "X-Amz-S3session-Token",
732                _ => {
733                    return Err(BoxError::from(
734                        "`SignatureLocation` adds a new variant, which needs to be handled in a separate match arm",
735                    ))
736                }
737            });
738            Ok(session_token_name_override)
739        });
740        layer.store_or_unset(Some(session_token_name_override));
741
742        layer.freeze()
743    }
744
745    fn runtime_components_builder(service_config: crate::config::Config) -> RuntimeComponentsBuilder {
746        match (
747            service_config.runtime_components.identity_resolver(&super::auth::SCHEME_ID),
748            service_config.runtime_components.identity_resolver(&aws_runtime::auth::sigv4::SCHEME_ID),
749        ) {
750            (None, Some(_)) => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin").with_identity_resolver(
751                super::auth::SCHEME_ID,
752                super::identity_provider::DefaultS3ExpressIdentityProvider::builder()
753                    .time_source(
754                        service_config
755                            .runtime_components
756                            .time_source()
757                            .expect("should be set in `service_config`"),
758                    )
759                    .behavior_version(service_config.behavior_version.expect("should be set in `service_config`"))
760                    .build(),
761            ),
762            _ => RuntimeComponentsBuilder::new("S3ExpressRuntimePlugin"),
763        }
764    }
765
766    impl RuntimePlugin for S3ExpressRuntimePlugin {
767        fn config(&self) -> Option<FrozenLayer> {
768            Some(self.config.clone())
769        }
770
771        fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
772            Cow::Borrowed(&self.runtime_components_builder)
773        }
774    }
775
776    #[cfg(test)]
777    mod tests {
778        use super::*;
779        use aws_credential_types::Credentials;
780        use aws_smithy_runtime_api::client::identity::ResolveIdentity;
781
782        #[test]
783        fn disable_option_set_from_service_client_should_take_the_highest_precedence() {
784            // Disable option is set from service client.
785            let disable_s3_express_session_token = crate::config::DisableS3ExpressSessionAuth(true);
786
787            // An environment variable says the session auth is _not_ disabled,
788            // but it will be overruled by what is in `layer`.
789            let actual = config(
790                Some(disable_s3_express_session_token),
791                Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "false")]),
792            );
793
794            // A config layer from this runtime plugin should not provide
795            // a new `DisableS3ExpressSessionAuth` if the disable option is set from service client.
796            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
797        }
798
799        #[test]
800        fn disable_option_set_from_env_should_take_the_second_highest_precedence() {
801            // Disable option is set from environment variable.
802            let actual = config(None, Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "true")]));
803
804            // The config layer should provide `DisableS3ExpressSessionAuth` from the environment variable.
805            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().unwrap().0);
806        }
807
808        #[should_panic]
809        #[test]
810        fn disable_option_set_from_profile_file_should_take_the_lowest_precedence() {
811            todo!("TODO(aws-sdk-rust#1073): Implement profile file test")
812        }
813
814        #[test]
815        fn disable_option_should_be_unspecified_if_unset() {
816            // Disable option is not set anywhere.
817            let actual = config(None, Env::from_slice(&[]));
818
819            // The config layer should not provide `DisableS3ExpressSessionAuth` when it's not configured.
820            assert!(actual.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
821        }
822
823        #[test]
824        fn s3_express_runtime_plugin_should_set_default_identity_resolver() {
825            // Config has SigV4 credentials provider, so S3 Express identity resolver should be set.
826            let config = crate::Config::builder()
827                .behavior_version_latest()
828                .time_source(aws_smithy_async::time::SystemTimeSource::new())
829                .credentials_provider(Credentials::for_tests())
830                .build();
831
832            let actual = runtime_components_builder(config);
833            // The runtime plugin should provide a default S3 Express identity resolver.
834            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_some());
835        }
836
837        #[test]
838        fn s3_express_plugin_should_not_set_default_identity_resolver_without_sigv4_counterpart() {
839            // Config does not have SigV4 credentials provider.
840            let config = crate::Config::builder()
841                .behavior_version_latest()
842                .time_source(aws_smithy_async::time::SystemTimeSource::new())
843                .build();
844
845            let actual = runtime_components_builder(config);
846            // The runtime plugin should not provide S3 Express identity resolver without SigV4 credentials.
847            assert!(actual.identity_resolver(&crate::s3_express::auth::SCHEME_ID).is_none());
848        }
849
850        #[tokio::test]
851        async fn s3_express_plugin_should_not_set_default_identity_resolver_if_user_provided() {
852            // User provides a custom S3 Express credentials provider.
853            let expected_access_key_id = "expected acccess key ID";
854            let config = crate::Config::builder()
855                .behavior_version_latest()
856                .credentials_provider(Credentials::for_tests())
857                .express_credentials_provider(Credentials::new(
858                    expected_access_key_id,
859                    "secret",
860                    None,
861                    None,
862                    "test express credentials provider",
863                ))
864                .time_source(aws_smithy_async::time::SystemTimeSource::new())
865                .build();
866
867            // The runtime plugin should not override the user-provided identity resolver.
868            let runtime_components_builder = runtime_components_builder(config.clone());
869            assert!(runtime_components_builder
870                .identity_resolver(&crate::s3_express::auth::SCHEME_ID)
871                .is_none());
872
873            // The user-provided identity resolver should be used.
874            let express_identity_resolver = config.runtime_components.identity_resolver(&crate::s3_express::auth::SCHEME_ID).unwrap();
875            let creds = express_identity_resolver
876                .resolve_identity(&RuntimeComponentsBuilder::for_tests().build().unwrap(), &ConfigBag::base())
877                .await
878                .unwrap();
879
880            assert_eq!(expected_access_key_id, creds.data::<Credentials>().unwrap().access_key_id());
881        }
882    }
883}
884
885pub(crate) mod checksum {
886    use crate::http_request_checksum::DefaultRequestChecksumOverride;
887    use aws_smithy_checksums::ChecksumAlgorithm;
888    use aws_smithy_types::config_bag::ConfigBag;
889
890    pub(crate) fn provide_default_checksum_algorithm() -> crate::http_request_checksum::DefaultRequestChecksumOverride {
891        fn _provide_default_checksum_algorithm(original_checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
892            // S3 does not have the `ChecksumAlgorithm::Md5`, therefore customers cannot set it
893            // from outside.
894            if original_checksum != Some(ChecksumAlgorithm::Md5) {
895                return original_checksum;
896            }
897
898            if crate::s3_express::utils::for_s3_express(cfg) {
899                // S3 Express requires setting the default checksum algorithm to CRC-32
900                Some(ChecksumAlgorithm::Crc32)
901            } else {
902                original_checksum
903            }
904        }
905        DefaultRequestChecksumOverride::new(_provide_default_checksum_algorithm)
906    }
907}
908
909pub(crate) mod utils {
910    use aws_smithy_types::{config_bag::ConfigBag, Document};
911
912    pub(crate) fn for_s3_express(cfg: &ConfigBag) -> bool {
913        // logic borrowed from aws_smithy_runtime::client::orchestrator::auth::extract_endpoint_auth_scheme_config
914        let endpoint = cfg
915            .load::<crate::config::endpoint::Endpoint>()
916            .expect("endpoint added to config bag by endpoint orchestrator");
917
918        let auth_schemes = match endpoint.properties().get("authSchemes") {
919            Some(Document::Array(schemes)) => schemes,
920            _ => return false,
921        };
922        auth_schemes.iter().any(|doc| {
923            let config_scheme_id = doc.as_object().and_then(|object| object.get("name")).and_then(Document::as_string);
924            config_scheme_id == Some(crate::s3_express::auth::SCHEME_ID.as_str())
925        })
926    }
927}