1use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use aws_config::{Region, SdkConfig};
10use bytes::Bytes;
11use deltalake_core::logstore::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
12use deltalake_core::logstore::object_store::{
13 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme,
14 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
15};
16use deltalake_core::logstore::{
17 ObjectStoreFactory, ObjectStoreRef, StorageConfig, config::str_is_truthy,
18};
19use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
20use futures::Future;
21use futures::stream::BoxStream;
22use object_store::aws::AmazonS3;
23use object_store::client::SpawnedReqwestConnector;
24use tracing::log::*;
25use typed_builder::TypedBuilder;
26use url::Url;
27
28use crate::constants::{
29 self, DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS,
30 DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS,
31};
32use crate::credentials::AWSForObjectStore;
33use crate::errors::DynamoDbConfigError;
34
35const STORE_NAME: &str = "DeltaS3ObjectStore";
36
/// Factory that produces S3-backed object stores through its [`ObjectStoreFactory`]
/// implementation. The type carries no state of its own.
#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

// Picks up the trait's default `with_env_s3` implementation unchanged.
impl S3StorageOptionsConversion for S3ObjectStoreFactory {}
41
42impl ObjectStoreFactory for S3ObjectStoreFactory {
43 fn parse_url_opts(
44 &self,
45 url: &Url,
46 config: &StorageConfig,
47 ) -> DeltaResult<(ObjectStoreRef, Path)> {
48 let options = self.with_env_s3(&config.raw);
49
50 let mut builder = AmazonS3Builder::new()
52 .with_url(url.to_string())
53 .with_retry(config.retry.clone());
54
55 if let Some(runtime) = &config.runtime {
56 builder =
57 builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
58 }
59
60 for (key, value) in options.iter() {
61 if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
62 builder = builder.with_config(key, value.clone());
63 }
64 }
65
66 let s3_options = S3StorageOptions::from_map(&options)?;
67 if let Some(ref sdk_config) = s3_options.sdk_config {
68 builder =
69 builder.with_credentials(Arc::new(AWSForObjectStore::new(sdk_config.clone())));
70 }
71
72 let (_, path) =
73 ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
74 source: Box::new(e),
75 })?;
76 let prefix = Path::parse(path)?;
77
78 let store = aws_storage_handler(builder.build()?, &s3_options)?;
79 debug!("Initialized the object store: {store:?}");
80
81 Ok((store, prefix))
82 }
83}
84
85fn aws_storage_handler(
86 store: AmazonS3,
87 s3_options: &S3StorageOptions,
88) -> DeltaResult<ObjectStoreRef> {
89 if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
92 {
93 let store = S3StorageBackend::try_new(
94 Arc::new(store),
95 Some("dynamodb") == s3_options.locking_provider.as_deref()
96 || s3_options.allow_unsafe_rename,
97 )?;
98 Ok(Arc::new(store))
99 } else {
100 Ok(Arc::new(store))
101 }
102}
103
104fn is_aws(options: &HashMap<String, String>) -> bool {
109 if str_option(options, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() {
112 return true;
113 }
114
115 if str_option(options, constants::AWS_S3_LOCKING_PROVIDER).is_some() {
118 return true;
119 }
120
121 !(options.contains_key("aws_endpoint") || options.contains_key(constants::AWS_ENDPOINT_URL))
124}
125
/// Options used to configure the S3 storage backend.
///
/// Instances are usually produced by [`S3StorageOptions::from_map`], which reads each
/// value from the supplied map and falls back to the process environment.
#[derive(Clone, Debug, TypedBuilder)]
#[builder(doc)]
pub struct S3StorageOptions {
    /// `true` when the `AWS_S3_ADDRESSING_STYLE` option equals `"virtual"`.
    #[builder(default = false)]
    pub virtual_hosted_style_request: bool,
    /// Value of the `AWS_S3_LOCKING_PROVIDER` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub locking_provider: Option<String>,
    /// Value of the `AWS_ENDPOINT_URL_DYNAMODB` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub dynamodb_endpoint: Option<String>,
    /// Value of the `AWS_REGION_DYNAMODB` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub dynamodb_region: Option<String>,
    /// Value of the `AWS_ACCESS_KEY_ID_DYNAMODB` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub dynamodb_access_key_id: Option<String>,
    /// Value of the `AWS_SECRET_ACCESS_KEY_DYNAMODB` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub dynamodb_secret_access_key: Option<String>,
    /// Value of the `AWS_SESSION_TOKEN_DYNAMODB` option, if any.
    #[builder(default, setter(strip_option, into))]
    pub dynamodb_session_token: Option<String>,
    /// Parsed from `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`; defaults to
    /// `DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS` seconds.
    #[builder(default = Duration::from_secs(DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS))]
    pub s3_pool_idle_timeout: Duration,
    /// Parsed from `AWS_STS_POOL_IDLE_TIMEOUT_SECONDS`; defaults to
    /// `DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS` seconds.
    #[builder(default = Duration::from_secs(DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS))]
    pub sts_pool_idle_timeout: Duration,
    /// Parsed from `AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES`; defaults to
    /// `DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES`.
    #[builder(default = DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES)]
    pub s3_get_internal_server_error_retries: usize,
    /// `true` when the `AWS_S3_ALLOW_UNSAFE_RENAME` option holds a truthy value.
    #[builder(default = false)]
    pub allow_unsafe_rename: bool,
    /// Every supplied option whose key is not listed in `constants::S3_OPTS`.
    #[builder(default)]
    pub extra_opts: HashMap<String, String>,
    /// SDK configuration produced by credential resolution; `None` when no resolution
    /// was performed. This field does not take part in `PartialEq`.
    #[builder(default, setter(strip_option))]
    pub sdk_config: Option<SdkConfig>,
}
172
173impl Eq for S3StorageOptions {}
174impl PartialEq for S3StorageOptions {
175 fn eq(&self, other: &Self) -> bool {
176 self.virtual_hosted_style_request == other.virtual_hosted_style_request
177 && self.locking_provider == other.locking_provider
178 && self.dynamodb_endpoint == other.dynamodb_endpoint
179 && self.dynamodb_region == other.dynamodb_region
180 && self.dynamodb_access_key_id == other.dynamodb_access_key_id
181 && self.dynamodb_secret_access_key == other.dynamodb_secret_access_key
182 && self.dynamodb_session_token == other.dynamodb_session_token
183 && self.s3_pool_idle_timeout == other.s3_pool_idle_timeout
184 && self.sts_pool_idle_timeout == other.sts_pool_idle_timeout
185 && self.s3_get_internal_server_error_retries
186 == other.s3_get_internal_server_error_retries
187 && self.allow_unsafe_rename == other.allow_unsafe_rename
188 && self.extra_opts == other.extra_opts
189 }
190}
191
192impl S3StorageOptions {
193 pub fn from_map(options: &HashMap<String, String>) -> DeltaResult<S3StorageOptions> {
195 let extra_opts: HashMap<String, String> = options
196 .iter()
197 .filter(|(k, _)| !constants::S3_OPTS.contains(&k.as_str()))
198 .map(|(k, v)| (k.to_owned(), v.to_owned()))
199 .collect();
200 Self::ensure_env_var(options, constants::AWS_REGION);
203 Self::ensure_env_var(options, constants::AWS_PROFILE);
204 Self::ensure_env_var(options, constants::AWS_ACCESS_KEY_ID);
205 Self::ensure_env_var(options, constants::AWS_SECRET_ACCESS_KEY);
206 Self::ensure_env_var(options, constants::AWS_SESSION_TOKEN);
207 Self::ensure_env_var(options, constants::AWS_WEB_IDENTITY_TOKEN_FILE);
208 Self::ensure_env_var(options, constants::AWS_ROLE_ARN);
209 Self::ensure_env_var(options, constants::AWS_ROLE_SESSION_NAME);
210 let s3_pool_idle_timeout = Self::u64_or_default(
211 options,
212 constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS,
213 DEFAULT_S3_POOL_IDLE_TIMEOUT_SECONDS,
214 );
215 let sts_pool_idle_timeout = Self::u64_or_default(
216 options,
217 constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS,
218 DEFAULT_STS_POOL_IDLE_TIMEOUT_SECONDS,
219 );
220
221 let s3_get_internal_server_error_retries = Self::u64_or_default(
222 options,
223 constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES,
224 DEFAULT_S3_GET_INTERNAL_SERVER_ERROR_RETRIES as u64,
225 ) as usize;
226
227 let virtual_hosted_style_request: bool =
228 str_option(options, constants::AWS_S3_ADDRESSING_STYLE)
229 .map(|addressing_style| addressing_style == "virtual")
230 .unwrap_or(false);
231
232 let allow_unsafe_rename = str_option(options, constants::AWS_S3_ALLOW_UNSAFE_RENAME)
233 .map(|val| str_is_truthy(&val))
234 .unwrap_or(false);
235
236 let sdk_config = match is_aws(options) {
237 false => None,
238 true => {
239 debug!("Detected AWS S3 Storage options, resolving AWS credentials");
240 Some(execute_sdk_future(
241 crate::credentials::resolve_credentials(options),
242 )??)
243 }
244 };
245
246 Ok(Self {
247 virtual_hosted_style_request,
248 locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER),
249 dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB),
250 dynamodb_region: str_option(options, constants::AWS_REGION_DYNAMODB),
251 dynamodb_access_key_id: str_option(options, constants::AWS_ACCESS_KEY_ID_DYNAMODB),
252 dynamodb_secret_access_key: str_option(
253 options,
254 constants::AWS_SECRET_ACCESS_KEY_DYNAMODB,
255 ),
256 dynamodb_session_token: str_option(options, constants::AWS_SESSION_TOKEN_DYNAMODB),
257 s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout),
258 sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout),
259 s3_get_internal_server_error_retries,
260 allow_unsafe_rename,
261 extra_opts,
262 sdk_config,
263 })
264 }
265
266 pub fn endpoint_url(&self) -> Option<&str> {
268 self.sdk_config.as_ref().and_then(|v| v.endpoint_url())
269 }
270
271 pub fn region(&self) -> Option<&Region> {
273 self.sdk_config.as_ref().and_then(|v| v.region())
274 }
275
276 fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
277 str_option(map, key)
278 .and_then(|v| v.parse().ok())
279 .unwrap_or(default)
280 }
281
282 fn ensure_env_var(map: &HashMap<String, String>, key: &str) {
283 if let Some(val) = str_option(map, key) {
284 unsafe {
285 std::env::set_var(key, val);
286 }
287 }
288 }
289
290 pub fn try_default() -> DeltaResult<Self> {
291 Self::from_map(&HashMap::new())
292 }
293}
294
295fn execute_sdk_future<F, T>(future: F) -> DeltaResult<T>
296where
297 T: Send,
298 F: Future<Output = T> + Send,
299{
300 match tokio::runtime::Handle::try_current() {
301 Ok(handle) => match handle.runtime_flavor() {
302 tokio::runtime::RuntimeFlavor::MultiThread => {
303 Ok(tokio::task::block_in_place(move || handle.block_on(future)))
304 }
305 _ => {
306 let mut cfg: Option<T> = None;
307 std::thread::scope(|scope| {
308 scope.spawn(|| {
309 cfg = Some(handle.block_on(future));
310 });
311 });
312 cfg.ok_or(DeltaTableError::ObjectStore {
313 source: ObjectStoreError::Generic {
314 store: STORE_NAME,
315 source: Box::new(DynamoDbConfigError::InitializationError),
316 },
317 })
318 }
319 },
320 Err(_) => {
321 let runtime = tokio::runtime::Builder::new_current_thread()
322 .enable_all()
323 .build()
324 .expect("a tokio runtime is required by the AWS sdk");
325 Ok(runtime.block_on(future))
326 }
327 }
328}
329
/// An object store that wraps another store and forwards operations to it, adding a
/// guarded `rename_if_not_exists`.
pub struct S3StorageBackend {
    /// The wrapped store that operations are forwarded to.
    inner: ObjectStoreRef,
    /// When `true`, `rename_if_not_exists` performs a plain `rename` on `inner`;
    /// when `false`, it returns an error instead.
    allow_unsafe_rename: bool,
}
336
337impl std::fmt::Display for S3StorageBackend {
338 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339 write!(
340 f,
341 "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {} }}",
342 self.allow_unsafe_rename, self.inner
343 )
344 }
345}
346
347impl S3StorageBackend {
348 pub fn try_new(storage: ObjectStoreRef, allow_unsafe_rename: bool) -> ObjectStoreResult<Self> {
352 Ok(Self {
353 inner: storage,
354 allow_unsafe_rename,
355 })
356 }
357}
358
359impl Debug for S3StorageBackend {
360 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
361 write!(
362 fmt,
363 "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {:?} }}",
364 self.allow_unsafe_rename, self.inner
365 )
366 }
367}
368
369#[async_trait::async_trait]
370impl ObjectStore for S3StorageBackend {
371 async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
372 self.inner.put(location, bytes).await
373 }
374
375 async fn put_opts(
376 &self,
377 location: &Path,
378 bytes: PutPayload,
379 options: PutOptions,
380 ) -> ObjectStoreResult<PutResult> {
381 self.inner.put_opts(location, bytes, options).await
382 }
383
384 async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
385 self.inner.put_multipart(location).await
386 }
387
388 async fn put_multipart_opts(
389 &self,
390 location: &Path,
391 options: PutMultipartOptions,
392 ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
393 self.inner.put_multipart_opts(location, options).await
394 }
395
396 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
397 self.inner.get(location).await
398 }
399
400 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
401 self.inner.get_opts(location, options).await
402 }
403
404 async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
405 self.inner.get_range(location, range).await
406 }
407
408 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
409 self.inner.head(location).await
410 }
411
412 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
413 self.inner.delete(location).await
414 }
415
416 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
417 self.inner.list(prefix)
418 }
419
420 fn list_with_offset(
421 &self,
422 prefix: Option<&Path>,
423 offset: &Path,
424 ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
425 self.inner.list_with_offset(prefix, offset)
426 }
427
428 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
429 self.inner.list_with_delimiter(prefix).await
430 }
431
432 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
433 self.inner.copy(from, to).await
434 }
435
436 async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> ObjectStoreResult<()> {
437 todo!()
438 }
439
440 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
441 if self.allow_unsafe_rename {
442 self.inner.rename(from, to).await
443 } else {
444 Err(ObjectStoreError::Generic {
445 store: STORE_NAME,
446 source: Box::new(crate::errors::LockClientError::LockClientRequired),
447 })
448 }
449 }
450}
451
/// Deprecated alias module that re-exports everything from [`crate::constants`].
///
/// Kept so that existing `s3_constants::*` paths continue to resolve; see the
/// deprecation note for the replacement path.
#[deprecated(
    since = "0.20.0",
    note = "s3_constants has moved up to deltalake_aws::constants::*"
)]
pub mod s3_constants {
    pub use crate::constants::*;
}
463
/// Looks up `key` and returns an owned copy of the first value found.
///
/// Lookup order: `map[key]`, then `map[key.to_ascii_lowercase()]`, then the process
/// environment variable named `key`. Returns `None` when all three are missing (an
/// environment value that is not valid Unicode also counts as missing).
pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
    map.get(key)
        .or_else(|| map.get(&key.to_ascii_lowercase()))
        .cloned()
        .or_else(|| std::env::var(key).ok())
}
475
476pub(crate) trait S3StorageOptionsConversion {
477 fn with_env_s3(&self, options: &HashMap<String, String>) -> HashMap<String, String> {
478 let mut options: HashMap<String, String> = options
479 .clone()
480 .into_iter()
481 .map(|(k, v)| {
482 if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
483 (config_key.as_ref().to_string(), v)
484 } else {
485 (k, v)
486 }
487 })
488 .collect();
489
490 for (os_key, os_value) in std::env::vars_os() {
491 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
492 && let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase())
493 {
494 options
495 .entry(config_key.as_ref().to_string())
496 .or_insert(value.to_string());
497 }
498 }
499
500 if !options.keys().any(|key| {
505 let key = key.to_ascii_lowercase();
506 [
507 AmazonS3ConfigKey::ConditionalPut.as_ref(),
508 "conditional_put",
509 ]
510 .contains(&key.as_str())
511 }) {
512 options.insert("conditional_put".into(), "etag".into());
513 }
514 options
515 }
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521
522 use crate::constants;
523 use serial_test::serial;
524
/// Snapshot of the process environment that is written back when the value is dropped.
struct ScopedEnv {
    /// Every variable (name and value) that existed when the snapshot was taken.
    vars: HashMap<std::ffi::OsString, std::ffi::OsString>,
}

impl ScopedEnv {
    /// Captures the current process environment.
    pub fn new() -> Self {
        Self {
            vars: std::env::vars_os().collect(),
        }
    }

    /// Runs `f` and restores the environment to its pre-call state once `f` returns.
    ///
    /// The closure is invoked exactly once, so `FnOnce` is all that is required; this
    /// also lets callers pass closures that move captured values out.
    pub fn run<T>(f: impl FnOnce() -> T) -> T {
        // The guard restores the environment in `Drop`, after `f` has produced its value.
        let _env_scope = Self::new();
        f()
    }
}

impl Drop for ScopedEnv {
    /// Removes variables that were added since the snapshot, then re-applies every
    /// captured variable.
    fn drop(&mut self) {
        let added_since_snapshot: Vec<_> = std::env::vars_os()
            .map(|(key, _)| key)
            .filter(|key| !self.vars.contains_key(key))
            .collect();
        for key in added_since_snapshot {
            // SAFETY: used only by `#[serial]` tests; assumes no other thread accesses
            // the environment while the test runs.
            unsafe {
                std::env::remove_var(key);
            }
        }
        for (key, value) in self.vars.drain() {
            // SAFETY: same assumption as above.
            unsafe {
                std::env::set_var(key, value);
            }
        }
    }
}
559
560 fn clear_env_of_aws_keys() {
561 let keys_to_clear = std::env::vars().filter_map(|(k, _v)| {
562 if AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()).is_ok() {
563 Some(k)
564 } else {
565 None
566 }
567 });
568
569 for k in keys_to_clear {
570 unsafe {
571 std::env::remove_var(k);
572 }
573 }
574 }
575
/// With all settings supplied through the environment, `try_default` must pick up the
/// endpoint, region and locking provider.
#[test]
#[serial]
fn storage_options_default_test() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();

        // Two of the constants below are deprecated aliases and are set on purpose.
        #[allow(deprecated)]
        let env_fixture = [
            (constants::AWS_ENDPOINT_URL, "http://localhost"),
            (constants::AWS_REGION, "us-west-1"),
            (constants::AWS_PROFILE, "default"),
            (constants::AWS_ACCESS_KEY_ID, "default_key_id"),
            (constants::AWS_SECRET_ACCESS_KEY, "default_secret_key"),
            (constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"),
            (
                constants::AWS_IAM_ROLE_ARN,
                "arn:aws:iam::123456789012:role/some_role",
            ),
            (constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"),
            (
                constants::AWS_S3_ASSUME_ROLE_ARN,
                "arn:aws:iam::123456789012:role/some_role",
            ),
            (constants::AWS_S3_ROLE_SESSION_NAME, "session_name"),
            (constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"),
        ];
        for (name, value) in env_fixture {
            // SAFETY: the test is `#[serial]`; assumes no concurrent environment access.
            unsafe {
                std::env::set_var(name, value);
            }
        }

        let options = S3StorageOptions::try_default().unwrap();

        let expected_sdk_config = SdkConfig::builder()
            .endpoint_url("http://localhost".to_string())
            .region(Region::from_static("us-west-1"))
            .build();
        let expected = S3StorageOptions::builder()
            .sdk_config(expected_sdk_config)
            .locking_provider("dynamodb")
            .build();
        assert_eq!(expected, options);
    });
}
622
/// Options built from only a region and static credentials must equal the defaults.
/// The equality check does not compare `sdk_config`.
#[test]
#[serial]
fn storage_options_with_only_region_and_credentials() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();
        // SAFETY: the test is `#[serial]`; assumes no concurrent environment access.
        unsafe {
            std::env::remove_var(constants::AWS_ENDPOINT_URL);
        }

        let supplied: HashMap<String, String> = [
            (constants::AWS_REGION, "eu-west-1"),
            (constants::AWS_ACCESS_KEY_ID, "test"),
            (constants::AWS_SECRET_ACCESS_KEY, "test_secret"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();
        let options = S3StorageOptions::from_map(&supplied).unwrap();

        let defaults = S3StorageOptions::try_default().unwrap();
        let expected = S3StorageOptions {
            sdk_config: Some(
                SdkConfig::builder()
                    .region(Region::from_static("eu-west-1"))
                    .build(),
            ),
            ..defaults
        };
        assert_eq!(expected, options);
    });
}
651
/// Every value supplied through the map must be reflected in the resulting options.
#[test]
#[serial]
fn storage_options_from_map_test() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();

        let supplied: HashMap<String, String> = [
            (constants::AWS_ENDPOINT_URL, "http://localhost:1234"),
            (constants::AWS_REGION, "us-west-2"),
            (constants::AWS_PROFILE, "default"),
            (constants::AWS_S3_ADDRESSING_STYLE, "virtual"),
            (constants::AWS_S3_LOCKING_PROVIDER, "another_locking_provider"),
            (
                constants::AWS_IAM_ROLE_ARN,
                "arn:aws:iam::123456789012:role/another_role",
            ),
            (constants::AWS_IAM_ROLE_SESSION_NAME, "another_session_name"),
            (constants::AWS_WEB_IDENTITY_TOKEN_FILE, "another_token_file"),
            (constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"),
            (constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"),
            (constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"),
            (constants::AWS_ACCESS_KEY_ID, "test_id"),
            (constants::AWS_SECRET_ACCESS_KEY, "test_secret"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();
        let options = S3StorageOptions::from_map(&supplied).unwrap();

        assert_eq!(
            Some("another_locking_provider"),
            options.locking_provider.as_deref()
        );
        assert_eq!(Duration::from_secs(1), options.s3_pool_idle_timeout);
        assert_eq!(Duration::from_secs(2), options.sts_pool_idle_timeout);
        assert_eq!(3, options.s3_get_internal_server_error_retries);
        assert!(options.virtual_hosted_style_request);
        assert!(!options.allow_unsafe_rename);

        let expected_extra_opts = HashMap::from([(
            constants::AWS_S3_ADDRESSING_STYLE.to_string(),
            "virtual".to_string(),
        )]);
        assert_eq!(expected_extra_opts, options.extra_opts);
    });
}
725
/// A DynamoDB-specific endpoint supplied through the map must be stored separately
/// from the general endpoint.
#[test]
#[serial]
fn storage_options_from_map_with_dynamodb_endpoint_test() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();

        let supplied: HashMap<String, String> = [
            (constants::AWS_ENDPOINT_URL, "http://localhost:1234"),
            (constants::AWS_ENDPOINT_URL_DYNAMODB, "http://localhost:2345"),
            (constants::AWS_REGION, "us-west-2"),
            (constants::AWS_PROFILE, "default"),
            (constants::AWS_S3_ADDRESSING_STYLE, "virtual"),
            (constants::AWS_S3_LOCKING_PROVIDER, "another_locking_provider"),
            (
                constants::AWS_IAM_ROLE_ARN,
                "arn:aws:iam::123456789012:role/another_role",
            ),
            (constants::AWS_IAM_ROLE_SESSION_NAME, "another_session_name"),
            (constants::AWS_WEB_IDENTITY_TOKEN_FILE, "another_token_file"),
            (constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"),
            (constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"),
            (constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"),
            (constants::AWS_ACCESS_KEY_ID, "test_id"),
            (constants::AWS_SECRET_ACCESS_KEY, "test_secret"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();
        let options = S3StorageOptions::from_map(&supplied).unwrap();

        assert_eq!(
            Some("http://localhost:2345"),
            options.dynamodb_endpoint.as_deref()
        );
    });
}
791
/// Values supplied through the map must win over the environment, while settings
/// present only in the environment are still picked up.
#[test]
#[serial]
fn storage_options_mixed_test() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();

        let env_fixture = [
            (constants::AWS_ENDPOINT_URL, "http://localhost"),
            (
                constants::AWS_ENDPOINT_URL_DYNAMODB,
                "http://localhost:dynamodb",
            ),
            (constants::AWS_REGION, "us-west-1"),
            (constants::AWS_PROFILE, "default"),
            (constants::AWS_ACCESS_KEY_ID, "wrong_key_id"),
            (constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"),
            (constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"),
            (
                constants::AWS_IAM_ROLE_ARN,
                "arn:aws:iam::123456789012:role/some_role",
            ),
            (constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"),
            (constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"),
            (constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"),
            (constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"),
            (constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"),
        ];
        for (name, value) in env_fixture {
            // SAFETY: the test is `#[serial]`; assumes no concurrent environment access.
            unsafe {
                std::env::set_var(name, value);
            }
        }

        let supplied: HashMap<String, String> = [
            (constants::AWS_ACCESS_KEY_ID, "test_id_mixed"),
            (constants::AWS_SECRET_ACCESS_KEY, "test_secret_mixed"),
            (constants::AWS_REGION, "us-west-2"),
            ("AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES", "3"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();
        let options = S3StorageOptions::from_map(&supplied).unwrap();

        let expected_sdk_config = SdkConfig::builder()
            .endpoint_url("http://localhost".to_string())
            .region(Region::from_static("us-west-2"))
            .build();
        let expected = S3StorageOptions::builder()
            .sdk_config(expected_sdk_config)
            .locking_provider("dynamodb")
            .dynamodb_endpoint("http://localhost:dynamodb")
            .s3_pool_idle_timeout(Duration::from_secs(1))
            .sts_pool_idle_timeout(Duration::from_secs(2))
            .s3_get_internal_server_error_retries(3)
            .build();
        assert_eq!(expected, options);
    });
}
854
/// Web-identity settings supplied through the map must be exported into the process
/// environment by `from_map`.
#[test]
#[serial]
fn storage_options_web_identity_test() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();

        let web_identity_settings = [
            (constants::AWS_REGION, "eu-west-1"),
            (
                constants::AWS_WEB_IDENTITY_TOKEN_FILE,
                "web_identity_token_file",
            ),
            (
                constants::AWS_ROLE_ARN,
                "arn:aws:iam::123456789012:role/web_identity_role",
            ),
            (constants::AWS_ROLE_SESSION_NAME, "web_identity_session_name"),
        ];

        let supplied: HashMap<String, String> = web_identity_settings
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect();
        let _options = S3StorageOptions::from_map(&supplied).unwrap();

        // Each supplied value must now be visible through the environment.
        for (name, expected) in web_identity_settings {
            assert_eq!(expected, std::env::var(name).unwrap());
        }
    });
}
895
/// Settings present only in the environment must be added to the merged options,
/// alongside the defaulted `conditional_put` entry.
#[test]
#[serial]
fn when_merging_with_env_unsupplied_options_are_added() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();
        let raw_options = HashMap::new();

        for name in [
            constants::AWS_ACCESS_KEY_ID,
            constants::AWS_ENDPOINT_URL,
            constants::AWS_SECRET_ACCESS_KEY,
            constants::AWS_REGION,
        ] {
            // SAFETY: the test is `#[serial]`; assumes no concurrent environment access.
            unsafe {
                std::env::set_var(name, "env_key");
            }
        }

        let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options);

        // Four environment entries plus the defaulted `conditional_put`.
        assert_eq!(combined_options.len(), 5);

        combined_options
            .iter()
            .filter(|(key, _)| key.as_str() != "conditional_put")
            .for_each(|(_, value)| assert_eq!(value.as_str(), "env_key"));
    });
}
920
/// Explicitly supplied options must win over environment variables that map to the
/// same S3 configuration key.
///
/// This is a plain synchronous test: nothing in it awaits, so no async runtime is
/// needed.
#[test]
#[serial]
fn when_merging_with_env_supplied_options_take_precedence() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();
        let raw_options = HashMap::from([
            ("AWS_ACCESS_KEY_ID".to_string(), "options_key".to_string()),
            ("AWS_ENDPOINT_URL".to_string(), "options_key".to_string()),
            (
                "AWS_SECRET_ACCESS_KEY".to_string(),
                "options_key".to_string(),
            ),
            ("AWS_REGION".to_string(), "options_key".to_string()),
        ]);
        // SAFETY: the test is `#[serial]`; assumes no concurrent environment access.
        unsafe {
            std::env::set_var("aws_access_key_id", "env_key");
            std::env::set_var("aws_endpoint", "env_key");
            std::env::set_var("aws_secret_access_key", "env_key");
            std::env::set_var("aws_region", "env_key");
        }

        let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options);

        for (key, v) in combined_options {
            if key != "conditional_put" {
                assert_eq!(v, "options_key");
            }
        }
    });
}
951
/// `is_aws` must report `true` for empty options, `false` when a custom endpoint is
/// configured, and `true` again when credential loading is forced.
///
/// Runs inside `ScopedEnv::run` so that the variables removed by
/// `clear_env_of_aws_keys` are restored afterwards, like in the other tests.
#[test]
#[serial]
fn test_is_aws() {
    ScopedEnv::run(|| {
        clear_env_of_aws_keys();
        let options = HashMap::default();
        assert!(is_aws(&options));

        let minio: HashMap<String, String> = HashMap::from([(
            constants::AWS_ENDPOINT_URL.to_string(),
            "http://minio:8080".to_string(),
        )]);
        assert!(!is_aws(&minio));

        let minio: HashMap<String, String> =
            HashMap::from([("aws_endpoint".to_string(), "http://minio:8080".to_string())]);
        assert!(!is_aws(&minio));

        let localstack: HashMap<String, String> = HashMap::from([
            (
                constants::AWS_FORCE_CREDENTIAL_LOAD.to_string(),
                "true".to_string(),
            ),
            ("aws_endpoint".to_string(), "http://minio:8080".to_string()),
        ]);
        assert!(is_aws(&localstack));
    });
}
978}