// datafusion_cli/object_storage.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod instrumented;
19
20use async_trait::async_trait;
21use aws_config::BehaviorVersion;
22use aws_credential_types::provider::{
23    ProvideCredentials, SharedCredentialsProvider, error::CredentialsError,
24};
25use datafusion::{
26    common::{
27        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
28        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
29        exec_datafusion_err, exec_err,
30    },
31    error::{DataFusionError, Result},
32    execution::context::SessionState,
33};
34use log::debug;
35use object_store::{
36    ClientOptions, CredentialProvider,
37    Error::Generic,
38    ObjectStore,
39    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
40    gcp::GoogleCloudStorageBuilder,
41    http::HttpBuilder,
42};
43use std::{
44    any::Any,
45    error::Error,
46    fmt::{Debug, Display},
47    sync::Arc,
48};
49use url::Url;
50
51#[cfg(not(test))]
52use object_store::aws::resolve_bucket_region;
53
// Test-only stand-in for `object_store::aws::resolve_bucket_region` so unit
// tests never touch the network.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    // Fixed region; the region-resolution tests assert against this value.
    Ok(String::from("eu-central-1"))
}
62
63pub async fn get_s3_object_store_builder(
64    url: &Url,
65    aws_options: &AwsOptions,
66    resolve_region: bool,
67) -> Result<AmazonS3Builder> {
68    // Box the inner future to reduce the future size of this async function,
69    // which is deeply nested in the CLI's async call chain.
70    Box::pin(get_s3_object_store_builder_inner(
71        url,
72        aws_options,
73        resolve_region,
74    ))
75    .await
76}
77
78async fn get_s3_object_store_builder_inner(
79    url: &Url,
80    aws_options: &AwsOptions,
81    resolve_region: bool,
82) -> Result<AmazonS3Builder> {
83    let AwsOptions {
84        access_key_id,
85        secret_access_key,
86        session_token,
87        region,
88        endpoint,
89        allow_http,
90        skip_signature,
91    } = aws_options;
92
93    let bucket_name = get_bucket_name(url)?;
94    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
95
96    if let (Some(access_key_id), Some(secret_access_key)) =
97        (access_key_id, secret_access_key)
98    {
99        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
100        builder = builder
101            .with_access_key_id(access_key_id)
102            .with_secret_access_key(secret_access_key);
103
104        if let Some(session_token) = session_token {
105            builder = builder.with_token(session_token);
106        }
107    } else {
108        debug!("Using AWS S3 SDK to determine credentials");
109        let CredentialsFromConfig {
110            region,
111            credentials,
112        } = CredentialsFromConfig::try_new().await?;
113        if let Some(region) = region {
114            builder = builder.with_region(region);
115        }
116        if let Some(credentials) = credentials {
117            let credentials = Arc::new(S3CredentialProvider { credentials });
118            builder = builder.with_credentials(credentials);
119        } else {
120            debug!("No credentials found, defaulting to skip signature ");
121            builder = builder.with_skip_signature(true);
122        }
123    }
124
125    if let Some(region) = region {
126        builder = builder.with_region(region);
127    }
128
129    // If the region is not set or auto_detect_region is true, resolve the region.
130    if builder
131        .get_config_value(&AmazonS3ConfigKey::Region)
132        .is_none()
133        || resolve_region
134    {
135        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
136        builder = builder.with_region(region);
137    }
138
139    if let Some(endpoint) = endpoint {
140        // Make a nicer error if the user hasn't allowed http and the endpoint
141        // is http as the default message is "URL scheme is not allowed"
142        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
143            && !matches!(allow_http, Some(true))
144            && endpoint_url.scheme() == "http"
145        {
146            return config_err!(
147                "Invalid endpoint: {endpoint}. \
148                HTTP is not allowed for S3 endpoints. \
149                To allow HTTP, set 'aws.allow_http' to true"
150            );
151        }
152
153        builder = builder.with_endpoint(endpoint);
154    }
155
156    if let Some(allow_http) = allow_http {
157        builder = builder.with_allow_http(*allow_http);
158    }
159
160    if let Some(skip_signature) = skip_signature {
161        builder = builder.with_skip_signature(*skip_signature);
162    }
163
164    Ok(builder)
165}
166
/// Credentials from the AWS SDK
///
/// Bundles the region and credential provider discovered by the AWS SDK's
/// default configuration chain; either may be absent.
struct CredentialsFromConfig {
    // Region from the SDK configuration, if one was configured.
    region: Option<String>,
    // Credential provider; `None` when the SDK found no credentials.
    credentials: Option<SharedCredentialsProvider>,
}
172
impl CredentialsFromConfig {
    /// Attempts to find AWS S3 credentials via the AWS SDK
    ///
    /// Loads the SDK's default configuration chain. Returns
    /// `credentials: None` when the chain has no credentials loaded, and an
    /// error for any other credential failure so the user can fix it.
    pub async fn try_new() -> Result<Self> {
        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        let region = config.region().map(|r| r.to_string());

        let credentials = config
            .credentials_provider()
            .ok_or_else(|| {
                DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: "Failed to get S3 credentials aws_config".into(),
                }))
            })?
            .clone();

        // The credential provider is lazy, so it does not fetch credentials
        // until they are needed. To ensure that the credentials are valid,
        // we can call `provide_credentials` here.
        let credentials = match credentials.provide_credentials().await {
            Ok(_) => Some(credentials),
            Err(CredentialsError::CredentialsNotLoaded(_)) => {
                debug!("Could not use AWS SDK to get credentials");
                None
            }
            // other errors like `CredentialsError::InvalidConfiguration`
            // should be returned to the user so they can be fixed
            Err(e) => {
                // Pass back underlying error to the user, including underlying source
                let source_message = if let Some(source) = e.source() {
                    format!(": {source}")
                } else {
                    String::new()
                };

                let message = format!(
                    "Error getting credentials from provider: {e}{source_message}",
                );

                return Err(DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: message.into(),
                })));
            }
        };
        Ok(Self {
            region,
            credentials,
        })
    }
}
224
/// Adapts an AWS SDK credential provider to the `object_store`
/// `CredentialProvider` interface.
#[derive(Debug)]
struct S3CredentialProvider {
    // Shared provider obtained from the AWS SDK configuration chain.
    credentials: SharedCredentialsProvider,
}
229
230#[async_trait]
231impl CredentialProvider for S3CredentialProvider {
232    type Credential = AwsCredential;
233
234    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
235        let creds =
236            self.credentials
237                .provide_credentials()
238                .await
239                .map_err(|e| Generic {
240                    store: "S3",
241                    source: Box::new(e),
242                })?;
243        Ok(Arc::new(AwsCredential {
244            key_id: creds.access_key_id().to_string(),
245            secret_key: creds.secret_access_key().to_string(),
246            token: creds.session_token().map(ToString::to_string),
247        }))
248    }
249}
250
/// Builds an Alibaba OSS object store builder for `url`.
///
/// OSS uses virtual-hosted-style request addressing.
pub fn get_oss_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, true)
}
257
/// Builds a Tencent COS object store builder for `url`.
///
/// COS uses path-style request addressing (no virtual-hosted-style).
pub fn get_cos_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, false)
}
264
265fn get_object_store_builder(
266    url: &Url,
267    aws_options: &AwsOptions,
268    virtual_hosted_style_request: bool,
269) -> Result<AmazonS3Builder> {
270    let bucket_name = get_bucket_name(url)?;
271    let mut builder = AmazonS3Builder::from_env()
272        .with_virtual_hosted_style_request(virtual_hosted_style_request)
273        .with_bucket_name(bucket_name)
274        // oss/cos don't care about the "region" field
275        .with_region("do_not_care");
276
277    if let (Some(access_key_id), Some(secret_access_key)) =
278        (&aws_options.access_key_id, &aws_options.secret_access_key)
279    {
280        builder = builder
281            .with_access_key_id(access_key_id)
282            .with_secret_access_key(secret_access_key);
283    }
284
285    if let Some(endpoint) = &aws_options.endpoint {
286        builder = builder.with_endpoint(endpoint);
287    }
288
289    Ok(builder)
290}
291
292pub fn get_gcs_object_store_builder(
293    url: &Url,
294    gs_options: &GcpOptions,
295) -> Result<GoogleCloudStorageBuilder> {
296    let bucket_name = get_bucket_name(url)?;
297    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
298
299    if let Some(service_account_path) = &gs_options.service_account_path {
300        builder = builder.with_service_account_path(service_account_path);
301    }
302
303    if let Some(service_account_key) = &gs_options.service_account_key {
304        builder = builder.with_service_account_key(service_account_key);
305    }
306
307    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
308        builder = builder.with_application_credentials(application_credentials_path);
309    }
310
311    Ok(builder)
312}
313
314fn get_bucket_name(url: &Url) -> Result<&str> {
315    url.host_str().ok_or_else(|| {
316        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
317    })
318}
319
/// This struct encapsulates AWS options one uses when setting up object storage.
///
/// All fields are optional; unset fields fall back to values discovered from
/// the environment or the AWS SDK configuration chain.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access Key ID
    pub access_key_id: Option<String>,
    /// Secret Access Key
    pub secret_access_key: Option<String>,
    /// Session token
    pub session_token: Option<String>,
    /// AWS Region
    pub region: Option<String>,
    /// OSS or COS Endpoint
    pub endpoint: Option<String>,
    /// Allow HTTP (otherwise will always use https)
    pub allow_http: Option<bool>,
    /// Do not fetch credentials and do not sign requests
    ///
    /// This can be useful when interacting with public S3 buckets that deny
    /// authorized requests
    pub skip_signature: Option<bool>,
}
341
342impl ExtensionOptions for AwsOptions {
343    fn as_any(&self) -> &dyn Any {
344        self
345    }
346
347    fn as_any_mut(&mut self) -> &mut dyn Any {
348        self
349    }
350
351    fn cloned(&self) -> Box<dyn ExtensionOptions> {
352        Box::new(self.clone())
353    }
354
355    fn set(&mut self, key: &str, value: &str) -> Result<()> {
356        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
357        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
358        match key {
359            "access_key_id" => {
360                self.access_key_id.set(rem, value)?;
361            }
362            "secret_access_key" => {
363                self.secret_access_key.set(rem, value)?;
364            }
365            "session_token" => {
366                self.session_token.set(rem, value)?;
367            }
368            "region" => {
369                self.region.set(rem, value)?;
370            }
371            "oss" | "cos" | "endpoint" => {
372                self.endpoint.set(rem, value)?;
373            }
374            "allow_http" => {
375                self.allow_http.set(rem, value)?;
376            }
377            "skip_signature" | "nosign" => {
378                self.skip_signature.set(rem, value)?;
379            }
380            _ => {
381                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
382            }
383        }
384        Ok(())
385    }
386
387    fn entries(&self) -> Vec<ConfigEntry> {
388        struct Visitor(Vec<ConfigEntry>);
389
390        impl Visit for Visitor {
391            fn some<V: Display>(
392                &mut self,
393                key: &str,
394                value: V,
395                description: &'static str,
396            ) {
397                self.0.push(ConfigEntry {
398                    key: key.to_string(),
399                    value: Some(value.to_string()),
400                    description,
401                })
402            }
403
404            fn none(&mut self, key: &str, description: &'static str) {
405                self.0.push(ConfigEntry {
406                    key: key.to_string(),
407                    value: None,
408                    description,
409                })
410            }
411        }
412
413        let mut v = Visitor(vec![]);
414        self.access_key_id.visit(&mut v, "access_key_id", "");
415        self.secret_access_key
416            .visit(&mut v, "secret_access_key", "");
417        self.session_token.visit(&mut v, "session_token", "");
418        self.region.visit(&mut v, "region", "");
419        self.endpoint.visit(&mut v, "endpoint", "");
420        self.allow_http.visit(&mut v, "allow_http", "");
421        v.0
422    }
423}
424
impl ConfigExtension for AwsOptions {
    // Options are namespaced under "aws.", e.g. "aws.region".
    const PREFIX: &'static str = "aws";
}
428
/// This struct encapsulates GCP options one uses when setting up object storage.
///
/// Used to configure a `GoogleCloudStorageBuilder`; all fields are optional.
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account path
    pub service_account_path: Option<String>,
    /// Service account key
    pub service_account_key: Option<String>,
    /// Application credentials path
    pub application_credentials_path: Option<String>,
}
439
440impl ExtensionOptions for GcpOptions {
441    fn as_any(&self) -> &dyn Any {
442        self
443    }
444
445    fn as_any_mut(&mut self) -> &mut dyn Any {
446        self
447    }
448
449    fn cloned(&self) -> Box<dyn ExtensionOptions> {
450        Box::new(self.clone())
451    }
452
453    fn set(&mut self, key: &str, value: &str) -> Result<()> {
454        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
455        match rem {
456            "service_account_path" => {
457                self.service_account_path.set(rem, value)?;
458            }
459            "service_account_key" => {
460                self.service_account_key.set(rem, value)?;
461            }
462            "application_credentials_path" => {
463                self.application_credentials_path.set(rem, value)?;
464            }
465            _ => {
466                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
467            }
468        }
469        Ok(())
470    }
471
472    fn entries(&self) -> Vec<ConfigEntry> {
473        struct Visitor(Vec<ConfigEntry>);
474
475        impl Visit for Visitor {
476            fn some<V: Display>(
477                &mut self,
478                key: &str,
479                value: V,
480                description: &'static str,
481            ) {
482                self.0.push(ConfigEntry {
483                    key: key.to_string(),
484                    value: Some(value.to_string()),
485                    description,
486                })
487            }
488
489            fn none(&mut self, key: &str, description: &'static str) {
490                self.0.push(ConfigEntry {
491                    key: key.to_string(),
492                    value: None,
493                    description,
494                })
495            }
496        }
497
498        let mut v = Visitor(vec![]);
499        self.service_account_path
500            .visit(&mut v, "service_account_path", "");
501        self.service_account_key
502            .visit(&mut v, "service_account_key", "");
503        self.application_credentials_path.visit(
504            &mut v,
505            "application_credentials_path",
506            "",
507        );
508        v.0
509    }
510}
511
impl ConfigExtension for GcpOptions {
    // Options are namespaced under "gcp.", e.g. "gcp.service_account_key".
    const PREFIX: &'static str = "gcp";
}
515
516pub(crate) async fn get_object_store(
517    state: &SessionState,
518    scheme: &str,
519    url: &Url,
520    table_options: &TableOptions,
521    resolve_region: bool,
522) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
523    let store: Arc<dyn ObjectStore> = match scheme {
524        "s3" => {
525            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
526                return exec_err!(
527                    "Given table options incompatible with the 's3' scheme"
528                );
529            };
530            let builder =
531                get_s3_object_store_builder(url, options, resolve_region).await?;
532            Arc::new(builder.build()?)
533        }
534        "oss" => {
535            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
536                return exec_err!(
537                    "Given table options incompatible with the 'oss' scheme"
538                );
539            };
540            let builder = get_oss_object_store_builder(url, options)?;
541            Arc::new(builder.build()?)
542        }
543        "cos" => {
544            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
545                return exec_err!(
546                    "Given table options incompatible with the 'cos' scheme"
547                );
548            };
549            let builder = get_cos_object_store_builder(url, options)?;
550            Arc::new(builder.build()?)
551        }
552        "gs" | "gcs" => {
553            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
554                return exec_err!(
555                    "Given table options incompatible with the 'gs'/'gcs' scheme"
556                );
557            };
558            let builder = get_gcs_object_store_builder(url, options)?;
559            Arc::new(builder.build()?)
560        }
561        "http" | "https" => Arc::new(
562            HttpBuilder::new()
563                .with_client_options(ClientOptions::new().with_allow_http(true))
564                .with_url(url.origin().ascii_serialization())
565                .build()?,
566        ),
567        _ => {
568            // For other types, try to get from `object_store_registry`:
569            state
570                .runtime_env()
571                .object_store_registry
572                .get_store(url)
573                .map_err(|_| {
574                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
575                })?
576        }
577    };
578    Ok(store)
579}
580
581#[cfg(test)]
582mod tests {
583    use crate::cli_context::CliSessionContext;
584
585    use super::*;
586
587    use datafusion::{
588        datasource::listing::ListingTableUrl,
589        logical_expr::{DdlStatement, LogicalPlan},
590        prelude::SessionContext,
591    };
592
593    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};
594
    /// With no explicit options, the builder should pick up credentials from
    /// the environment (as in CI), fall back to skipping signatures when no
    /// credentials exist, and auto-resolve the region. Skipped unless the
    /// AWS env vars are set.
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
            std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
        }

        // No options
        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // If the environment variables are set (as they are in CI) use them
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        // "eu-central-1" is what the test mock of resolve_bucket_region returns
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        // get the actual configuration information, then assert_eq!
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Default is to skip signature when no credentials are provided
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }
662
663    #[tokio::test]
664    async fn s3_object_store_builder() -> Result<()> {
665        // "fake" is uppercase to ensure the values are not lowercased when parsed
666        let access_key_id = "FAKE_access_key_id";
667        let secret_access_key = "FAKE_secret_access_key";
668        let region = "fake_us-east-2";
669        let endpoint = "endpoint33";
670        let session_token = "FAKE_session_token";
671        let location = "s3://bucket/path/FAKE/file.parquet";
672
673        let table_url = ListingTableUrl::parse(location)?;
674        let scheme = table_url.scheme();
675        let sql = format!(
676            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
677            ('aws.access_key_id' '{access_key_id}', \
678            'aws.secret_access_key' '{secret_access_key}', \
679            'aws.region' '{region}', \
680            'aws.session_token' {session_token}, \
681            'aws.endpoint' '{endpoint}'\
682            ) LOCATION '{location}'"
683        );
684
685        let ctx = SessionContext::new();
686        ctx.register_table_options_extension_from_scheme(scheme);
687        let table_options = get_table_options(&ctx, &sql).await;
688        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
689        let builder =
690            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
691        // get the actual configuration information, then assert_eq!
692        let config = [
693            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
694            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
695            (AmazonS3ConfigKey::Region, region),
696            (AmazonS3ConfigKey::Endpoint, endpoint),
697            (AmazonS3ConfigKey::Token, session_token),
698        ];
699        for (key, value) in config {
700            assert_eq!(value, builder.get_config_value(&key).unwrap());
701        }
702        // Should not skip signature when credentials are provided
703        assert_eq!(
704            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
705            Some("false".into())
706        );
707
708        Ok(())
709    }
710
    /// An `http://` endpoint must be rejected with a helpful message unless
    /// `aws.allow_http` is set to true.
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        // Compare only the first line so the assertion is independent of any
        // additional context lines in the rendered error.
        assert_eq!(
            err.to_string().lines().next().unwrap_or_default(),
            "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"
        );

        // Now add `allow_http` to the options and check if it works
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        // ensure this isn't an error
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }
759
    /// When no region is configured at all, the builder should auto-detect
    /// one via `resolve_bucket_region` (mocked in tests). Skipped unless the
    /// AWS env vars are set.
    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }
        let location = "s3://test-bucket/path/file.parquet";
        // Set it to a non-existent file to avoid reading the default configuration file
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None, // No region specified - should auto-detect
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        // Verify that the region was auto-detected in test environment
        assert!(
            builder
                .get_config_value(&AmazonS3ConfigKey::Region)
                .is_some()
        );

        Ok(())
    }
791
    /// With `resolve_region` enabled, the auto-detected region (the mocked
    /// "eu-central-1") should override an explicitly configured one. Skipped
    /// unless the AWS env vars are set.
    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled()
    -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Skip test if AWS envs are not set
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        let expected_region = "eu-central-1"; // This should be the auto-detected region
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()), // Explicit region provided
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        // Verify that the region was overridden by auto-detection
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }
822
    /// OSS options — including the `aws.oss.endpoint` sub-key — should flow
    /// into the builder unchanged.
    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }
854
    /// All three GCP credential options should flow into the GCS builder
    /// unchanged.
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // get the actual configuration information, then assert_eq!
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }
889
    /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
    /// session's default `TableOptions` altered with the statement's
    /// OPTIONS key/value map.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        // Only CREATE EXTERNAL TABLE statements are expected here.
        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }
905
906    async fn check_aws_envs() -> Result<()> {
907        let aws_envs = [
908            "AWS_ACCESS_KEY_ID",
909            "AWS_SECRET_ACCESS_KEY",
910            "AWS_REGION",
911            "AWS_ALLOW_HTTP",
912        ];
913        for aws_env in aws_envs {
914            std::env::var(aws_env).map_err(|_| {
915                exec_datafusion_err!("aws envs not set, skipping s3 tests")
916            })?;
917        }
918        Ok(())
919    }
920}