datafusion_cli/
object_storage.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod instrumented;
19
20use async_trait::async_trait;
21use aws_config::BehaviorVersion;
22use aws_credential_types::provider::{
23    ProvideCredentials, SharedCredentialsProvider, error::CredentialsError,
24};
25use datafusion::{
26    common::{
27        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
28        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
29        exec_datafusion_err, exec_err,
30    },
31    error::{DataFusionError, Result},
32    execution::context::SessionState,
33};
34use log::debug;
35use object_store::{
36    ClientOptions, CredentialProvider,
37    Error::Generic,
38    ObjectStore,
39    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
40    gcp::GoogleCloudStorageBuilder,
41    http::HttpBuilder,
42};
43use std::{
44    any::Any,
45    error::Error,
46    fmt::{Debug, Display},
47    sync::Arc,
48};
49use url::Url;
50
51#[cfg(not(test))]
52use object_store::aws::resolve_bucket_region;
53
/// Test-only stand-in for [`object_store::aws::resolve_bucket_region`] so the
/// unit tests never perform network lookups; it always reports `eu-central-1`.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let mocked_region = String::from("eu-central-1");
    Ok(mocked_region)
}
62
/// Build an [`AmazonS3Builder`] for the given `s3://` URL.
///
/// Configuration is layered in this order (later steps win):
/// 1. Environment variables (`AmazonS3Builder::from_env`)
/// 2. Credentials/region discovered via the AWS SDK — only when no explicit
///    access key pair is present in `aws_options`
/// 3. Explicit values from `aws_options`
/// 4. Bucket-region auto-detection when no region is configured, or always
///    when `resolve_region` is true (this overrides any configured region)
///
/// # Errors
/// Returns a configuration error for an `http://` endpoint unless
/// `aws.allow_http` is enabled, and propagates credential/region resolution
/// failures.
pub async fn get_s3_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
    resolve_region: bool,
) -> Result<AmazonS3Builder> {
    let AwsOptions {
        access_key_id,
        secret_access_key,
        session_token,
        region,
        endpoint,
        allow_http,
        skip_signature,
    } = aws_options;

    let bucket_name = get_bucket_name(url)?;
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

    if let (Some(access_key_id), Some(secret_access_key)) =
        (access_key_id, secret_access_key)
    {
        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);

        // A session token is only applied alongside explicit credentials
        if let Some(session_token) = session_token {
            builder = builder.with_token(session_token);
        }
    } else {
        debug!("Using AWS S3 SDK to determine credentials");
        let CredentialsFromConfig {
            region,
            credentials,
        } = CredentialsFromConfig::try_new().await?;
        // SDK-discovered region; may still be overridden by the explicit
        // `aws_options.region` applied below
        if let Some(region) = region {
            builder = builder.with_region(region);
        }
        if let Some(credentials) = credentials {
            let credentials = Arc::new(S3CredentialProvider { credentials });
            builder = builder.with_credentials(credentials);
        } else {
            // No credentials anywhere: fall back to unsigned (anonymous) requests
            debug!("No credentials found, defaulting to skip signature ");
            builder = builder.with_skip_signature(true);
        }
    }

    // Explicit region from the options takes precedence over the SDK's
    if let Some(region) = region {
        builder = builder.with_region(region);
    }

    // If the region is still not set, or `resolve_region` was requested,
    // detect the bucket's region. Note this overrides any region set above.
    if builder
        .get_config_value(&AmazonS3ConfigKey::Region)
        .is_none()
        || resolve_region
    {
        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
        builder = builder.with_region(region);
    }

    if let Some(endpoint) = endpoint {
        // Make a nicer error if the user hasn't allowed http and the endpoint
        // is http as the default message is "URL scheme is not allowed"
        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
            && !matches!(allow_http, Some(true))
            && endpoint_url.scheme() == "http"
        {
            return config_err!(
                "Invalid endpoint: {endpoint}. \
                HTTP is not allowed for S3 endpoints. \
                To allow HTTP, set 'aws.allow_http' to true"
            );
        }

        builder = builder.with_endpoint(endpoint);
    }

    if let Some(allow_http) = allow_http {
        builder = builder.with_allow_http(*allow_http);
    }

    if let Some(skip_signature) = skip_signature {
        builder = builder.with_skip_signature(*skip_signature);
    }

    Ok(builder)
}
151
/// Credentials from the AWS SDK
struct CredentialsFromConfig {
    // Region discovered by `aws_config`, if any
    region: Option<String>,
    // `None` when the SDK reported that no credentials could be loaded
    // (e.g. anonymous access); see `try_new`
    credentials: Option<SharedCredentialsProvider>,
}
157
158impl CredentialsFromConfig {
159    /// Attempt find AWS S3 credentials via the AWS SDK
160    pub async fn try_new() -> Result<Self> {
161        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
162        let region = config.region().map(|r| r.to_string());
163
164        let credentials = config
165            .credentials_provider()
166            .ok_or_else(|| {
167                DataFusionError::ObjectStore(Box::new(Generic {
168                    store: "S3",
169                    source: "Failed to get S3 credentials aws_config".into(),
170                }))
171            })?
172            .clone();
173
174        // The credential provider is lazy, so it does not fetch credentials
175        // until they are needed. To ensure that the credentials are valid,
176        // we can call `provide_credentials` here.
177        let credentials = match credentials.provide_credentials().await {
178            Ok(_) => Some(credentials),
179            Err(CredentialsError::CredentialsNotLoaded(_)) => {
180                debug!("Could not use AWS SDK to get credentials");
181                None
182            }
183            // other errors like `CredentialsError::InvalidConfiguration`
184            // should be returned to the user so they can be fixed
185            Err(e) => {
186                // Pass back underlying error to the user, including underlying source
187                let source_message = if let Some(source) = e.source() {
188                    format!(": {source}")
189                } else {
190                    String::new()
191                };
192
193                let message = format!(
194                    "Error getting credentials from provider: {e}{source_message}",
195                );
196
197                return Err(DataFusionError::ObjectStore(Box::new(Generic {
198                    store: "S3",
199                    source: message.into(),
200                })));
201            }
202        };
203        Ok(Self {
204            region,
205            credentials,
206        })
207    }
208}
209
210#[derive(Debug)]
211struct S3CredentialProvider {
212    credentials: aws_credential_types::provider::SharedCredentialsProvider,
213}
214
215#[async_trait]
216impl CredentialProvider for S3CredentialProvider {
217    type Credential = AwsCredential;
218
219    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
220        let creds =
221            self.credentials
222                .provide_credentials()
223                .await
224                .map_err(|e| Generic {
225                    store: "S3",
226                    source: Box::new(e),
227                })?;
228        Ok(Arc::new(AwsCredential {
229            key_id: creds.access_key_id().to_string(),
230            secret_key: creds.secret_access_key().to_string(),
231            token: creds.session_token().map(ToString::to_string),
232        }))
233    }
234}
235
/// Returns an [`AmazonS3Builder`] configured for Alibaba Cloud OSS.
///
/// OSS uses virtual-hosted-style requests (bucket name in the host), which is
/// the only difference from the COS configuration.
pub fn get_oss_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, true)
}
242
/// Returns an [`AmazonS3Builder`] configured for Tencent Cloud COS.
///
/// COS uses path-style requests, which is the only difference from the OSS
/// configuration.
pub fn get_cos_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, false)
}
249
250fn get_object_store_builder(
251    url: &Url,
252    aws_options: &AwsOptions,
253    virtual_hosted_style_request: bool,
254) -> Result<AmazonS3Builder> {
255    let bucket_name = get_bucket_name(url)?;
256    let mut builder = AmazonS3Builder::from_env()
257        .with_virtual_hosted_style_request(virtual_hosted_style_request)
258        .with_bucket_name(bucket_name)
259        // oss/cos don't care about the "region" field
260        .with_region("do_not_care");
261
262    if let (Some(access_key_id), Some(secret_access_key)) =
263        (&aws_options.access_key_id, &aws_options.secret_access_key)
264    {
265        builder = builder
266            .with_access_key_id(access_key_id)
267            .with_secret_access_key(secret_access_key);
268    }
269
270    if let Some(endpoint) = &aws_options.endpoint {
271        builder = builder.with_endpoint(endpoint);
272    }
273
274    Ok(builder)
275}
276
277pub fn get_gcs_object_store_builder(
278    url: &Url,
279    gs_options: &GcpOptions,
280) -> Result<GoogleCloudStorageBuilder> {
281    let bucket_name = get_bucket_name(url)?;
282    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
283
284    if let Some(service_account_path) = &gs_options.service_account_path {
285        builder = builder.with_service_account_path(service_account_path);
286    }
287
288    if let Some(service_account_key) = &gs_options.service_account_key {
289        builder = builder.with_service_account_key(service_account_key);
290    }
291
292    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
293        builder = builder.with_application_credentials(application_credentials_path);
294    }
295
296    Ok(builder)
297}
298
299fn get_bucket_name(url: &Url) -> Result<&str> {
300    url.host_str().ok_or_else(|| {
301        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
302    })
303}
304
/// This struct encapsulates AWS options one uses when setting up object storage.
///
/// Fields are set via `aws.`-prefixed keys (see [`ExtensionOptions::set`]
/// below); unset fields fall back to the environment / AWS SDK defaults.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access Key ID
    pub access_key_id: Option<String>,
    /// Secret Access Key
    pub secret_access_key: Option<String>,
    /// Session token
    pub session_token: Option<String>,
    /// AWS Region
    pub region: Option<String>,
    /// OSS or COS Endpoint
    pub endpoint: Option<String>,
    /// Allow HTTP (otherwise will always use https)
    pub allow_http: Option<bool>,
    /// Do not fetch credentials and do not sign requests
    ///
    /// This can be useful when interacting with public S3 buckets that deny
    /// authorized requests
    pub skip_signature: Option<bool>,
}
326
327impl ExtensionOptions for AwsOptions {
328    fn as_any(&self) -> &dyn Any {
329        self
330    }
331
332    fn as_any_mut(&mut self) -> &mut dyn Any {
333        self
334    }
335
336    fn cloned(&self) -> Box<dyn ExtensionOptions> {
337        Box::new(self.clone())
338    }
339
340    fn set(&mut self, key: &str, value: &str) -> Result<()> {
341        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343        match key {
344            "access_key_id" => {
345                self.access_key_id.set(rem, value)?;
346            }
347            "secret_access_key" => {
348                self.secret_access_key.set(rem, value)?;
349            }
350            "session_token" => {
351                self.session_token.set(rem, value)?;
352            }
353            "region" => {
354                self.region.set(rem, value)?;
355            }
356            "oss" | "cos" | "endpoint" => {
357                self.endpoint.set(rem, value)?;
358            }
359            "allow_http" => {
360                self.allow_http.set(rem, value)?;
361            }
362            "skip_signature" | "nosign" => {
363                self.skip_signature.set(rem, value)?;
364            }
365            _ => {
366                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367            }
368        }
369        Ok(())
370    }
371
372    fn entries(&self) -> Vec<ConfigEntry> {
373        struct Visitor(Vec<ConfigEntry>);
374
375        impl Visit for Visitor {
376            fn some<V: Display>(
377                &mut self,
378                key: &str,
379                value: V,
380                description: &'static str,
381            ) {
382                self.0.push(ConfigEntry {
383                    key: key.to_string(),
384                    value: Some(value.to_string()),
385                    description,
386                })
387            }
388
389            fn none(&mut self, key: &str, description: &'static str) {
390                self.0.push(ConfigEntry {
391                    key: key.to_string(),
392                    value: None,
393                    description,
394                })
395            }
396        }
397
398        let mut v = Visitor(vec![]);
399        self.access_key_id.visit(&mut v, "access_key_id", "");
400        self.secret_access_key
401            .visit(&mut v, "secret_access_key", "");
402        self.session_token.visit(&mut v, "session_token", "");
403        self.region.visit(&mut v, "region", "");
404        self.endpoint.visit(&mut v, "endpoint", "");
405        self.allow_http.visit(&mut v, "allow_http", "");
406        v.0
407    }
408}
409
impl ConfigExtension for AwsOptions {
    // Options appear under the `aws.` namespace, e.g. `aws.region`
    const PREFIX: &'static str = "aws";
}
413
/// This struct encapsulates GCP options one uses when setting up object storage.
///
/// Fields are set via `gcp.`-prefixed keys (see [`ExtensionOptions::set`] below).
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account path
    pub service_account_path: Option<String>,
    /// Service account key
    pub service_account_key: Option<String>,
    /// Application credentials path
    pub application_credentials_path: Option<String>,
}
424
425impl ExtensionOptions for GcpOptions {
426    fn as_any(&self) -> &dyn Any {
427        self
428    }
429
430    fn as_any_mut(&mut self) -> &mut dyn Any {
431        self
432    }
433
434    fn cloned(&self) -> Box<dyn ExtensionOptions> {
435        Box::new(self.clone())
436    }
437
438    fn set(&mut self, key: &str, value: &str) -> Result<()> {
439        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
440        match rem {
441            "service_account_path" => {
442                self.service_account_path.set(rem, value)?;
443            }
444            "service_account_key" => {
445                self.service_account_key.set(rem, value)?;
446            }
447            "application_credentials_path" => {
448                self.application_credentials_path.set(rem, value)?;
449            }
450            _ => {
451                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
452            }
453        }
454        Ok(())
455    }
456
457    fn entries(&self) -> Vec<ConfigEntry> {
458        struct Visitor(Vec<ConfigEntry>);
459
460        impl Visit for Visitor {
461            fn some<V: Display>(
462                &mut self,
463                key: &str,
464                value: V,
465                description: &'static str,
466            ) {
467                self.0.push(ConfigEntry {
468                    key: key.to_string(),
469                    value: Some(value.to_string()),
470                    description,
471                })
472            }
473
474            fn none(&mut self, key: &str, description: &'static str) {
475                self.0.push(ConfigEntry {
476                    key: key.to_string(),
477                    value: None,
478                    description,
479                })
480            }
481        }
482
483        let mut v = Visitor(vec![]);
484        self.service_account_path
485            .visit(&mut v, "service_account_path", "");
486        self.service_account_key
487            .visit(&mut v, "service_account_key", "");
488        self.application_credentials_path.visit(
489            &mut v,
490            "application_credentials_path",
491            "",
492        );
493        v.0
494    }
495}
496
impl ConfigExtension for GcpOptions {
    // Options appear under the `gcp.` namespace, e.g. `gcp.service_account_key`
    const PREFIX: &'static str = "gcp";
}
500
/// Resolve an [`ObjectStore`] instance for `url` based on `scheme`.
///
/// For `s3`, `oss`, `cos` and `gs`/`gcs` the store is built from the matching
/// extension options registered in `table_options`; `http`/`https` use a plain
/// HTTP store rooted at the URL origin; any other scheme falls back to the
/// session's `object_store_registry`.
///
/// `resolve_region` is used only by the `s3` scheme and forces bucket-region
/// auto-detection.
pub(crate) async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
    resolve_region: bool,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    let store: Arc<dyn ObjectStore> = match scheme {
        "s3" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 's3' scheme"
                );
            };
            let builder =
                get_s3_object_store_builder(url, options, resolve_region).await?;
            Arc::new(builder.build()?)
        }
        "oss" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'oss' scheme"
                );
            };
            let builder = get_oss_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "cos" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'cos' scheme"
                );
            };
            let builder = get_cos_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "gs" | "gcs" => {
            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'gs'/'gcs' scheme"
                );
            };
            let builder = get_gcs_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "http" | "https" => Arc::new(
            HttpBuilder::new()
                .with_client_options(ClientOptions::new().with_allow_http(true))
                .with_url(url.origin().ascii_serialization())
                .build()?,
        ),
        _ => {
            // For other types, try to get from `object_store_registry`:
            state
                .runtime_env()
                .object_store_registry
                .get_store(url)
                .map_err(|_| {
                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                })?
        }
    };
    Ok(store)
}
565
// Unit tests. S3-dependent tests self-skip when the AWS environment variables
// are not present (see `check_aws_envs`); `resolve_bucket_region` is mocked
// under `cfg(test)` so no network calls are made.
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        // NOTE(review): mutating process env is inherently racy across
        // parallel tests — confirm these tests run single-threaded or tolerate it
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
            std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
        }

        // No options
        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // If the environment variables are set (as they are in CI) use them
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        // "eu-central-1" matches the mocked `resolve_bucket_region` above
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        // get the actual configuration information, then assert_eq!
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Default is to skip signature when no credentials are provided
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        // "fake" is uppercase to ensure the values are not lowercased when parsed
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' {session_token}, \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        // Should not skip signature when credentials are provided
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        // Only the first line is compared; error rendering may append context
        assert_eq!(
            err.to_string().lines().next().unwrap_or_default(),
            "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"
        );

        // Now add `allow_http` to the options and check if it works
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        // ensure this isn't an error
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }
        // Comes from the mocked `resolve_bucket_region` above
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None, // No region specified - should auto-detect
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        // Verify that the region was auto-detected in test environment
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled()
    -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        let expected_region = "eu-central-1"; // This should be the auto-detected region
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()), // Explicit region provided
            ..Default::default()
        };

        // `resolve_region = true` forces detection even with an explicit region
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        // Verify that the region was overridden by auto-detection
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
    /// session's default [`TableOptions`] with the statement's OPTIONS
    /// applied. (It does not return the `CreateExternalTable` command itself.)
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }

    /// Returns an `Execution` error when any required AWS env var is missing,
    /// which callers use to skip the S3 tests outside CI.
    async fn check_aws_envs() -> Result<()> {
        let aws_envs = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
            "AWS_ALLOW_HTTP",
        ];
        for aws_env in aws_envs {
            std::env::var(aws_env).map_err(|_| {
                exec_datafusion_err!("aws envs not set, skipping s3 tests")
            })?;
        }
        Ok(())
    }
}