datafusion_cli/
object_storage.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use async_trait::async_trait;
19use aws_config::BehaviorVersion;
20use aws_credential_types::provider::{
21    error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
22};
23use datafusion::{
24    common::{
25        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
26        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
27        exec_datafusion_err, exec_err,
28    },
29    error::{DataFusionError, Result},
30    execution::context::SessionState,
31};
32use log::debug;
33use object_store::{
34    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
35    gcp::GoogleCloudStorageBuilder,
36    http::HttpBuilder,
37    ClientOptions, CredentialProvider,
38    Error::Generic,
39    ObjectStore,
40};
41use std::{
42    any::Any,
43    error::Error,
44    fmt::{Debug, Display},
45    sync::Arc,
46};
47use url::Url;
48
49#[cfg(not(test))]
50use object_store::aws::resolve_bucket_region;
51
/// Test-only stand-in for `object_store::aws::resolve_bucket_region`.
///
/// Both arguments are ignored and the region is always reported as
/// `eu-central-1`, so unit tests never make a network call to look up
/// a bucket's region.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let mocked_region = String::from("eu-central-1");
    Ok(mocked_region)
}
60
61pub async fn get_s3_object_store_builder(
62    url: &Url,
63    aws_options: &AwsOptions,
64    resolve_region: bool,
65) -> Result<AmazonS3Builder> {
66    let AwsOptions {
67        access_key_id,
68        secret_access_key,
69        session_token,
70        region,
71        endpoint,
72        allow_http,
73        skip_signature,
74    } = aws_options;
75
76    let bucket_name = get_bucket_name(url)?;
77    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
78
79    if let (Some(access_key_id), Some(secret_access_key)) =
80        (access_key_id, secret_access_key)
81    {
82        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
83        builder = builder
84            .with_access_key_id(access_key_id)
85            .with_secret_access_key(secret_access_key);
86
87        if let Some(session_token) = session_token {
88            builder = builder.with_token(session_token);
89        }
90    } else {
91        debug!("Using AWS S3 SDK to determine credentials");
92        let CredentialsFromConfig {
93            region,
94            credentials,
95        } = CredentialsFromConfig::try_new().await?;
96        if let Some(region) = region {
97            builder = builder.with_region(region);
98        }
99        if let Some(credentials) = credentials {
100            let credentials = Arc::new(S3CredentialProvider { credentials });
101            builder = builder.with_credentials(credentials);
102        } else {
103            debug!("No credentials found, defaulting to skip signature ");
104            builder = builder.with_skip_signature(true);
105        }
106    }
107
108    if let Some(region) = region {
109        builder = builder.with_region(region);
110    }
111
112    // If the region is not set or auto_detect_region is true, resolve the region.
113    if builder
114        .get_config_value(&AmazonS3ConfigKey::Region)
115        .is_none()
116        || resolve_region
117    {
118        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
119        builder = builder.with_region(region);
120    }
121
122    if let Some(endpoint) = endpoint {
123        // Make a nicer error if the user hasn't allowed http and the endpoint
124        // is http as the default message is "URL scheme is not allowed"
125        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
126            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
127                return config_err!(
128                    "Invalid endpoint: {endpoint}. \
129                HTTP is not allowed for S3 endpoints. \
130                To allow HTTP, set 'aws.allow_http' to true"
131                );
132            }
133        }
134
135        builder = builder.with_endpoint(endpoint);
136    }
137
138    if let Some(allow_http) = allow_http {
139        builder = builder.with_allow_http(*allow_http);
140    }
141
142    if let Some(skip_signature) = skip_signature {
143        builder = builder.with_skip_signature(*skip_signature);
144    }
145
146    Ok(builder)
147}
148
149/// Credentials from the AWS SDK
150struct CredentialsFromConfig {
151    region: Option<String>,
152    credentials: Option<SharedCredentialsProvider>,
153}
154
155impl CredentialsFromConfig {
156    /// Attempt find AWS S3 credentials via the AWS SDK
157    pub async fn try_new() -> Result<Self> {
158        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
159        let region = config.region().map(|r| r.to_string());
160
161        let credentials = config
162            .credentials_provider()
163            .ok_or_else(|| {
164                DataFusionError::ObjectStore(Box::new(Generic {
165                    store: "S3",
166                    source: "Failed to get S3 credentials aws_config".into(),
167                }))
168            })?
169            .clone();
170
171        // The credential provider is lazy, so it does not fetch credentials
172        // until they are needed. To ensure that the credentials are valid,
173        // we can call `provide_credentials` here.
174        let credentials = match credentials.provide_credentials().await {
175            Ok(_) => Some(credentials),
176            Err(CredentialsError::CredentialsNotLoaded(_)) => {
177                debug!("Could not use AWS SDK to get credentials");
178                None
179            }
180            // other errors like `CredentialsError::InvalidConfiguration`
181            // should be returned to the user so they can be fixed
182            Err(e) => {
183                // Pass back underlying error to the user, including underlying source
184                let source_message = if let Some(source) = e.source() {
185                    format!(": {source}")
186                } else {
187                    String::new()
188                };
189
190                let message = format!(
191                    "Error getting credentials from provider: {e}{source_message}",
192                );
193
194                return Err(DataFusionError::ObjectStore(Box::new(Generic {
195                    store: "S3",
196                    source: message.into(),
197                })));
198            }
199        };
200        Ok(Self {
201            region,
202            credentials,
203        })
204    }
205}
206
207#[derive(Debug)]
208struct S3CredentialProvider {
209    credentials: aws_credential_types::provider::SharedCredentialsProvider,
210}
211
212#[async_trait]
213impl CredentialProvider for S3CredentialProvider {
214    type Credential = AwsCredential;
215
216    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
217        let creds =
218            self.credentials
219                .provide_credentials()
220                .await
221                .map_err(|e| Generic {
222                    store: "S3",
223                    source: Box::new(e),
224                })?;
225        Ok(Arc::new(AwsCredential {
226            key_id: creds.access_key_id().to_string(),
227            secret_key: creds.secret_access_key().to_string(),
228            token: creds.session_token().map(ToString::to_string),
229        }))
230    }
231}
232
233pub fn get_oss_object_store_builder(
234    url: &Url,
235    aws_options: &AwsOptions,
236) -> Result<AmazonS3Builder> {
237    get_object_store_builder(url, aws_options, true)
238}
239
240pub fn get_cos_object_store_builder(
241    url: &Url,
242    aws_options: &AwsOptions,
243) -> Result<AmazonS3Builder> {
244    get_object_store_builder(url, aws_options, false)
245}
246
247fn get_object_store_builder(
248    url: &Url,
249    aws_options: &AwsOptions,
250    virtual_hosted_style_request: bool,
251) -> Result<AmazonS3Builder> {
252    let bucket_name = get_bucket_name(url)?;
253    let mut builder = AmazonS3Builder::from_env()
254        .with_virtual_hosted_style_request(virtual_hosted_style_request)
255        .with_bucket_name(bucket_name)
256        // oss/cos don't care about the "region" field
257        .with_region("do_not_care");
258
259    if let (Some(access_key_id), Some(secret_access_key)) =
260        (&aws_options.access_key_id, &aws_options.secret_access_key)
261    {
262        builder = builder
263            .with_access_key_id(access_key_id)
264            .with_secret_access_key(secret_access_key);
265    }
266
267    if let Some(endpoint) = &aws_options.endpoint {
268        builder = builder.with_endpoint(endpoint);
269    }
270
271    Ok(builder)
272}
273
274pub fn get_gcs_object_store_builder(
275    url: &Url,
276    gs_options: &GcpOptions,
277) -> Result<GoogleCloudStorageBuilder> {
278    let bucket_name = get_bucket_name(url)?;
279    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
280
281    if let Some(service_account_path) = &gs_options.service_account_path {
282        builder = builder.with_service_account_path(service_account_path);
283    }
284
285    if let Some(service_account_key) = &gs_options.service_account_key {
286        builder = builder.with_service_account_key(service_account_key);
287    }
288
289    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
290        builder = builder.with_application_credentials(application_credentials_path);
291    }
292
293    Ok(builder)
294}
295
296fn get_bucket_name(url: &Url) -> Result<&str> {
297    url.host_str().ok_or_else(|| {
298        DataFusionError::Execution(format!(
299            "Not able to parse bucket name from url: {}",
300            url.as_str()
301        ))
302    })
303}
304
305/// This struct encapsulates AWS options one uses when setting up object storage.
306#[derive(Default, Debug, Clone)]
307pub struct AwsOptions {
308    /// Access Key ID
309    pub access_key_id: Option<String>,
310    /// Secret Access Key
311    pub secret_access_key: Option<String>,
312    /// Session token
313    pub session_token: Option<String>,
314    /// AWS Region
315    pub region: Option<String>,
316    /// OSS or COS Endpoint
317    pub endpoint: Option<String>,
318    /// Allow HTTP (otherwise will always use https)
319    pub allow_http: Option<bool>,
320    /// Do not fetch credentials and do not sign requests
321    ///
322    /// This can be useful when interacting with public S3 buckets that deny
323    /// authorized requests
324    pub skip_signature: Option<bool>,
325}
326
327impl ExtensionOptions for AwsOptions {
328    fn as_any(&self) -> &dyn Any {
329        self
330    }
331
332    fn as_any_mut(&mut self) -> &mut dyn Any {
333        self
334    }
335
336    fn cloned(&self) -> Box<dyn ExtensionOptions> {
337        Box::new(self.clone())
338    }
339
340    fn set(&mut self, key: &str, value: &str) -> Result<()> {
341        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343        match key {
344            "access_key_id" => {
345                self.access_key_id.set(rem, value)?;
346            }
347            "secret_access_key" => {
348                self.secret_access_key.set(rem, value)?;
349            }
350            "session_token" => {
351                self.session_token.set(rem, value)?;
352            }
353            "region" => {
354                self.region.set(rem, value)?;
355            }
356            "oss" | "cos" | "endpoint" => {
357                self.endpoint.set(rem, value)?;
358            }
359            "allow_http" => {
360                self.allow_http.set(rem, value)?;
361            }
362            "skip_signature" | "nosign" => {
363                self.skip_signature.set(rem, value)?;
364            }
365            _ => {
366                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367            }
368        }
369        Ok(())
370    }
371
372    fn entries(&self) -> Vec<ConfigEntry> {
373        struct Visitor(Vec<ConfigEntry>);
374
375        impl Visit for Visitor {
376            fn some<V: Display>(
377                &mut self,
378                key: &str,
379                value: V,
380                description: &'static str,
381            ) {
382                self.0.push(ConfigEntry {
383                    key: key.to_string(),
384                    value: Some(value.to_string()),
385                    description,
386                })
387            }
388
389            fn none(&mut self, key: &str, description: &'static str) {
390                self.0.push(ConfigEntry {
391                    key: key.to_string(),
392                    value: None,
393                    description,
394                })
395            }
396        }
397
398        let mut v = Visitor(vec![]);
399        self.access_key_id.visit(&mut v, "access_key_id", "");
400        self.secret_access_key
401            .visit(&mut v, "secret_access_key", "");
402        self.session_token.visit(&mut v, "session_token", "");
403        self.region.visit(&mut v, "region", "");
404        self.endpoint.visit(&mut v, "endpoint", "");
405        self.allow_http.visit(&mut v, "allow_http", "");
406        v.0
407    }
408}
409
410impl ConfigExtension for AwsOptions {
411    const PREFIX: &'static str = "aws";
412}
413
414/// This struct encapsulates GCP options one uses when setting up object storage.
415#[derive(Debug, Clone, Default)]
416pub struct GcpOptions {
417    /// Service account path
418    pub service_account_path: Option<String>,
419    /// Service account key
420    pub service_account_key: Option<String>,
421    /// Application credentials path
422    pub application_credentials_path: Option<String>,
423}
424
425impl ExtensionOptions for GcpOptions {
426    fn as_any(&self) -> &dyn Any {
427        self
428    }
429
430    fn as_any_mut(&mut self) -> &mut dyn Any {
431        self
432    }
433
434    fn cloned(&self) -> Box<dyn ExtensionOptions> {
435        Box::new(self.clone())
436    }
437
438    fn set(&mut self, key: &str, value: &str) -> Result<()> {
439        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
440        match rem {
441            "service_account_path" => {
442                self.service_account_path.set(rem, value)?;
443            }
444            "service_account_key" => {
445                self.service_account_key.set(rem, value)?;
446            }
447            "application_credentials_path" => {
448                self.application_credentials_path.set(rem, value)?;
449            }
450            _ => {
451                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
452            }
453        }
454        Ok(())
455    }
456
457    fn entries(&self) -> Vec<ConfigEntry> {
458        struct Visitor(Vec<ConfigEntry>);
459
460        impl Visit for Visitor {
461            fn some<V: Display>(
462                &mut self,
463                key: &str,
464                value: V,
465                description: &'static str,
466            ) {
467                self.0.push(ConfigEntry {
468                    key: key.to_string(),
469                    value: Some(value.to_string()),
470                    description,
471                })
472            }
473
474            fn none(&mut self, key: &str, description: &'static str) {
475                self.0.push(ConfigEntry {
476                    key: key.to_string(),
477                    value: None,
478                    description,
479                })
480            }
481        }
482
483        let mut v = Visitor(vec![]);
484        self.service_account_path
485            .visit(&mut v, "service_account_path", "");
486        self.service_account_key
487            .visit(&mut v, "service_account_key", "");
488        self.application_credentials_path.visit(
489            &mut v,
490            "application_credentials_path",
491            "",
492        );
493        v.0
494    }
495}
496
497impl ConfigExtension for GcpOptions {
498    const PREFIX: &'static str = "gcp";
499}
500
501pub(crate) async fn get_object_store(
502    state: &SessionState,
503    scheme: &str,
504    url: &Url,
505    table_options: &TableOptions,
506    resolve_region: bool,
507) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
508    let store: Arc<dyn ObjectStore> = match scheme {
509        "s3" => {
510            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
511                return exec_err!(
512                    "Given table options incompatible with the 's3' scheme"
513                );
514            };
515            let builder =
516                get_s3_object_store_builder(url, options, resolve_region).await?;
517            Arc::new(builder.build()?)
518        }
519        "oss" => {
520            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
521                return exec_err!(
522                    "Given table options incompatible with the 'oss' scheme"
523                );
524            };
525            let builder = get_oss_object_store_builder(url, options)?;
526            Arc::new(builder.build()?)
527        }
528        "cos" => {
529            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
530                return exec_err!(
531                    "Given table options incompatible with the 'cos' scheme"
532                );
533            };
534            let builder = get_cos_object_store_builder(url, options)?;
535            Arc::new(builder.build()?)
536        }
537        "gs" | "gcs" => {
538            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
539                return exec_err!(
540                    "Given table options incompatible with the 'gs'/'gcs' scheme"
541                );
542            };
543            let builder = get_gcs_object_store_builder(url, options)?;
544            Arc::new(builder.build()?)
545        }
546        "http" | "https" => Arc::new(
547            HttpBuilder::new()
548                .with_client_options(ClientOptions::new().with_allow_http(true))
549                .with_url(url.origin().ascii_serialization())
550                .build()?,
551        ),
552        _ => {
553            // For other types, try to get from `object_store_registry`:
554            state
555                .runtime_env()
556                .object_store_registry
557                .get_store(url)
558                .map_err(|_| {
559                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
560                })?
561        }
562    };
563    Ok(store)
564}
565
566#[cfg(test)]
567mod tests {
568    use crate::cli_context::CliSessionContext;
569
570    use super::*;
571
572    use datafusion::{
573        datasource::listing::ListingTableUrl,
574        logical_expr::{DdlStatement, LogicalPlan},
575        prelude::SessionContext,
576    };
577
578    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};
579
580    #[tokio::test]
581    async fn s3_object_store_builder_default() -> Result<()> {
582        let location = "s3://bucket/path/FAKE/file.parquet";
583
584        // No options
585        let table_url = ListingTableUrl::parse(location)?;
586        let scheme = table_url.scheme();
587        let sql =
588            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
589
590        let ctx = SessionContext::new();
591        ctx.register_table_options_extension_from_scheme(scheme);
592        let table_options = get_table_options(&ctx, &sql).await;
593        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
594        let builder =
595            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
596
597        // If the environment variables are set (as they are in CI) use them
598        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
599        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
600        let expected_region = Some(
601            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
602        );
603        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();
604
605        // get the actual configuration information, then assert_eq!
606        assert_eq!(
607            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
608            expected_access_key_id
609        );
610        assert_eq!(
611            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
612            expected_secret_access_key
613        );
614        // Default is to skip signature when no credentials are provided
615        let expected_skip_signature =
616            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
617                Some(String::from("true"))
618            } else {
619                Some(String::from("false"))
620            };
621        assert_eq!(
622            builder.get_config_value(&AmazonS3ConfigKey::Region),
623            expected_region
624        );
625        assert_eq!(
626            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
627            expected_endpoint
628        );
629        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
630        assert_eq!(
631            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
632            expected_skip_signature
633        );
634        Ok(())
635    }
636
637    #[tokio::test]
638    async fn s3_object_store_builder() -> Result<()> {
639        // "fake" is uppercase to ensure the values are not lowercased when parsed
640        let access_key_id = "FAKE_access_key_id";
641        let secret_access_key = "FAKE_secret_access_key";
642        let region = "fake_us-east-2";
643        let endpoint = "endpoint33";
644        let session_token = "FAKE_session_token";
645        let location = "s3://bucket/path/FAKE/file.parquet";
646
647        let table_url = ListingTableUrl::parse(location)?;
648        let scheme = table_url.scheme();
649        let sql = format!(
650            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
651            ('aws.access_key_id' '{access_key_id}', \
652            'aws.secret_access_key' '{secret_access_key}', \
653            'aws.region' '{region}', \
654            'aws.session_token' {session_token}, \
655            'aws.endpoint' '{endpoint}'\
656            ) LOCATION '{location}'"
657        );
658
659        let ctx = SessionContext::new();
660        ctx.register_table_options_extension_from_scheme(scheme);
661        let table_options = get_table_options(&ctx, &sql).await;
662        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
663        let builder =
664            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
665        // get the actual configuration information, then assert_eq!
666        let config = [
667            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
668            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
669            (AmazonS3ConfigKey::Region, region),
670            (AmazonS3ConfigKey::Endpoint, endpoint),
671            (AmazonS3ConfigKey::Token, session_token),
672        ];
673        for (key, value) in config {
674            assert_eq!(value, builder.get_config_value(&key).unwrap());
675        }
676        // Should not skip signature when credentials are provided
677        assert_eq!(
678            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
679            Some("false".into())
680        );
681
682        Ok(())
683    }
684
685    #[tokio::test]
686    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
687        let access_key_id = "fake_access_key_id";
688        let secret_access_key = "fake_secret_access_key";
689        let endpoint = "http://endpoint33";
690        let location = "s3://bucket/path/file.parquet";
691
692        let table_url = ListingTableUrl::parse(location)?;
693        let scheme = table_url.scheme();
694        let sql = format!(
695            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
696            ('aws.access_key_id' '{access_key_id}', \
697            'aws.secret_access_key' '{secret_access_key}', \
698            'aws.endpoint' '{endpoint}'\
699            ) LOCATION '{location}'"
700        );
701
702        let ctx = SessionContext::new();
703        ctx.register_table_options_extension_from_scheme(scheme);
704
705        let table_options = get_table_options(&ctx, &sql).await;
706        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
707        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
708            .await
709            .unwrap_err();
710
711        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
712
713        // Now add `allow_http` to the options and check if it works
714        let sql = format!(
715            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
716            ('aws.access_key_id' '{access_key_id}', \
717            'aws.secret_access_key' '{secret_access_key}', \
718            'aws.endpoint' '{endpoint}',\
719            'aws.allow_http' 'true'\
720            ) LOCATION '{location}'"
721        );
722        let table_options = get_table_options(&ctx, &sql).await;
723
724        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
725        // ensure this isn't an error
726        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
727
728        Ok(())
729    }
730
731    #[tokio::test]
732    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
733        let expected_region = "eu-central-1";
734        let location = "s3://test-bucket/path/file.parquet";
735
736        let table_url = ListingTableUrl::parse(location)?;
737        let aws_options = AwsOptions {
738            region: None, // No region specified - should auto-detect
739            ..Default::default()
740        };
741
742        let builder =
743            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;
744
745        // Verify that the region was auto-detected in test environment
746        assert_eq!(
747            builder.get_config_value(&AmazonS3ConfigKey::Region),
748            Some(expected_region.to_string())
749        );
750
751        Ok(())
752    }
753
754    #[tokio::test]
755    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
756    ) -> Result<()> {
757        let original_region = "us-east-1";
758        let expected_region = "eu-central-1"; // This should be the auto-detected region
759        let location = "s3://test-bucket/path/file.parquet";
760
761        let table_url = ListingTableUrl::parse(location)?;
762        let aws_options = AwsOptions {
763            region: Some(original_region.to_string()), // Explicit region provided
764            ..Default::default()
765        };
766
767        let builder =
768            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;
769
770        // Verify that the region was overridden by auto-detection
771        assert_eq!(
772            builder.get_config_value(&AmazonS3ConfigKey::Region),
773            Some(expected_region.to_string())
774        );
775
776        Ok(())
777    }
778
779    #[tokio::test]
780    async fn oss_object_store_builder() -> Result<()> {
781        let access_key_id = "fake_access_key_id";
782        let secret_access_key = "fake_secret_access_key";
783        let endpoint = "fake_endpoint";
784        let location = "oss://bucket/path/file.parquet";
785
786        let table_url = ListingTableUrl::parse(location)?;
787        let scheme = table_url.scheme();
788        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");
789
790        let ctx = SessionContext::new();
791        ctx.register_table_options_extension_from_scheme(scheme);
792        let table_options = get_table_options(&ctx, &sql).await;
793
794        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
795        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
796        // get the actual configuration information, then assert_eq!
797        let config = [
798            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
799            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
800            (AmazonS3ConfigKey::Endpoint, endpoint),
801        ];
802        for (key, value) in config {
803            assert_eq!(value, builder.get_config_value(&key).unwrap());
804        }
805
806        Ok(())
807    }
808
809    #[tokio::test]
810    async fn gcs_object_store_builder() -> Result<()> {
811        let service_account_path = "fake_service_account_path";
812        let service_account_key =
813            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
814        let application_credentials_path = "fake_application_credentials_path";
815        let location = "gcs://bucket/path/file.parquet";
816
817        let table_url = ListingTableUrl::parse(location)?;
818        let scheme = table_url.scheme();
819        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
820
821        let ctx = SessionContext::new();
822        ctx.register_table_options_extension_from_scheme(scheme);
823        let table_options = get_table_options(&ctx, &sql).await;
824
825        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
826        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
827        // get the actual configuration information, then assert_eq!
828        let config = [
829            (GoogleConfigKey::ServiceAccount, service_account_path),
830            (GoogleConfigKey::ServiceAccountKey, service_account_key),
831            (
832                GoogleConfigKey::ApplicationCredentials,
833                application_credentials_path,
834            ),
835        ];
836        for (key, value) in config {
837            assert_eq!(value, builder.get_config_value(&key).unwrap());
838        }
839
840        Ok(())
841    }
842
843    /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
844    /// resulting resolved `CreateExternalTable` command.
845    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
846        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();
847
848        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
849            panic!("plan is not a CreateExternalTable");
850        };
851
852        let mut table_options = ctx.state().default_table_options();
853        table_options
854            .alter_with_string_hash_map(&cmd.options)
855            .unwrap();
856        table_options
857    }
858}