datafusion_cli/object_storage.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use datafusion::common::config::{
    ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::{AmazonS3Builder, AwsCredential};
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::http::HttpBuilder;
use object_store::{ClientOptions, CredentialProvider, ObjectStore};
use url::Url;

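/// Constructs an [`AmazonS3Builder`] for the bucket named in `url`, preferring
/// explicit credentials from `aws_options` and otherwise falling back to the
/// AWS SDK's default credential and region chain.
///
/// A minimal usage sketch (the bucket name and credential values are
/// illustrative, not real):
///
/// ```ignore
/// let url = Url::parse("s3://my-bucket/path/file.parquet").unwrap();
/// let options = AwsOptions {
///     access_key_id: Some("FAKE_ID".into()),
///     secret_access_key: Some("FAKE_SECRET".into()),
///     ..Default::default()
/// };
/// let store = get_s3_object_store_builder(&url, &options).await?.build()?;
/// ```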
pub async fn get_s3_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    let AwsOptions {
        access_key_id,
        secret_access_key,
        session_token,
        region,
        endpoint,
        allow_http,
    } = aws_options;

    let bucket_name = get_bucket_name(url)?;
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

    if let (Some(access_key_id), Some(secret_access_key)) =
        (access_key_id, secret_access_key)
    {
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);

        if let Some(session_token) = session_token {
            builder = builder.with_token(session_token);
        }
    } else {
        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        if let Some(region) = config.region() {
            builder = builder.with_region(region.to_string());
        }

        let credentials = config
            .credentials_provider()
            .ok_or_else(|| {
                DataFusionError::ObjectStore(object_store::Error::Generic {
                    store: "S3",
                    source: "Failed to get S3 credentials from the environment".into(),
                })
            })?
            .clone();

        let credentials = Arc::new(S3CredentialProvider { credentials });
        builder = builder.with_credentials(credentials);
    }

    if let Some(region) = region {
        builder = builder.with_region(region);
    }

    if let Some(endpoint) = endpoint {
        // Make a nicer error if the user hasn't allowed http and the endpoint
        // is http as the default message is "URL scheme is not allowed"
        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
                return config_err!(
                    "Invalid endpoint: {endpoint}. \
                HTTP is not allowed for S3 endpoints. \
                To allow HTTP, set 'aws.allow_http' to true"
                );
            }
        }

        builder = builder.with_endpoint(endpoint);
    }

    if let Some(allow_http) = allow_http {
        builder = builder.with_allow_http(*allow_http);
    }

    Ok(builder)
}

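/// Adapts the AWS SDK's shared credentials provider to the `object_store`
/// [`CredentialProvider`] trait, so credentials resolved by the SDK's default
/// chain can be used by [`AmazonS3Builder`].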
#[derive(Debug)]
struct S3CredentialProvider {
    credentials: aws_credential_types::provider::SharedCredentialsProvider,
}

#[async_trait]
impl CredentialProvider for S3CredentialProvider {
    type Credential = AwsCredential;

    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
        let creds = self.credentials.provide_credentials().await.map_err(|e| {
            object_store::Error::Generic {
                store: "S3",
                source: Box::new(e),
            }
        })?;
        Ok(Arc::new(AwsCredential {
            key_id: creds.access_key_id().to_string(),
            secret_key: creds.secret_access_key().to_string(),
            token: creds.session_token().map(ToString::to_string),
        }))
    }
}

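/// Builds an [`AmazonS3Builder`] for an OSS (Alibaba Cloud Object Storage
/// Service) bucket. OSS is addressed with virtual-hosted-style requests, so
/// the shared helper is called with `virtual_hosted_style_request = true`;
/// the endpoint is taken from `aws.oss.endpoint` (see [`AwsOptions`]).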
pub fn get_oss_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, true)
}

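/// Builds an [`AmazonS3Builder`] for a COS (Tencent Cloud Object Storage)
/// bucket. Unlike OSS, COS is addressed with path-style requests
/// (`virtual_hosted_style_request = false`); the endpoint is taken from
/// `aws.cos.endpoint` (see [`AwsOptions`]).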
pub fn get_cos_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, false)
}

fn get_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
    virtual_hosted_style_request: bool,
) -> Result<AmazonS3Builder> {
    let bucket_name = get_bucket_name(url)?;
    let mut builder = AmazonS3Builder::from_env()
        .with_virtual_hosted_style_request(virtual_hosted_style_request)
        .with_bucket_name(bucket_name)
        // oss/cos don't care about the "region" field
        .with_region("do_not_care");

    if let (Some(access_key_id), Some(secret_access_key)) =
        (&aws_options.access_key_id, &aws_options.secret_access_key)
    {
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);
    }

    if let Some(endpoint) = &aws_options.endpoint {
        builder = builder.with_endpoint(endpoint);
    }

    Ok(builder)
}

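/// Constructs a [`GoogleCloudStorageBuilder`] for the bucket named in `url`,
/// layering any credentials from `gs_options` on top of the environment.
///
/// A minimal usage sketch (the bucket name and path are illustrative):
///
/// ```ignore
/// let url = Url::parse("gs://my-bucket/path/file.parquet").unwrap();
/// let options = GcpOptions {
///     service_account_path: Some("/path/to/service-account.json".into()),
///     ..Default::default()
/// };
/// let store = get_gcs_object_store_builder(&url, &options)?.build()?;
/// ```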
pub fn get_gcs_object_store_builder(
    url: &Url,
    gs_options: &GcpOptions,
) -> Result<GoogleCloudStorageBuilder> {
    let bucket_name = get_bucket_name(url)?;
    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

    if let Some(service_account_path) = &gs_options.service_account_path {
        builder = builder.with_service_account_path(service_account_path);
    }

    if let Some(service_account_key) = &gs_options.service_account_key {
        builder = builder.with_service_account_key(service_account_key);
    }

    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
        builder = builder.with_application_credentials(application_credentials_path);
    }

    Ok(builder)
}

fn get_bucket_name(url: &Url) -> Result<&str> {
    url.host_str().ok_or_else(|| {
        DataFusionError::Execution(format!(
            "Not able to parse bucket name from url: {}",
            url.as_str()
        ))
    })
}

/// This struct encapsulates AWS options one uses when setting up object storage.
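///
/// The options are registered under the `aws` prefix (see
/// [`ConfigExtension::PREFIX`]); keys such as `aws.access_key_id`,
/// `aws.secret_access_key`, `aws.session_token`, `aws.region`,
/// `aws.endpoint` (also reachable as `aws.oss.endpoint` or `aws.cos.endpoint`)
/// and `aws.allow_http` can be supplied via `CREATE EXTERNAL TABLE ... OPTIONS`,
/// as exercised by the tests at the bottom of this file.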
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access Key ID
    pub access_key_id: Option<String>,
    /// Secret Access Key
    pub secret_access_key: Option<String>,
    /// Session token
    pub session_token: Option<String>,
    /// AWS Region
    pub region: Option<String>,
    /// OSS or COS Endpoint
    pub endpoint: Option<String>,
    /// Allow HTTP (otherwise will always use https)
    pub allow_http: Option<bool>,
}

impl ExtensionOptions for AwsOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
        match key {
            "access_key_id" => {
                self.access_key_id.set(rem, value)?;
            }
            "secret_access_key" => {
                self.secret_access_key.set(rem, value)?;
            }
            "session_token" => {
                self.session_token.set(rem, value)?;
            }
            "region" => {
                self.region.set(rem, value)?;
            }
            "oss" | "cos" | "endpoint" => {
                self.endpoint.set(rem, value)?;
            }
            "allow_http" => {
                self.allow_http.set(rem, value)?;
            }
            _ => {
                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
            }
        }
        Ok(())
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.access_key_id.visit(&mut v, "access_key_id", "");
        self.secret_access_key
            .visit(&mut v, "secret_access_key", "");
        self.session_token.visit(&mut v, "session_token", "");
        self.region.visit(&mut v, "region", "");
        self.endpoint.visit(&mut v, "endpoint", "");
        self.allow_http.visit(&mut v, "allow_http", "");
        v.0
    }
}

impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}

/// This struct encapsulates GCP options one uses when setting up object storage.
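///
/// The options are registered under the `gcp` prefix (see
/// [`ConfigExtension::PREFIX`]); keys are `gcp.service_account_path`,
/// `gcp.service_account_key` and `gcp.application_credentials_path`, as
/// exercised by the tests at the bottom of this file.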
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account path
    pub service_account_path: Option<String>,
    /// Service account key
    pub service_account_key: Option<String>,
    /// Application credentials path
    pub application_credentials_path: Option<String>,
}

impl ExtensionOptions for GcpOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
        match rem {
            "service_account_path" => {
                self.service_account_path.set(rem, value)?;
            }
            "service_account_key" => {
                self.service_account_key.set(rem, value)?;
            }
            "application_credentials_path" => {
                self.application_credentials_path.set(rem, value)?;
            }
            _ => {
                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
            }
        }
        Ok(())
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.service_account_path
            .visit(&mut v, "service_account_path", "");
        self.service_account_key
            .visit(&mut v, "service_account_key", "");
        self.application_credentials_path.visit(
            &mut v,
            "application_credentials_path",
            "",
        );
        v.0
    }
}

impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}

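/// Resolves an [`ObjectStore`] for `url` based on its scheme: `s3`, `oss`,
/// `cos`, `gs`/`gcs`, and `http`/`https` are handled here, using the
/// [`AwsOptions`] / [`GcpOptions`] extensions carried in `table_options`;
/// any other scheme is looked up in the session's `object_store_registry`.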
pub(crate) async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    let store: Arc<dyn ObjectStore> = match scheme {
        "s3" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 's3' scheme"
                );
            };
            let builder = get_s3_object_store_builder(url, options).await?;
            Arc::new(builder.build()?)
        }
        "oss" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'oss' scheme"
                );
            };
            let builder = get_oss_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "cos" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'cos' scheme"
                );
            };
            let builder = get_cos_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "gs" | "gcs" => {
            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'gs'/'gcs' scheme"
                );
            };
            let builder = get_gcs_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "http" | "https" => Arc::new(
            HttpBuilder::new()
                .with_client_options(ClientOptions::new().with_allow_http(true))
                .with_url(url.origin().ascii_serialization())
                .build()?,
        ),
        _ => {
            // For other types, try to get from `object_store_registry`:
            state
                .runtime_env()
                .object_store_registry
                .get_store(url)
                .map_err(|_| {
                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                })?
        }
    };
    Ok(store)
}

#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::common::plan_err;
    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        // "FAKE" is uppercase in the values to ensure they are not lowercased when parsed
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' {session_token}, \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        let mut plan = ctx.state().create_logical_plan(&sql).await?;

        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
            ctx.register_table_options_extension_from_scheme(scheme);
            let mut table_options = ctx.state().default_table_options();
            table_options.alter_with_string_hash_map(&cmd.options)?;
            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
            let builder =
                get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
            // get the actual configuration information, then assert_eq!
            let config = [
                (AmazonS3ConfigKey::AccessKeyId, access_key_id),
                (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
                (AmazonS3ConfigKey::Region, region),
                (AmazonS3ConfigKey::Endpoint, endpoint),
                (AmazonS3ConfigKey::Token, session_token),
            ];
            for (key, value) in config {
                assert_eq!(value, builder.get_config_value(&key).unwrap());
            }
        } else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        }

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        let mut plan = ctx.state().create_logical_plan(&sql).await?;

        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
            ctx.register_table_options_extension_from_scheme(scheme);
            let mut table_options = ctx.state().default_table_options();
            table_options.alter_with_string_hash_map(&cmd.options)?;
            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
            let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
                .await
                .unwrap_err();

            assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
        } else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        }

        // Now add `allow_http` to the options and check if it works
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );

        let mut plan = ctx.state().create_logical_plan(&sql).await?;

        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
            ctx.register_table_options_extension_from_scheme(scheme);
            let mut table_options = ctx.state().default_table_options();
            table_options.alter_with_string_hash_map(&cmd.options)?;
            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
            // ensure this isn't an error
            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
        } else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        }

        Ok(())
    }

    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        let mut plan = ctx.state().create_logical_plan(&sql).await?;

        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
            ctx.register_table_options_extension_from_scheme(scheme);
            let mut table_options = ctx.state().default_table_options();
            table_options.alter_with_string_hash_map(&cmd.options)?;
            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
            let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
            // get the actual configuration information, then assert_eq!
            let config = [
                (AmazonS3ConfigKey::AccessKeyId, access_key_id),
                (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
                (AmazonS3ConfigKey::Endpoint, endpoint),
            ];
            for (key, value) in config {
                assert_eq!(value, builder.get_config_value(&key).unwrap());
            }
        } else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        }

        Ok(())
    }

    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        let mut plan = ctx.state().create_logical_plan(&sql).await?;

        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
            ctx.register_table_options_extension_from_scheme(scheme);
            let mut table_options = ctx.state().default_table_options();
            table_options.alter_with_string_hash_map(&cmd.options)?;
            let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
            let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
            // get the actual configuration information, then assert_eq!
            let config = [
                (GoogleConfigKey::ServiceAccount, service_account_path),
                (GoogleConfigKey::ServiceAccountKey, service_account_key),
                (
                    GoogleConfigKey::ApplicationCredentials,
                    application_credentials_path,
                ),
            ];
            for (key, value) in config {
                assert_eq!(value, builder.get_config_value(&key).unwrap());
            }
        } else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        }

        Ok(())
    }
}