Skip to main content

deltalake_aws/
storage.rs

1//! AWS S3 storage backend.
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use aws_config::{Region, SdkConfig};
10use bytes::Bytes;
11use deltalake_core::logstore::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
12use deltalake_core::logstore::object_store::{
13    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme,
14    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
15};
16use deltalake_core::logstore::{
17    ObjectStoreFactory, ObjectStoreRef, StorageConfig, config::str_is_truthy,
18};
19use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
20use futures::Future;
21use futures::stream::BoxStream;
22use object_store::aws::AmazonS3;
23use object_store::client::SpawnedReqwestConnector;
24use tracing::log::*;
25use typed_builder::TypedBuilder;
26use url::Url;
27
28use crate::constants::{
29    self, DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS,
30    DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS,
31};
32use crate::credentials::AWSForObjectStore;
33use crate::errors::DynamoDbConfigError;
34
35const STORE_NAME: &str = "DeltaS3ObjectStore";
36
/// Factory type on which this module implements [`ObjectStoreFactory`].
///
/// The type carries no state.
#[derive(Debug, Default, Clone)]
pub struct S3ObjectStoreFactory {}
39
// No overrides: the factory uses the trait's provided `with_env_s3` as-is.
impl S3StorageOptionsConversion for S3ObjectStoreFactory {}
41
42impl ObjectStoreFactory for S3ObjectStoreFactory {
43    fn parse_url_opts(
44        &self,
45        url: &Url,
46        config: &StorageConfig,
47    ) -> DeltaResult<(ObjectStoreRef, Path)> {
48        let options = self.with_env_s3(&config.raw);
49
50        // All S3-likes should start their builder the same way
51        let mut builder = AmazonS3Builder::new()
52            .with_url(url.to_string())
53            .with_retry(config.retry.clone());
54
55        if let Some(runtime) = &config.runtime {
56            builder =
57                builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
58        }
59
60        for (key, value) in options.iter() {
61            if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
62                builder = builder.with_config(key, value.clone());
63            }
64        }
65
66        let s3_options = S3StorageOptions::from_map(&options)?;
67        if let Some(ref sdk_config) = s3_options.sdk_config {
68            builder =
69                builder.with_credentials(Arc::new(AWSForObjectStore::new(sdk_config.clone())));
70        }
71
72        let (_, path) =
73            ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
74                source: Box::new(e),
75            })?;
76        let prefix = Path::parse(path)?;
77
78        let store = aws_storage_handler(builder.build()?, &s3_options)?;
79        debug!("Initialized the object store: {store:?}");
80
81        Ok((store, prefix))
82    }
83}
84
85fn aws_storage_handler(
86    store: AmazonS3,
87    s3_options: &S3StorageOptions,
88) -> DeltaResult<ObjectStoreRef> {
89    // Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store
90    // unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend.
91    if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
92    {
93        let store = S3StorageBackend::try_new(
94            Arc::new(store),
95            Some("dynamodb") == s3_options.locking_provider.as_deref()
96                || s3_options.allow_unsafe_rename,
97        )?;
98        Ok(Arc::new(store))
99    } else {
100        Ok(Arc::new(store))
101    }
102}
103
104// Determine whether this crate is being configured for use with native AWS S3 or an S3-alike
105//
106// This function will return true in the default case since it's most likely that the absence of
107// options will mean default/S3 configuration
108fn is_aws(options: &HashMap<String, String>) -> bool {
109    // Checks storage option first then env var for existence of aws force credential load
110    // .from_s3_env never inserts these into the options because they are delta-rs specific
111    if str_option(options, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() {
112        return true;
113    }
114
115    // Checks storage option first then env var for existence of locking provider
116    // .from_s3_env never inserts these into the options because they are delta-rs specific
117    if str_option(options, constants::AWS_S3_LOCKING_PROVIDER).is_some() {
118        return true;
119    }
120
121    // Options at this stage should only contain 'aws_endpoint' in lowercase
122    // due to with_env_s3
123    !(options.contains_key("aws_endpoint") || options.contains_key(constants::AWS_ENDPOINT_URL))
124}
125
126/// Options used to configure the [S3StorageBackend].
127///
128/// Available options are described in [constants].
129#[derive(Clone, Debug, TypedBuilder)]
130#[builder(doc)]
131pub struct S3StorageOptions {
132    /// Whether to use virtual hosted-style requests
133    #[builder(default = false)]
134    pub virtual_hosted_style_request: bool,
135    /// Locking provider to use (e.g., "dynamodb")
136    #[builder(default, setter(strip_option, into))]
137    pub locking_provider: Option<String>,
138    /// Override endpoint for DynamoDB
139    #[builder(default, setter(strip_option, into))]
140    pub dynamodb_endpoint: Option<String>,
141    /// Override region for DynamoDB
142    #[builder(default, setter(strip_option, into))]
143    pub dynamodb_region: Option<String>,
144    /// Override access key ID for DynamoDB
145    #[builder(default, setter(strip_option, into))]
146    pub dynamodb_access_key_id: Option<String>,
147    /// Override secret access key for DynamoDB
148    #[builder(default, setter(strip_option, into))]
149    pub dynamodb_secret_access_key: Option<String>,
150    /// Override session token for DynamoDB
151    #[builder(default, setter(strip_option, into))]
152    pub dynamodb_session_token: Option<String>,
153    /// Idle timeout for S3 connection pool
154    #[builder(default = Duration::from_secs(DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS))]
155    pub s3_pool_idle_timeout: Duration,
156    /// Idle timeout for STS connection pool
157    #[builder(default = Duration::from_secs(DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS))]
158    pub sts_pool_idle_timeout: Duration,
159    /// Number of retries for S3 internal server errors
160    #[builder(default = DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES)]
161    pub s3_get_internal_server_error_retries: usize,
162    /// Allow unsafe rename operations
163    #[builder(default = false)]
164    pub allow_unsafe_rename: bool,
165    /// Extra storage options not handled by other fields
166    #[builder(default)]
167    pub extra_opts: HashMap<String, String>,
168    /// AWS SDK configuration
169    #[builder(default, setter(strip_option))]
170    pub sdk_config: Option<SdkConfig>,
171}
172
173impl Eq for S3StorageOptions {}
174impl PartialEq for S3StorageOptions {
175    fn eq(&self, other: &Self) -> bool {
176        self.virtual_hosted_style_request == other.virtual_hosted_style_request
177            && self.locking_provider == other.locking_provider
178            && self.dynamodb_endpoint == other.dynamodb_endpoint
179            && self.dynamodb_region == other.dynamodb_region
180            && self.dynamodb_access_key_id == other.dynamodb_access_key_id
181            && self.dynamodb_secret_access_key == other.dynamodb_secret_access_key
182            && self.dynamodb_session_token == other.dynamodb_session_token
183            && self.s3_pool_idle_timeout == other.s3_pool_idle_timeout
184            && self.sts_pool_idle_timeout == other.sts_pool_idle_timeout
185            && self.s3_get_internal_server_error_retries
186                == other.s3_get_internal_server_error_retries
187            && self.allow_unsafe_rename == other.allow_unsafe_rename
188            && self.extra_opts == other.extra_opts
189    }
190}
191
192impl S3StorageOptions {
193    /// Creates an instance of [`S3StorageOptions`] from the given HashMap.
194    pub fn from_map(options: &HashMap<String, String>) -> DeltaResult<S3StorageOptions> {
195        let extra_opts: HashMap<String, String> = options
196            .iter()
197            .filter(|(k, _)| !constants::S3_OPTS.contains(&k.as_str()))
198            .map(|(k, v)| (k.to_owned(), v.to_owned()))
199            .collect();
200        // Copy web identity values provided in options but not the environment into the environment
201        // to get picked up by the `from_k8s_env` call in `get_web_identity_provider`.
202        Self::ensure_env_var(options, constants::AWS_REGION);
203        Self::ensure_env_var(options, constants::AWS_PROFILE);
204        Self::ensure_env_var(options, constants::AWS_ACCESS_KEY_ID);
205        Self::ensure_env_var(options, constants::AWS_SECRET_ACCESS_KEY);
206        Self::ensure_env_var(options, constants::AWS_SESSION_TOKEN);
207        Self::ensure_env_var(options, constants::AWS_WEB_IDENTITY_TOKEN_FILE);
208        Self::ensure_env_var(options, constants::AWS_ROLE_ARN);
209        Self::ensure_env_var(options, constants::AWS_ROLE_SESSION_NAME);
210        let s3_pool_idle_timeout = Self::u64_or_default(
211            options,
212            constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS,
213            DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS,
214        );
215        let sts_pool_idle_timeout = Self::u64_or_default(
216            options,
217            constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS,
218            DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS,
219        );
220
221        let s3_get_internal_server_error_retries = Self::u64_or_default(
222            options,
223            constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES,
224            DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES as u64,
225        ) as usize;
226
227        let virtual_hosted_style_request: bool =
228            str_option(options, constants::AWS_S3_ADDRESSING_STYLE)
229                .map(|addressing_style| addressing_style == "virtual")
230                .unwrap_or(false);
231
232        let allow_unsafe_rename = str_option(options, constants::AWS_S3_ALLOW_UNSAFE_RENAME)
233            .map(|val| str_is_truthy(&val))
234            .unwrap_or(false);
235
236        let sdk_config = match is_aws(options) {
237            false => None,
238            true => {
239                debug!("Detected AWS S3 Storage options, resolving AWS credentials");
240                Some(execute_sdk_future(
241                    crate::credentials::resolve_credentials(options),
242                )??)
243            }
244        };
245
246        Ok(Self {
247            virtual_hosted_style_request,
248            locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER),
249            dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB),
250            dynamodb_region: str_option(options, constants::AWS_REGION_DYNAMODB),
251            dynamodb_access_key_id: str_option(options, constants::AWS_ACCESS_KEY_ID_DYNAMODB),
252            dynamodb_secret_access_key: str_option(
253                options,
254                constants::AWS_SECRET_ACCESS_KEY_DYNAMODB,
255            ),
256            dynamodb_session_token: str_option(options, constants::AWS_SESSION_TOKEN_DYNAMODB),
257            s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout),
258            sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout),
259            s3_get_internal_server_error_retries,
260            allow_unsafe_rename,
261            extra_opts,
262            sdk_config,
263        })
264    }
265
266    /// Return the configured endpoint URL for S3 operations
267    pub fn endpoint_url(&self) -> Option<&str> {
268        self.sdk_config.as_ref().and_then(|v| v.endpoint_url())
269    }
270
271    /// Return the configured region used for S3 operations
272    pub fn region(&self) -> Option<&Region> {
273        self.sdk_config.as_ref().and_then(|v| v.region())
274    }
275
276    fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
277        str_option(map, key)
278            .and_then(|v| v.parse().ok())
279            .unwrap_or(default)
280    }
281
282    fn ensure_env_var(map: &HashMap<String, String>, key: &str) {
283        if let Some(val) = str_option(map, key) {
284            unsafe {
285                std::env::set_var(key, val);
286            }
287        }
288    }
289
290    pub fn try_default() -> DeltaResult<Self> {
291        Self::from_map(&HashMap::new())
292    }
293}
294
295fn execute_sdk_future<F, T>(future: F) -> DeltaResult<T>
296where
297    T: Send,
298    F: Future<Output = T> + Send,
299{
300    match tokio::runtime::Handle::try_current() {
301        Ok(handle) => match handle.runtime_flavor() {
302            tokio::runtime::RuntimeFlavor::MultiThread => {
303                Ok(tokio::task::block_in_place(move || handle.block_on(future)))
304            }
305            _ => {
306                let mut cfg: Option<T> = None;
307                std::thread::scope(|scope| {
308                    scope.spawn(|| {
309                        cfg = Some(handle.block_on(future));
310                    });
311                });
312                cfg.ok_or(DeltaTableError::ObjectStore {
313                    source: ObjectStoreError::Generic {
314                        store: STORE_NAME,
315                        source: Box::new(DynamoDbConfigError::InitializationError),
316                    },
317                })
318            }
319        },
320        Err(_) => {
321            let runtime = tokio::runtime::Builder::new_current_thread()
322                .enable_all()
323                .build()
324                .expect("a tokio runtime is required by the AWS sdk");
325            Ok(runtime.block_on(future))
326        }
327    }
328}
329
/// An S3 implementation of the [ObjectStore] trait
pub struct S3StorageBackend {
    /// Wrapped object store that the trait methods forward to
    inner: ObjectStoreRef,
    /// Whether `rename_if_not_exists` is allowed to be performed as a plain rename
    allow_unsafe_rename: bool,
}
336
337impl std::fmt::Display for S3StorageBackend {
338    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339        write!(
340            f,
341            "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {} }}",
342            self.allow_unsafe_rename, self.inner
343        )
344    }
345}
346
347impl S3StorageBackend {
348    /// Creates a new S3StorageBackend.
349    ///
350    /// Options are described in [constants].
351    pub fn try_new(storage: ObjectStoreRef, allow_unsafe_rename: bool) -> ObjectStoreResult<Self> {
352        Ok(Self {
353            inner: storage,
354            allow_unsafe_rename,
355        })
356    }
357}
358
359impl Debug for S3StorageBackend {
360    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
361        write!(
362            fmt,
363            "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {:?} }}",
364            self.allow_unsafe_rename, self.inner
365        )
366    }
367}
368
369#[async_trait::async_trait]
370impl ObjectStore for S3StorageBackend {
371    async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
372        self.inner.put(location, bytes).await
373    }
374
375    async fn put_opts(
376        &self,
377        location: &Path,
378        bytes: PutPayload,
379        options: PutOptions,
380    ) -> ObjectStoreResult<PutResult> {
381        self.inner.put_opts(location, bytes, options).await
382    }
383
384    async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
385        self.inner.put_multipart(location).await
386    }
387
388    async fn put_multipart_opts(
389        &self,
390        location: &Path,
391        options: PutMultipartOptions,
392    ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
393        self.inner.put_multipart_opts(location, options).await
394    }
395
396    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
397        self.inner.get(location).await
398    }
399
400    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
401        self.inner.get_opts(location, options).await
402    }
403
404    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
405        self.inner.get_range(location, range).await
406    }
407
408    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
409        self.inner.head(location).await
410    }
411
412    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
413        self.inner.delete(location).await
414    }
415
416    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
417        self.inner.list(prefix)
418    }
419
420    fn list_with_offset(
421        &self,
422        prefix: Option<&Path>,
423        offset: &Path,
424    ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
425        self.inner.list_with_offset(prefix, offset)
426    }
427
428    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
429        self.inner.list_with_delimiter(prefix).await
430    }
431
432    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
433        self.inner.copy(from, to).await
434    }
435
436    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> ObjectStoreResult<()> {
437        todo!()
438    }
439
440    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
441        if self.allow_unsafe_rename {
442            self.inner.rename(from, to).await
443        } else {
444            Err(ObjectStoreError::Generic {
445                store: STORE_NAME,
446                source: Box::new(crate::errors::LockClientError::LockClientRequired),
447            })
448        }
449    }
450}
451
/// Storage option keys to use when creating [`S3StorageOptions`].
///
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
///
/// This module only re-exports `crate::constants`; it is kept for callers
/// that still use the old path.
#[deprecated(
    since = "0.20.0",
    note = "s3_constants has moved up to deltalake_aws::constants::*"
)]
pub mod s3_constants {
    // Glob re-export so every constant stays reachable through this module.
    pub use crate::constants::*;
}
463
/// Resolves `key` to a value, trying in order: the map under the exact key,
/// the map under the ASCII-lowercased key, and finally the process
/// environment under the exact key.
///
/// Returns `None` when none of the three lookups yields a value.
pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
    map.get(key)
        .or_else(|| map.get(&key.to_ascii_lowercase()))
        .cloned()
        .or_else(|| std::env::var(key).ok())
}
475
476pub(crate) trait S3StorageOptionsConversion {
477    fn with_env_s3(&self, options: &HashMap<String, String>) -> HashMap<String, String> {
478        let mut options: HashMap<String, String> = options
479            .clone()
480            .into_iter()
481            .map(|(k, v)| {
482                if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
483                    (config_key.as_ref().to_string(), v)
484                } else {
485                    (k, v)
486                }
487            })
488            .collect();
489
490        for (os_key, os_value) in std::env::vars_os() {
491            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
492                && let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase())
493            {
494                options
495                    .entry(config_key.as_ref().to_string())
496                    .or_insert(value.to_string());
497            }
498        }
499
500        // All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
501        // set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
502        // that PutIfAbsent is supported.
503        // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
504        if !options.keys().any(|key| {
505            let key = key.to_ascii_lowercase();
506            [
507                AmazonS3ConfigKey::ConditionalPut.as_ref(),
508                "conditional_put",
509            ]
510            .contains(&key.as_str())
511        }) {
512            options.insert("conditional_put".into(), "etag".into());
513        }
514        options
515    }
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521
522    use crate::constants;
523    use serial_test::serial;
524
525    struct ScopedEnv {
526        vars: HashMap<std::ffi::OsString, std::ffi::OsString>,
527    }
528
529    impl ScopedEnv {
530        pub fn new() -> Self {
531            let vars = std::env::vars_os().collect();
532            Self { vars }
533        }
534
535        pub fn run<T>(mut f: impl FnMut() -> T) -> T {
536            let _env_scope = Self::new();
537            f()
538        }
539    }
540
541    impl Drop for ScopedEnv {
542        fn drop(&mut self) {
543            let to_remove: Vec<_> = std::env::vars_os()
544                .map(|kv| kv.0)
545                .filter(|k| !self.vars.contains_key(k))
546                .collect();
547            for k in to_remove {
548                unsafe {
549                    std::env::remove_var(k);
550                }
551            }
552            for (key, value) in self.vars.drain() {
553                unsafe {
554                    std::env::set_var(key, value);
555                }
556            }
557        }
558    }
559
560    fn clear_env_of_aws_keys() {
561        let keys_to_clear = std::env::vars().filter_map(|(k, _v)| {
562            if AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()).is_ok() {
563                Some(k)
564            } else {
565                None
566            }
567        });
568
569        for k in keys_to_clear {
570            unsafe {
571                std::env::remove_var(k);
572            }
573        }
574    }
575
576    #[test]
577    #[serial]
578    fn storage_options_default_test() {
579        ScopedEnv::run(|| {
580            clear_env_of_aws_keys();
581
582            unsafe {
583                std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
584                std::env::set_var(constants::AWS_REGION, "us-west-1");
585                std::env::set_var(constants::AWS_PROFILE, "default");
586                std::env::set_var(constants::AWS_ACCESS_KEY_ID, "default_key_id");
587                std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key");
588                std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
589                std::env::set_var(
590                    constants::AWS_IAM_ROLE_ARN,
591                    "arn:aws:iam::123456789012:role/some_role",
592                );
593                std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
594                std::env::set_var(
595                    #[allow(deprecated)]
596                    constants::AWS_S3_ASSUME_ROLE_ARN,
597                    "arn:aws:iam::123456789012:role/some_role",
598                );
599                std::env::set_var(
600                    #[allow(deprecated)]
601                    constants::AWS_S3_ROLE_SESSION_NAME,
602                    "session_name",
603                );
604                std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");
605            }
606
607            let options = S3StorageOptions::try_default().unwrap();
608            assert_eq!(
609                S3StorageOptions::builder()
610                    .sdk_config(
611                        SdkConfig::builder()
612                            .endpoint_url("http://localhost".to_string())
613                            .region(Region::from_static("us-west-1"))
614                            .build()
615                    )
616                    .locking_provider("dynamodb")
617                    .build(),
618                options
619            );
620        });
621    }
622
623    #[test]
624    #[serial]
625    fn storage_options_with_only_region_and_credentials() {
626        ScopedEnv::run(|| {
627            clear_env_of_aws_keys();
628            unsafe {
629                std::env::remove_var(constants::AWS_ENDPOINT_URL);
630            }
631
632            let options = S3StorageOptions::from_map(&HashMap::from([
633                (constants::AWS_REGION.to_string(), "eu-west-1".to_string()),
634                (constants::AWS_ACCESS_KEY_ID.to_string(), "test".to_string()),
635                (
636                    constants::AWS_SECRET_ACCESS_KEY.to_string(),
637                    "test_secret".to_string(),
638                ),
639            ]))
640            .unwrap();
641
642            let mut expected = S3StorageOptions::try_default().unwrap();
643            expected.sdk_config = Some(
644                SdkConfig::builder()
645                    .region(Region::from_static("eu-west-1"))
646                    .build(),
647            );
648            assert_eq!(expected, options);
649        });
650    }
651
652    #[test]
653    #[serial]
654    fn storage_options_from_map_test() {
655        ScopedEnv::run(|| {
656            clear_env_of_aws_keys();
657            let options = S3StorageOptions::from_map(&HashMap::from([
658                (
659                    constants::AWS_ENDPOINT_URL.to_string(),
660                    "http://localhost:1234".to_string(),
661                ),
662                (constants::AWS_REGION.to_string(), "us-west-2".to_string()),
663                (constants::AWS_PROFILE.to_string(), "default".to_string()),
664                (
665                    constants::AWS_S3_ADDRESSING_STYLE.to_string(),
666                    "virtual".to_string(),
667                ),
668                (
669                    constants::AWS_S3_LOCKING_PROVIDER.to_string(),
670                    "another_locking_provider".to_string(),
671                ),
672                (
673                    constants::AWS_IAM_ROLE_ARN.to_string(),
674                    "arn:aws:iam::123456789012:role/another_role".to_string(),
675                ),
676                (
677                    constants::AWS_IAM_ROLE_SESSION_NAME.to_string(),
678                    "another_session_name".to_string(),
679                ),
680                (
681                    constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string(),
682                    "another_token_file".to_string(),
683                ),
684                (
685                    constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string(),
686                    "1".to_string(),
687                ),
688                (
689                    constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string(),
690                    "2".to_string(),
691                ),
692                (
693                    constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string(),
694                    "3".to_string(),
695                ),
696                (
697                    constants::AWS_ACCESS_KEY_ID.to_string(),
698                    "test_id".to_string(),
699                ),
700                (
701                    constants::AWS_SECRET_ACCESS_KEY.to_string(),
702                    "test_secret".to_string(),
703                ),
704            ]))
705            .unwrap();
706
707            assert_eq!(
708                Some("another_locking_provider"),
709                options.locking_provider.as_deref()
710            );
711            assert_eq!(Duration::from_secs(1), options.s3_pool_idle_timeout);
712            assert_eq!(Duration::from_secs(2), options.sts_pool_idle_timeout);
713            assert_eq!(3, options.s3_get_internal_server_error_retries);
714            assert!(options.virtual_hosted_style_request);
715            assert!(!options.allow_unsafe_rename);
716            assert_eq!(
717                HashMap::from([(
718                    constants::AWS_S3_ADDRESSING_STYLE.to_string(),
719                    "virtual".to_string()
720                ),]),
721                options.extra_opts
722            );
723        });
724    }
725
726    #[test]
727    #[serial]
728    fn storage_options_from_map_with_dynamodb_endpoint_test() {
729        ScopedEnv::run(|| {
730            clear_env_of_aws_keys();
731            let options = S3StorageOptions::from_map(&HashMap::from([
732                (
733                    constants::AWS_ENDPOINT_URL.to_string(),
734                    "http://localhost:1234".to_string(),
735                ),
736                (
737                    constants::AWS_ENDPOINT_URL_DYNAMODB.to_string(),
738                    "http://localhost:2345".to_string(),
739                ),
740                (constants::AWS_REGION.to_string(), "us-west-2".to_string()),
741                (constants::AWS_PROFILE.to_string(), "default".to_string()),
742                (
743                    constants::AWS_S3_ADDRESSING_STYLE.to_string(),
744                    "virtual".to_string(),
745                ),
746                (
747                    constants::AWS_S3_LOCKING_PROVIDER.to_string(),
748                    "another_locking_provider".to_string(),
749                ),
750                (
751                    constants::AWS_IAM_ROLE_ARN.to_string(),
752                    "arn:aws:iam::123456789012:role/another_role".to_string(),
753                ),
754                (
755                    constants::AWS_IAM_ROLE_SESSION_NAME.to_string(),
756                    "another_session_name".to_string(),
757                ),
758                (
759                    constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string(),
760                    "another_token_file".to_string(),
761                ),
762                (
763                    constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string(),
764                    "1".to_string(),
765                ),
766                (
767                    constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string(),
768                    "2".to_string(),
769                ),
770                (
771                    constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string(),
772                    "3".to_string(),
773                ),
774                (
775                    constants::AWS_ACCESS_KEY_ID.to_string(),
776                    "test_id".to_string(),
777                ),
778                (
779                    constants::AWS_SECRET_ACCESS_KEY.to_string(),
780                    "test_secret".to_string(),
781                ),
782            ]))
783            .unwrap();
784
785            assert_eq!(
786                Some("http://localhost:2345"),
787                options.dynamodb_endpoint.as_deref()
788            );
789        });
790    }
791
792    #[test]
793    #[serial]
794    fn storage_options_mixed_test() {
795        ScopedEnv::run(|| {
796            clear_env_of_aws_keys();
797            unsafe {
798                std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
799                std::env::set_var(
800                    constants::AWS_ENDPOINT_URL_DYNAMODB,
801                    "http://localhost:dynamodb",
802                );
803                std::env::set_var(constants::AWS_REGION, "us-west-1");
804                std::env::set_var(constants::AWS_PROFILE, "default");
805                std::env::set_var(constants::AWS_ACCESS_KEY_ID, "wrong_key_id");
806                std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key");
807                std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
808                std::env::set_var(
809                    constants::AWS_IAM_ROLE_ARN,
810                    "arn:aws:iam::123456789012:role/some_role",
811                );
812                std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
813                std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");
814
815                std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1");
816                std::env::set_var(constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2");
817                std::env::set_var(constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3");
818            }
819            let options = S3StorageOptions::from_map(&HashMap::from([
820                (
821                    constants::AWS_ACCESS_KEY_ID.to_string(),
822                    "test_id_mixed".to_string(),
823                ),
824                (
825                    constants::AWS_SECRET_ACCESS_KEY.to_string(),
826                    "test_secret_mixed".to_string(),
827                ),
828                (constants::AWS_REGION.to_string(), "us-west-2".to_string()),
829                (
830                    "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES".to_string(),
831                    "3".to_string(),
832                ),
833            ]))
834            .unwrap();
835
836            assert_eq!(
837                S3StorageOptions::builder()
838                    .sdk_config(
839                        SdkConfig::builder()
840                            .endpoint_url("http://localhost".to_string())
841                            .region(Region::from_static("us-west-2"))
842                            .build()
843                    )
844                    .locking_provider("dynamodb")
845                    .dynamodb_endpoint("http://localhost:dynamodb")
846                    .s3_pool_idle_timeout(Duration::from_secs(1))
847                    .sts_pool_idle_timeout(Duration::from_secs(2))
848                    .s3_get_internal_server_error_retries(3)
849                    .build(),
850                options
851            );
852        });
853    }
854
855    #[test]
856    #[serial]
857    fn storage_options_web_identity_test() {
858        ScopedEnv::run(|| {
859            clear_env_of_aws_keys();
860            let _options = S3StorageOptions::from_map(&HashMap::from([
861                (constants::AWS_REGION.to_string(), "eu-west-1".to_string()),
862                (
863                    constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string(),
864                    "web_identity_token_file".to_string(),
865                ),
866                (
867                    constants::AWS_ROLE_ARN.to_string(),
868                    "arn:aws:iam::123456789012:role/web_identity_role".to_string(),
869                ),
870                (
871                    constants::AWS_ROLE_SESSION_NAME.to_string(),
872                    "web_identity_session_name".to_string(),
873                ),
874            ]))
875            .unwrap();
876
877            assert_eq!("eu-west-1", std::env::var(constants::AWS_REGION).unwrap());
878
879            assert_eq!(
880                "web_identity_token_file",
881                std::env::var(constants::AWS_WEB_IDENTITY_TOKEN_FILE).unwrap()
882            );
883
884            assert_eq!(
885                "arn:aws:iam::123456789012:role/web_identity_role",
886                std::env::var(constants::AWS_ROLE_ARN).unwrap()
887            );
888
889            assert_eq!(
890                "web_identity_session_name",
891                std::env::var(constants::AWS_ROLE_SESSION_NAME).unwrap()
892            );
893        });
894    }
895
896    #[test]
897    #[serial]
898    fn when_merging_with_env_unsupplied_options_are_added() {
899        ScopedEnv::run(|| {
900            clear_env_of_aws_keys();
901            let raw_options = HashMap::new();
902            unsafe {
903                std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key");
904                std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key");
905                std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key");
906                std::env::set_var(constants::AWS_REGION, "env_key");
907            }
908            let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options);
909
910            // Four and then the conditional_put built-in
911            assert_eq!(combined_options.len(), 5);
912
913            for (key, v) in combined_options {
914                if key != "conditional_put" {
915                    assert_eq!(v, "env_key");
916                }
917            }
918        });
919    }
920
921    #[tokio::test]
922    #[serial]
923    async fn when_merging_with_env_supplied_options_take_precedence() {
924        ScopedEnv::run(|| {
925            clear_env_of_aws_keys();
926            let raw_options = HashMap::from([
927                ("AWS_ACCESS_KEY_ID".to_string(), "options_key".to_string()),
928                ("AWS_ENDPOINT_URL".to_string(), "options_key".to_string()),
929                (
930                    "AWS_SECRET_ACCESS_KEY".to_string(),
931                    "options_key".to_string(),
932                ),
933                ("AWS_REGION".to_string(), "options_key".to_string()),
934            ]);
935            unsafe {
936                std::env::set_var("aws_access_key_id", "env_key");
937                std::env::set_var("aws_endpoint", "env_key");
938                std::env::set_var("aws_secret_access_key", "env_key");
939                std::env::set_var("aws_region", "env_key");
940            }
941
942            let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options);
943
944            for (key, v) in combined_options {
945                if key != "conditional_put" {
946                    assert_eq!(v, "options_key");
947                }
948            }
949        });
950    }
951
952    #[test]
953    #[serial]
954    fn test_is_aws() {
955        clear_env_of_aws_keys();
956        let options = HashMap::default();
957        assert!(is_aws(&options));
958
959        let minio: HashMap<String, String> = HashMap::from([(
960            constants::AWS_ENDPOINT_URL.to_string(),
961            "http://minio:8080".to_string(),
962        )]);
963        assert!(!is_aws(&minio));
964
965        let minio: HashMap<String, String> =
966            HashMap::from([("aws_endpoint".to_string(), "http://minio:8080".to_string())]);
967        assert!(!is_aws(&minio));
968
969        let localstack: HashMap<String, String> = HashMap::from([
970            (
971                constants::AWS_FORCE_CREDENTIAL_LOAD.to_string(),
972                "true".to_string(),
973            ),
974            ("aws_endpoint".to_string(), "http://minio:8080".to_string()),
975        ]);
976        assert!(is_aws(&localstack));
977    }
978}