// datafusion_cli/object_storage.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use async_trait::async_trait;
19use aws_config::BehaviorVersion;
20use aws_credential_types::provider::{
21    error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
22};
23use datafusion::{
24    common::{
25        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
26        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
27        exec_datafusion_err, exec_err,
28    },
29    error::{DataFusionError, Result},
30    execution::context::SessionState,
31};
32use log::debug;
33use object_store::{
34    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
35    gcp::GoogleCloudStorageBuilder,
36    http::HttpBuilder,
37    ClientOptions, CredentialProvider,
38    Error::Generic,
39    ObjectStore,
40};
41use std::{
42    any::Any,
43    error::Error,
44    fmt::{Debug, Display},
45    sync::Arc,
46};
47use url::Url;
48
49#[cfg(not(test))]
50use object_store::aws::resolve_bucket_region;
51
// Provide a local mock when running tests so we don't make network calls
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    // Fixed region that the region-resolution tests assert against
    Ok("eu-central-1".to_string())
}
60
/// Builds an [`AmazonS3Builder`] for the bucket referenced by `url`.
///
/// Credentials are taken from `aws_options` when both `access_key_id` and
/// `secret_access_key` are set; otherwise the AWS SDK default chain is
/// consulted, and if it finds nothing requests are left unsigned
/// (`skip_signature = true`).
///
/// Region precedence (later overrides earlier): SDK-derived region,
/// explicit `aws_options.region`, then automatic bucket-region resolution,
/// which runs when no region is set or `resolve_region` is true.
pub async fn get_s3_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
    resolve_region: bool,
) -> Result<AmazonS3Builder> {
    let AwsOptions {
        access_key_id,
        secret_access_key,
        session_token,
        region,
        endpoint,
        allow_http,
        skip_signature,
    } = aws_options;

    let bucket_name = get_bucket_name(url)?;
    // `from_env` also picks up AWS_* environment variables (the tests below
    // rely on this for AWS_ACCESS_KEY_ID / AWS_ENDPOINT etc.)
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

    if let (Some(access_key_id), Some(secret_access_key)) =
        (access_key_id, secret_access_key)
    {
        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);

        // A session token only makes sense alongside explicit credentials
        if let Some(session_token) = session_token {
            builder = builder.with_token(session_token);
        }
    } else {
        debug!("Using AWS S3 SDK to determine credentials");
        let CredentialsFromConfig {
            region,
            credentials,
        } = CredentialsFromConfig::try_new().await?;
        if let Some(region) = region {
            builder = builder.with_region(region);
        }
        if let Some(credentials) = credentials {
            let credentials = Arc::new(S3CredentialProvider { credentials });
            builder = builder.with_credentials(credentials);
        } else {
            debug!("No credentials found, defaulting to skip signature ");
            builder = builder.with_skip_signature(true);
        }
    }

    // An explicitly configured region overrides any SDK-derived region above
    if let Some(region) = region {
        builder = builder.with_region(region);
    }

    // If the region is not set or auto_detect_region is true, resolve the region.
    if builder
        .get_config_value(&AmazonS3ConfigKey::Region)
        .is_none()
        || resolve_region
    {
        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
        builder = builder.with_region(region);
    }

    if let Some(endpoint) = endpoint {
        // Make a nicer error if the user hasn't allowed http and the endpoint
        // is http as the default message is "URL scheme is not allowed"
        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
                return config_err!(
                    "Invalid endpoint: {endpoint}. \
                HTTP is not allowed for S3 endpoints. \
                To allow HTTP, set 'aws.allow_http' to true"
                );
            }
        }

        builder = builder.with_endpoint(endpoint);
    }

    if let Some(allow_http) = allow_http {
        builder = builder.with_allow_http(*allow_http);
    }

    if let Some(skip_signature) = skip_signature {
        builder = builder.with_skip_signature(*skip_signature);
    }

    Ok(builder)
}
148
/// Credentials from the AWS SDK
///
/// `credentials` is `None` when the SDK's default provider chain found no
/// credentials; callers then skip request signing.
struct CredentialsFromConfig {
    // Region discovered by the SDK configuration, if any
    region: Option<String>,
    // Provider whose credentials were validated once in `try_new`
    credentials: Option<SharedCredentialsProvider>,
}
154
155impl CredentialsFromConfig {
156    /// Attempt find AWS S3 credentials via the AWS SDK
157    pub async fn try_new() -> Result<Self> {
158        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
159        let region = config.region().map(|r| r.to_string());
160
161        let credentials = config
162            .credentials_provider()
163            .ok_or_else(|| {
164                DataFusionError::ObjectStore(Box::new(Generic {
165                    store: "S3",
166                    source: "Failed to get S3 credentials aws_config".into(),
167                }))
168            })?
169            .clone();
170
171        // The credential provider is lazy, so it does not fetch credentials
172        // until they are needed. To ensure that the credentials are valid,
173        // we can call `provide_credentials` here.
174        let credentials = match credentials.provide_credentials().await {
175            Ok(_) => Some(credentials),
176            Err(CredentialsError::CredentialsNotLoaded(_)) => {
177                debug!("Could not use AWS SDK to get credentials");
178                None
179            }
180            // other errors like `CredentialsError::InvalidConfiguration`
181            // should be returned to the user so they can be fixed
182            Err(e) => {
183                // Pass back underlying error to the user, including underlying source
184                let source_message = if let Some(source) = e.source() {
185                    format!(": {source}")
186                } else {
187                    String::new()
188                };
189
190                let message = format!(
191                    "Error getting credentials from provider: {e}{source_message}",
192                );
193
194                return Err(DataFusionError::ObjectStore(Box::new(Generic {
195                    store: "S3",
196                    source: message.into(),
197                })));
198            }
199        };
200        Ok(Self {
201            region,
202            credentials,
203        })
204    }
205}
206
/// Adapts an AWS SDK credential provider to the `object_store`
/// [`CredentialProvider`] interface used by [`AmazonS3Builder`].
#[derive(Debug)]
struct S3CredentialProvider {
    // Shared provider obtained from `CredentialsFromConfig::try_new`
    credentials: aws_credential_types::provider::SharedCredentialsProvider,
}
211
212#[async_trait]
213impl CredentialProvider for S3CredentialProvider {
214    type Credential = AwsCredential;
215
216    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
217        let creds =
218            self.credentials
219                .provide_credentials()
220                .await
221                .map_err(|e| Generic {
222                    store: "S3",
223                    source: Box::new(e),
224                })?;
225        Ok(Arc::new(AwsCredential {
226            key_id: creds.access_key_id().to_string(),
227            secret_key: creds.secret_access_key().to_string(),
228            token: creds.session_token().map(ToString::to_string),
229        }))
230    }
231}
232
233pub fn get_oss_object_store_builder(
234    url: &Url,
235    aws_options: &AwsOptions,
236) -> Result<AmazonS3Builder> {
237    get_object_store_builder(url, aws_options, true)
238}
239
240pub fn get_cos_object_store_builder(
241    url: &Url,
242    aws_options: &AwsOptions,
243) -> Result<AmazonS3Builder> {
244    get_object_store_builder(url, aws_options, false)
245}
246
247fn get_object_store_builder(
248    url: &Url,
249    aws_options: &AwsOptions,
250    virtual_hosted_style_request: bool,
251) -> Result<AmazonS3Builder> {
252    let bucket_name = get_bucket_name(url)?;
253    let mut builder = AmazonS3Builder::from_env()
254        .with_virtual_hosted_style_request(virtual_hosted_style_request)
255        .with_bucket_name(bucket_name)
256        // oss/cos don't care about the "region" field
257        .with_region("do_not_care");
258
259    if let (Some(access_key_id), Some(secret_access_key)) =
260        (&aws_options.access_key_id, &aws_options.secret_access_key)
261    {
262        builder = builder
263            .with_access_key_id(access_key_id)
264            .with_secret_access_key(secret_access_key);
265    }
266
267    if let Some(endpoint) = &aws_options.endpoint {
268        builder = builder.with_endpoint(endpoint);
269    }
270
271    Ok(builder)
272}
273
274pub fn get_gcs_object_store_builder(
275    url: &Url,
276    gs_options: &GcpOptions,
277) -> Result<GoogleCloudStorageBuilder> {
278    let bucket_name = get_bucket_name(url)?;
279    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
280
281    if let Some(service_account_path) = &gs_options.service_account_path {
282        builder = builder.with_service_account_path(service_account_path);
283    }
284
285    if let Some(service_account_key) = &gs_options.service_account_key {
286        builder = builder.with_service_account_key(service_account_key);
287    }
288
289    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
290        builder = builder.with_application_credentials(application_credentials_path);
291    }
292
293    Ok(builder)
294}
295
296fn get_bucket_name(url: &Url) -> Result<&str> {
297    url.host_str().ok_or_else(|| {
298        DataFusionError::Execution(format!(
299            "Not able to parse bucket name from url: {}",
300            url.as_str()
301        ))
302    })
303}
304
/// This struct encapsulates AWS options one uses when setting up object storage.
///
/// All fields are optional; unset fields fall back to environment variables
/// or the AWS SDK default chain (see [`get_s3_object_store_builder`]).
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access Key ID
    pub access_key_id: Option<String>,
    /// Secret Access Key
    pub secret_access_key: Option<String>,
    /// Session token
    pub session_token: Option<String>,
    /// AWS Region
    pub region: Option<String>,
    /// OSS or COS Endpoint
    pub endpoint: Option<String>,
    /// Allow HTTP (otherwise will always use https)
    pub allow_http: Option<bool>,
    /// Do not fetch credentials and do not sign requests
    ///
    /// This can be useful when interacting with public S3 buckets that deny
    /// authorized requests
    pub skip_signature: Option<bool>,
}
326
327impl ExtensionOptions for AwsOptions {
328    fn as_any(&self) -> &dyn Any {
329        self
330    }
331
332    fn as_any_mut(&mut self) -> &mut dyn Any {
333        self
334    }
335
336    fn cloned(&self) -> Box<dyn ExtensionOptions> {
337        Box::new(self.clone())
338    }
339
340    fn set(&mut self, key: &str, value: &str) -> Result<()> {
341        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343        match key {
344            "access_key_id" => {
345                self.access_key_id.set(rem, value)?;
346            }
347            "secret_access_key" => {
348                self.secret_access_key.set(rem, value)?;
349            }
350            "session_token" => {
351                self.session_token.set(rem, value)?;
352            }
353            "region" => {
354                self.region.set(rem, value)?;
355            }
356            "oss" | "cos" | "endpoint" => {
357                self.endpoint.set(rem, value)?;
358            }
359            "allow_http" => {
360                self.allow_http.set(rem, value)?;
361            }
362            "skip_signature" | "nosign" => {
363                self.skip_signature.set(rem, value)?;
364            }
365            _ => {
366                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367            }
368        }
369        Ok(())
370    }
371
372    fn entries(&self) -> Vec<ConfigEntry> {
373        struct Visitor(Vec<ConfigEntry>);
374
375        impl Visit for Visitor {
376            fn some<V: Display>(
377                &mut self,
378                key: &str,
379                value: V,
380                description: &'static str,
381            ) {
382                self.0.push(ConfigEntry {
383                    key: key.to_string(),
384                    value: Some(value.to_string()),
385                    description,
386                })
387            }
388
389            fn none(&mut self, key: &str, description: &'static str) {
390                self.0.push(ConfigEntry {
391                    key: key.to_string(),
392                    value: None,
393                    description,
394                })
395            }
396        }
397
398        let mut v = Visitor(vec![]);
399        self.access_key_id.visit(&mut v, "access_key_id", "");
400        self.secret_access_key
401            .visit(&mut v, "secret_access_key", "");
402        self.session_token.visit(&mut v, "session_token", "");
403        self.region.visit(&mut v, "region", "");
404        self.endpoint.visit(&mut v, "endpoint", "");
405        self.allow_http.visit(&mut v, "allow_http", "");
406        v.0
407    }
408}
409
// Options for this extension are configured under the `aws.` prefix
// (e.g. `aws.region`, `aws.access_key_id`)
impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}
413
/// This struct encapsulates GCP options one uses when setting up object storage.
///
/// Every field that is set is applied to the [`GoogleCloudStorageBuilder`];
/// see [`get_gcs_object_store_builder`].
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account path
    pub service_account_path: Option<String>,
    /// Service account key
    pub service_account_key: Option<String>,
    /// Application credentials path
    pub application_credentials_path: Option<String>,
}
424
impl ExtensionOptions for GcpOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    /// Sets a single `gcp.*` option (e.g. `gcp.service_account_path`).
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Strip the extension prefix ("gcp"); `rem` is the field name
        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
        match rem {
            "service_account_path" => {
                // NOTE(review): the field name `rem` is forwarded as the
                // sub-key, whereas AwsOptions::set forwards the remainder
                // after a second split ("" here) — presumably scalar
                // ConfigField impls ignore it; confirm intended.
                self.service_account_path.set(rem, value)?;
            }
            "service_account_key" => {
                self.service_account_key.set(rem, value)?;
            }
            "application_credentials_path" => {
                self.application_credentials_path.set(rem, value)?;
            }
            _ => {
                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
            }
        }
        Ok(())
    }

    /// Lists every configurable entry with its current value.
    fn entries(&self) -> Vec<ConfigEntry> {
        // Collects visited fields into `ConfigEntry` records
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.service_account_path
            .visit(&mut v, "service_account_path", "");
        self.service_account_key
            .visit(&mut v, "service_account_key", "");
        self.application_credentials_path.visit(
            &mut v,
            "application_credentials_path",
            "",
        );
        v.0
    }
}
496
// Options for this extension are configured under the `gcp.` prefix
// (e.g. `gcp.service_account_path`)
impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}
500
501pub(crate) async fn get_object_store(
502    state: &SessionState,
503    scheme: &str,
504    url: &Url,
505    table_options: &TableOptions,
506    resolve_region: bool,
507) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
508    let store: Arc<dyn ObjectStore> = match scheme {
509        "s3" => {
510            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
511                return exec_err!(
512                    "Given table options incompatible with the 's3' scheme"
513                );
514            };
515            let builder =
516                get_s3_object_store_builder(url, options, resolve_region).await?;
517            Arc::new(builder.build()?)
518        }
519        "oss" => {
520            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
521                return exec_err!(
522                    "Given table options incompatible with the 'oss' scheme"
523                );
524            };
525            let builder = get_oss_object_store_builder(url, options)?;
526            Arc::new(builder.build()?)
527        }
528        "cos" => {
529            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
530                return exec_err!(
531                    "Given table options incompatible with the 'cos' scheme"
532                );
533            };
534            let builder = get_cos_object_store_builder(url, options)?;
535            Arc::new(builder.build()?)
536        }
537        "gs" | "gcs" => {
538            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
539                return exec_err!(
540                    "Given table options incompatible with the 'gs'/'gcs' scheme"
541                );
542            };
543            let builder = get_gcs_object_store_builder(url, options)?;
544            Arc::new(builder.build()?)
545        }
546        "http" | "https" => Arc::new(
547            HttpBuilder::new()
548                .with_client_options(ClientOptions::new().with_allow_http(true))
549                .with_url(url.origin().ascii_serialization())
550                .build()?,
551        ),
552        _ => {
553            // For other types, try to get from `object_store_registry`:
554            state
555                .runtime_env()
556                .object_store_registry
557                .get_store(url)
558                .map_err(|_| {
559                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
560                })?
561        }
562    };
563    Ok(store)
564}
565
566#[cfg(test)]
567mod tests {
568    use crate::cli_context::CliSessionContext;
569
570    use super::*;
571
572    use datafusion::{
573        datasource::listing::ListingTableUrl,
574        logical_expr::{DdlStatement, LogicalPlan},
575        prelude::SessionContext,
576    };
577
578    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};
579
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        let location = "s3://bucket/path/FAKE/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");

        // No options
        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // If the environment variables are set (as they are in CI) use them
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        // Without AWS_REGION, the region comes from the mocked
        // `resolve_bucket_region`, which always reports eu-central-1
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        // get the actual configuration information, then assert_eq!
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Default is to skip signature when no credentials are provided
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }
639
640    #[tokio::test]
641    async fn s3_object_store_builder() -> Result<()> {
642        // "fake" is uppercase to ensure the values are not lowercased when parsed
643        let access_key_id = "FAKE_access_key_id";
644        let secret_access_key = "FAKE_secret_access_key";
645        let region = "fake_us-east-2";
646        let endpoint = "endpoint33";
647        let session_token = "FAKE_session_token";
648        let location = "s3://bucket/path/FAKE/file.parquet";
649
650        let table_url = ListingTableUrl::parse(location)?;
651        let scheme = table_url.scheme();
652        let sql = format!(
653            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
654            ('aws.access_key_id' '{access_key_id}', \
655            'aws.secret_access_key' '{secret_access_key}', \
656            'aws.region' '{region}', \
657            'aws.session_token' {session_token}, \
658            'aws.endpoint' '{endpoint}'\
659            ) LOCATION '{location}'"
660        );
661
662        let ctx = SessionContext::new();
663        ctx.register_table_options_extension_from_scheme(scheme);
664        let table_options = get_table_options(&ctx, &sql).await;
665        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
666        let builder =
667            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
668        // get the actual configuration information, then assert_eq!
669        let config = [
670            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
671            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
672            (AmazonS3ConfigKey::Region, region),
673            (AmazonS3ConfigKey::Endpoint, endpoint),
674            (AmazonS3ConfigKey::Token, session_token),
675        ];
676        for (key, value) in config {
677            assert_eq!(value, builder.get_config_value(&key).unwrap());
678        }
679        // Should not skip signature when credentials are provided
680        assert_eq!(
681            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
682            Some("false".into())
683        );
684
685        Ok(())
686    }
687
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        // http (not https) endpoint must be rejected unless aws.allow_http is set
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        // Only the first line of the error is compared; it may carry more context
        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");

        // Now add `allow_http` to the options and check if it works
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        // ensure this isn't an error
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }
733
734    #[tokio::test]
735    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
736        let expected_region = "eu-central-1";
737        let location = "s3://test-bucket/path/file.parquet";
738        // Set it to a non-existent file to avoid reading the default configuration file
739        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
740
741        let table_url = ListingTableUrl::parse(location)?;
742        let aws_options = AwsOptions {
743            region: None, // No region specified - should auto-detect
744            ..Default::default()
745        };
746
747        let builder =
748            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;
749
750        // Verify that the region was auto-detected in test environment
751        assert_eq!(
752            builder.get_config_value(&AmazonS3ConfigKey::Region),
753            Some(expected_region.to_string())
754        );
755
756        Ok(())
757    }
758
759    #[tokio::test]
760    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
761    ) -> Result<()> {
762        let original_region = "us-east-1";
763        let expected_region = "eu-central-1"; // This should be the auto-detected region
764        let location = "s3://test-bucket/path/file.parquet";
765
766        let table_url = ListingTableUrl::parse(location)?;
767        let aws_options = AwsOptions {
768            region: Some(original_region.to_string()), // Explicit region provided
769            ..Default::default()
770        };
771
772        let builder =
773            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;
774
775        // Verify that the region was overridden by auto-detection
776        assert_eq!(
777            builder.get_config_value(&AmazonS3ConfigKey::Region),
778            Some(expected_region.to_string())
779        );
780
781        Ok(())
782    }
783
784    #[tokio::test]
785    async fn oss_object_store_builder() -> Result<()> {
786        let access_key_id = "fake_access_key_id";
787        let secret_access_key = "fake_secret_access_key";
788        let endpoint = "fake_endpoint";
789        let location = "oss://bucket/path/file.parquet";
790
791        let table_url = ListingTableUrl::parse(location)?;
792        let scheme = table_url.scheme();
793        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");
794
795        let ctx = SessionContext::new();
796        ctx.register_table_options_extension_from_scheme(scheme);
797        let table_options = get_table_options(&ctx, &sql).await;
798
799        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
800        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
801        // get the actual configuration information, then assert_eq!
802        let config = [
803            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
804            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
805            (AmazonS3ConfigKey::Endpoint, endpoint),
806        ];
807        for (key, value) in config {
808            assert_eq!(value, builder.get_config_value(&key).unwrap());
809        }
810
811        Ok(())
812    }
813
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        // JSON-shaped service account key (the values are fake)
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }
847
    /// Plans the `CREATE EXTERNAL TABLE` SQL statement with `ctx` and returns
    /// the session's default [`TableOptions`] altered with the statement's
    /// OPTIONS clause. (Note: the options are returned, not the
    /// `CreateExternalTable` command itself.)
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        // The statements used in these tests must plan to CreateExternalTable
        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }
863}