// datafusion_cli/object_storage.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod instrumented;
19
20use async_trait::async_trait;
21use aws_config::BehaviorVersion;
22use aws_credential_types::provider::{
23    error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
24};
25use datafusion::{
26    common::{
27        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
28        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
29        exec_datafusion_err, exec_err,
30    },
31    error::{DataFusionError, Result},
32    execution::context::SessionState,
33};
34use log::debug;
35use object_store::{
36    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
37    gcp::GoogleCloudStorageBuilder,
38    http::HttpBuilder,
39    ClientOptions, CredentialProvider,
40    Error::Generic,
41    ObjectStore,
42};
43use std::{
44    any::Any,
45    error::Error,
46    fmt::{Debug, Display},
47    sync::Arc,
48};
49use url::Url;
50
51#[cfg(not(test))]
52use object_store::aws::resolve_bucket_region;
53
// Provide a local mock when running tests so we don't make network calls
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    // Tests always observe a fixed region; no lookup is performed.
    Ok(String::from("eu-central-1"))
}
62
63pub async fn get_s3_object_store_builder(
64    url: &Url,
65    aws_options: &AwsOptions,
66    resolve_region: bool,
67) -> Result<AmazonS3Builder> {
68    let AwsOptions {
69        access_key_id,
70        secret_access_key,
71        session_token,
72        region,
73        endpoint,
74        allow_http,
75        skip_signature,
76    } = aws_options;
77
78    let bucket_name = get_bucket_name(url)?;
79    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
80
81    if let (Some(access_key_id), Some(secret_access_key)) =
82        (access_key_id, secret_access_key)
83    {
84        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
85        builder = builder
86            .with_access_key_id(access_key_id)
87            .with_secret_access_key(secret_access_key);
88
89        if let Some(session_token) = session_token {
90            builder = builder.with_token(session_token);
91        }
92    } else {
93        debug!("Using AWS S3 SDK to determine credentials");
94        let CredentialsFromConfig {
95            region,
96            credentials,
97        } = CredentialsFromConfig::try_new().await?;
98        if let Some(region) = region {
99            builder = builder.with_region(region);
100        }
101        if let Some(credentials) = credentials {
102            let credentials = Arc::new(S3CredentialProvider { credentials });
103            builder = builder.with_credentials(credentials);
104        } else {
105            debug!("No credentials found, defaulting to skip signature ");
106            builder = builder.with_skip_signature(true);
107        }
108    }
109
110    if let Some(region) = region {
111        builder = builder.with_region(region);
112    }
113
114    // If the region is not set or auto_detect_region is true, resolve the region.
115    if builder
116        .get_config_value(&AmazonS3ConfigKey::Region)
117        .is_none()
118        || resolve_region
119    {
120        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
121        builder = builder.with_region(region);
122    }
123
124    if let Some(endpoint) = endpoint {
125        // Make a nicer error if the user hasn't allowed http and the endpoint
126        // is http as the default message is "URL scheme is not allowed"
127        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
128            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
129                return config_err!(
130                    "Invalid endpoint: {endpoint}. \
131                HTTP is not allowed for S3 endpoints. \
132                To allow HTTP, set 'aws.allow_http' to true"
133                );
134            }
135        }
136
137        builder = builder.with_endpoint(endpoint);
138    }
139
140    if let Some(allow_http) = allow_http {
141        builder = builder.with_allow_http(*allow_http);
142    }
143
144    if let Some(skip_signature) = skip_signature {
145        builder = builder.with_skip_signature(*skip_signature);
146    }
147
148    Ok(builder)
149}
150
/// Credentials from the AWS SDK
struct CredentialsFromConfig {
    /// Region resolved by the AWS SDK configuration, if any
    region: Option<String>,
    /// Credential provider from the SDK; `None` when no credentials were loaded
    credentials: Option<SharedCredentialsProvider>,
}
156
impl CredentialsFromConfig {
    /// Attempt find AWS S3 credentials via the AWS SDK
    ///
    /// Returns `credentials: None` if the SDK reports that no credentials were
    /// loaded (callers then fall back to unsigned requests); any other
    /// credential error is surfaced to the caller.
    pub async fn try_new() -> Result<Self> {
        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        let region = config.region().map(|r| r.to_string());

        let credentials = config
            .credentials_provider()
            .ok_or_else(|| {
                DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: "Failed to get S3 credentials aws_config".into(),
                }))
            })?
            .clone();

        // The credential provider is lazy, so it does not fetch credentials
        // until they are needed. To ensure that the credentials are valid,
        // we can call `provide_credentials` here.
        let credentials = match credentials.provide_credentials().await {
            Ok(_) => Some(credentials),
            Err(CredentialsError::CredentialsNotLoaded(_)) => {
                debug!("Could not use AWS SDK to get credentials");
                None
            }
            // other errors like `CredentialsError::InvalidConfiguration`
            // should be returned to the user so they can be fixed
            Err(e) => {
                // Pass back underlying error to the user, including underlying source
                let source_message = if let Some(source) = e.source() {
                    format!(": {source}")
                } else {
                    String::new()
                };

                let message = format!(
                    "Error getting credentials from provider: {e}{source_message}",
                );

                return Err(DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: message.into(),
                })));
            }
        };
        Ok(Self {
            region,
            credentials,
        })
    }
}
208
209#[derive(Debug)]
210struct S3CredentialProvider {
211    credentials: aws_credential_types::provider::SharedCredentialsProvider,
212}
213
214#[async_trait]
215impl CredentialProvider for S3CredentialProvider {
216    type Credential = AwsCredential;
217
218    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
219        let creds =
220            self.credentials
221                .provide_credentials()
222                .await
223                .map_err(|e| Generic {
224                    store: "S3",
225                    source: Box::new(e),
226                })?;
227        Ok(Arc::new(AwsCredential {
228            key_id: creds.access_key_id().to_string(),
229            secret_key: creds.secret_access_key().to_string(),
230            token: creds.session_token().map(ToString::to_string),
231        }))
232    }
233}
234
235pub fn get_oss_object_store_builder(
236    url: &Url,
237    aws_options: &AwsOptions,
238) -> Result<AmazonS3Builder> {
239    get_object_store_builder(url, aws_options, true)
240}
241
242pub fn get_cos_object_store_builder(
243    url: &Url,
244    aws_options: &AwsOptions,
245) -> Result<AmazonS3Builder> {
246    get_object_store_builder(url, aws_options, false)
247}
248
249fn get_object_store_builder(
250    url: &Url,
251    aws_options: &AwsOptions,
252    virtual_hosted_style_request: bool,
253) -> Result<AmazonS3Builder> {
254    let bucket_name = get_bucket_name(url)?;
255    let mut builder = AmazonS3Builder::from_env()
256        .with_virtual_hosted_style_request(virtual_hosted_style_request)
257        .with_bucket_name(bucket_name)
258        // oss/cos don't care about the "region" field
259        .with_region("do_not_care");
260
261    if let (Some(access_key_id), Some(secret_access_key)) =
262        (&aws_options.access_key_id, &aws_options.secret_access_key)
263    {
264        builder = builder
265            .with_access_key_id(access_key_id)
266            .with_secret_access_key(secret_access_key);
267    }
268
269    if let Some(endpoint) = &aws_options.endpoint {
270        builder = builder.with_endpoint(endpoint);
271    }
272
273    Ok(builder)
274}
275
276pub fn get_gcs_object_store_builder(
277    url: &Url,
278    gs_options: &GcpOptions,
279) -> Result<GoogleCloudStorageBuilder> {
280    let bucket_name = get_bucket_name(url)?;
281    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
282
283    if let Some(service_account_path) = &gs_options.service_account_path {
284        builder = builder.with_service_account_path(service_account_path);
285    }
286
287    if let Some(service_account_key) = &gs_options.service_account_key {
288        builder = builder.with_service_account_key(service_account_key);
289    }
290
291    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
292        builder = builder.with_application_credentials(application_credentials_path);
293    }
294
295    Ok(builder)
296}
297
298fn get_bucket_name(url: &Url) -> Result<&str> {
299    url.host_str().ok_or_else(|| {
300        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
301    })
302}
303
/// This struct encapsulates AWS options one uses when setting up object storage.
///
/// Values are set via `aws.*` table options (see the [`ExtensionOptions`]
/// implementation) and may be combined with values from the environment.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access Key ID
    pub access_key_id: Option<String>,
    /// Secret Access Key
    pub secret_access_key: Option<String>,
    /// Session token
    pub session_token: Option<String>,
    /// AWS Region
    pub region: Option<String>,
    /// OSS or COS Endpoint
    pub endpoint: Option<String>,
    /// Allow HTTP (otherwise will always use https)
    pub allow_http: Option<bool>,
    /// Do not fetch credentials and do not sign requests
    ///
    /// This can be useful when interacting with public S3 buckets that deny
    /// authorized requests
    pub skip_signature: Option<bool>,
}
325
326impl ExtensionOptions for AwsOptions {
327    fn as_any(&self) -> &dyn Any {
328        self
329    }
330
331    fn as_any_mut(&mut self) -> &mut dyn Any {
332        self
333    }
334
335    fn cloned(&self) -> Box<dyn ExtensionOptions> {
336        Box::new(self.clone())
337    }
338
339    fn set(&mut self, key: &str, value: &str) -> Result<()> {
340        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
341        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
342        match key {
343            "access_key_id" => {
344                self.access_key_id.set(rem, value)?;
345            }
346            "secret_access_key" => {
347                self.secret_access_key.set(rem, value)?;
348            }
349            "session_token" => {
350                self.session_token.set(rem, value)?;
351            }
352            "region" => {
353                self.region.set(rem, value)?;
354            }
355            "oss" | "cos" | "endpoint" => {
356                self.endpoint.set(rem, value)?;
357            }
358            "allow_http" => {
359                self.allow_http.set(rem, value)?;
360            }
361            "skip_signature" | "nosign" => {
362                self.skip_signature.set(rem, value)?;
363            }
364            _ => {
365                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
366            }
367        }
368        Ok(())
369    }
370
371    fn entries(&self) -> Vec<ConfigEntry> {
372        struct Visitor(Vec<ConfigEntry>);
373
374        impl Visit for Visitor {
375            fn some<V: Display>(
376                &mut self,
377                key: &str,
378                value: V,
379                description: &'static str,
380            ) {
381                self.0.push(ConfigEntry {
382                    key: key.to_string(),
383                    value: Some(value.to_string()),
384                    description,
385                })
386            }
387
388            fn none(&mut self, key: &str, description: &'static str) {
389                self.0.push(ConfigEntry {
390                    key: key.to_string(),
391                    value: None,
392                    description,
393                })
394            }
395        }
396
397        let mut v = Visitor(vec![]);
398        self.access_key_id.visit(&mut v, "access_key_id", "");
399        self.secret_access_key
400            .visit(&mut v, "secret_access_key", "");
401        self.session_token.visit(&mut v, "session_token", "");
402        self.region.visit(&mut v, "region", "");
403        self.endpoint.visit(&mut v, "endpoint", "");
404        self.allow_http.visit(&mut v, "allow_http", "");
405        v.0
406    }
407}
408
// All AwsOptions keys are namespaced under "aws.", e.g. "aws.region".
impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}
412
/// This struct encapsulates GCP options one uses when setting up object storage.
///
/// Values are set via `gcp.*` table options (see the [`ExtensionOptions`]
/// implementation).
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account path
    pub service_account_path: Option<String>,
    /// Service account key
    pub service_account_key: Option<String>,
    /// Application credentials path
    pub application_credentials_path: Option<String>,
}
423
424impl ExtensionOptions for GcpOptions {
425    fn as_any(&self) -> &dyn Any {
426        self
427    }
428
429    fn as_any_mut(&mut self) -> &mut dyn Any {
430        self
431    }
432
433    fn cloned(&self) -> Box<dyn ExtensionOptions> {
434        Box::new(self.clone())
435    }
436
437    fn set(&mut self, key: &str, value: &str) -> Result<()> {
438        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
439        match rem {
440            "service_account_path" => {
441                self.service_account_path.set(rem, value)?;
442            }
443            "service_account_key" => {
444                self.service_account_key.set(rem, value)?;
445            }
446            "application_credentials_path" => {
447                self.application_credentials_path.set(rem, value)?;
448            }
449            _ => {
450                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
451            }
452        }
453        Ok(())
454    }
455
456    fn entries(&self) -> Vec<ConfigEntry> {
457        struct Visitor(Vec<ConfigEntry>);
458
459        impl Visit for Visitor {
460            fn some<V: Display>(
461                &mut self,
462                key: &str,
463                value: V,
464                description: &'static str,
465            ) {
466                self.0.push(ConfigEntry {
467                    key: key.to_string(),
468                    value: Some(value.to_string()),
469                    description,
470                })
471            }
472
473            fn none(&mut self, key: &str, description: &'static str) {
474                self.0.push(ConfigEntry {
475                    key: key.to_string(),
476                    value: None,
477                    description,
478                })
479            }
480        }
481
482        let mut v = Visitor(vec![]);
483        self.service_account_path
484            .visit(&mut v, "service_account_path", "");
485        self.service_account_key
486            .visit(&mut v, "service_account_key", "");
487        self.application_credentials_path.visit(
488            &mut v,
489            "application_credentials_path",
490            "",
491        );
492        v.0
493    }
494}
495
// All GcpOptions keys are namespaced under "gcp.", e.g. "gcp.service_account_key".
impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}
499
/// Create an [`ObjectStore`] for `url` based on its `scheme`.
///
/// Known schemes (`s3`, `oss`, `cos`, `gs`/`gcs`, `http`/`https`) are built
/// from the matching extension options in `table_options`; any other scheme
/// is looked up in the session's `object_store_registry`.
pub(crate) async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
    resolve_region: bool,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    let store: Arc<dyn ObjectStore> = match scheme {
        "s3" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 's3' scheme"
                );
            };
            let builder =
                get_s3_object_store_builder(url, options, resolve_region).await?;
            Arc::new(builder.build()?)
        }
        "oss" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'oss' scheme"
                );
            };
            let builder = get_oss_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "cos" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'cos' scheme"
                );
            };
            let builder = get_cos_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "gs" | "gcs" => {
            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'gs'/'gcs' scheme"
                );
            };
            let builder = get_gcs_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        // HTTP(S) stores are scoped to the url's origin only.
        "http" | "https" => Arc::new(
            HttpBuilder::new()
                .with_client_options(ClientOptions::new().with_allow_http(true))
                .with_url(url.origin().ascii_serialization())
                .build()?,
        ),
        _ => {
            // For other types, try to get from `object_store_registry`:
            state
                .runtime_env()
                .object_store_registry
                .get_store(url)
                .map_err(|_| {
                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                })?
        }
    };
    Ok(store)
}
564
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    // Verifies the default credential/region/signature behavior when no
    // explicit `aws.*` options are supplied.
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");

        // No options
        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // If the environment variables are set (as they are in CI) use them
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        // "eu-central-1" matches the mocked `resolve_bucket_region` above
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        // get the actual configuration information, then assert_eq!
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Default is to skip signature when no credentials are provided
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    // Verifies that explicit `aws.*` table options flow through to the builder.
    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        // "fake" is uppercase to ensure the values are not lowercased when parsed
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' {session_token}, \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        // Should not skip signature when credentials are provided
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    // Verifies the friendly error for an http endpoint without `aws.allow_http`,
    // and that setting `aws.allow_http` makes the same endpoint acceptable.
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");

        // Now add `allow_http` to the options and check if it works
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        // ensure this isn't an error
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    // With no region configured, the builder should fall back to resolving the
    // bucket's region (mocked to "eu-central-1" in tests).
    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None, // No region specified - should auto-detect
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        // Verify that the region was auto-detected in test environment
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    // With `resolve_region == true`, the resolved region should override an
    // explicitly configured one.
    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
    ) -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        let expected_region = "eu-central-1"; // This should be the auto-detected region
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()), // Explicit region provided
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        // Verify that the region was overridden by auto-detection
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    // Verifies that `aws.*` options (including the `aws.oss.endpoint` alias)
    // are applied for the OSS builder.
    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    // Verifies that `gcp.*` options are applied for the GCS builder.
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
    /// [`TableOptions`] resolved from the statement's OPTIONS clause.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }

    /// Returns an execution error (used by tests as a skip signal) unless all
    /// of the listed AWS environment variables are set.
    async fn check_aws_envs() -> Result<()> {
        let aws_envs = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
            "AWS_ALLOW_HTTP",
        ];
        for aws_env in aws_envs {
            std::env::var(aws_env).map_err(|_| {
                exec_datafusion_err!("aws envs not set, skipping s3 tests")
            })?;
        }
        Ok(())
    }
}