1pub mod instrumented;
19
20use async_trait::async_trait;
21use aws_config::BehaviorVersion;
22use aws_credential_types::provider::{
23 ProvideCredentials, SharedCredentialsProvider, error::CredentialsError,
24};
25use datafusion::{
26 common::{
27 config::ConfigEntry, config::ConfigExtension, config::ConfigField,
28 config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
29 exec_datafusion_err, exec_err,
30 },
31 error::{DataFusionError, Result},
32 execution::context::SessionState,
33};
34use log::debug;
35use object_store::{
36 ClientOptions, CredentialProvider,
37 Error::Generic,
38 ObjectStore,
39 aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
40 gcp::GoogleCloudStorageBuilder,
41 http::HttpBuilder,
42};
43use std::{
44 any::Any,
45 error::Error,
46 fmt::{Debug, Display},
47 sync::Arc,
48};
49use url::Url;
50
51#[cfg(not(test))]
52use object_store::aws::resolve_bucket_region;
53
/// Test-only replacement for `object_store::aws::resolve_bucket_region`.
///
/// Compiled only under `cfg(test)` (the real function is imported under
/// `cfg(not(test))`). Both arguments are ignored and the fixed region
/// `eu-central-1` is always returned, which gives the region-resolution
/// tests in this file a predictable value.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let fixed_region = String::from("eu-central-1");
    Ok(fixed_region)
}
62
63pub async fn get_s3_object_store_builder(
64 url: &Url,
65 aws_options: &AwsOptions,
66 resolve_region: bool,
67) -> Result<AmazonS3Builder> {
68 let AwsOptions {
69 access_key_id,
70 secret_access_key,
71 session_token,
72 region,
73 endpoint,
74 allow_http,
75 skip_signature,
76 } = aws_options;
77
78 let bucket_name = get_bucket_name(url)?;
79 let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
80
81 if let (Some(access_key_id), Some(secret_access_key)) =
82 (access_key_id, secret_access_key)
83 {
84 debug!("Using explicitly provided S3 access_key_id and secret_access_key");
85 builder = builder
86 .with_access_key_id(access_key_id)
87 .with_secret_access_key(secret_access_key);
88
89 if let Some(session_token) = session_token {
90 builder = builder.with_token(session_token);
91 }
92 } else {
93 debug!("Using AWS S3 SDK to determine credentials");
94 let CredentialsFromConfig {
95 region,
96 credentials,
97 } = CredentialsFromConfig::try_new().await?;
98 if let Some(region) = region {
99 builder = builder.with_region(region);
100 }
101 if let Some(credentials) = credentials {
102 let credentials = Arc::new(S3CredentialProvider { credentials });
103 builder = builder.with_credentials(credentials);
104 } else {
105 debug!("No credentials found, defaulting to skip signature ");
106 builder = builder.with_skip_signature(true);
107 }
108 }
109
110 if let Some(region) = region {
111 builder = builder.with_region(region);
112 }
113
114 if builder
116 .get_config_value(&AmazonS3ConfigKey::Region)
117 .is_none()
118 || resolve_region
119 {
120 let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
121 builder = builder.with_region(region);
122 }
123
124 if let Some(endpoint) = endpoint {
125 if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
128 && !matches!(allow_http, Some(true))
129 && endpoint_url.scheme() == "http"
130 {
131 return config_err!(
132 "Invalid endpoint: {endpoint}. \
133 HTTP is not allowed for S3 endpoints. \
134 To allow HTTP, set 'aws.allow_http' to true"
135 );
136 }
137
138 builder = builder.with_endpoint(endpoint);
139 }
140
141 if let Some(allow_http) = allow_http {
142 builder = builder.with_allow_http(*allow_http);
143 }
144
145 if let Some(skip_signature) = skip_signature {
146 builder = builder.with_skip_signature(*skip_signature);
147 }
148
149 Ok(builder)
150}
151
/// Region and credentials provider discovered from the default `aws_config`
/// configuration (see `CredentialsFromConfig::try_new`).
struct CredentialsFromConfig {
    /// Region reported by the loaded configuration, if it has one.
    region: Option<String>,
    /// Credentials provider from the loaded configuration. `None` when the
    /// provider reported that no credentials could be loaded.
    credentials: Option<SharedCredentialsProvider>,
}
157
158impl CredentialsFromConfig {
159 pub async fn try_new() -> Result<Self> {
161 let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
162 let region = config.region().map(|r| r.to_string());
163
164 let credentials = config
165 .credentials_provider()
166 .ok_or_else(|| {
167 DataFusionError::ObjectStore(Box::new(Generic {
168 store: "S3",
169 source: "Failed to get S3 credentials aws_config".into(),
170 }))
171 })?
172 .clone();
173
174 let credentials = match credentials.provide_credentials().await {
178 Ok(_) => Some(credentials),
179 Err(CredentialsError::CredentialsNotLoaded(_)) => {
180 debug!("Could not use AWS SDK to get credentials");
181 None
182 }
183 Err(e) => {
186 let source_message = if let Some(source) = e.source() {
188 format!(": {source}")
189 } else {
190 String::new()
191 };
192
193 let message = format!(
194 "Error getting credentials from provider: {e}{source_message}",
195 );
196
197 return Err(DataFusionError::ObjectStore(Box::new(Generic {
198 store: "S3",
199 source: message.into(),
200 })));
201 }
202 };
203 Ok(Self {
204 region,
205 credentials,
206 })
207 }
208}
209
/// Adapter that exposes an AWS SDK credentials provider through
/// `object_store`'s `CredentialProvider` trait.
#[derive(Debug)]
struct S3CredentialProvider {
    /// SDK provider that is queried each time `get_credential` is called.
    credentials: aws_credential_types::provider::SharedCredentialsProvider,
}
214
215#[async_trait]
216impl CredentialProvider for S3CredentialProvider {
217 type Credential = AwsCredential;
218
219 async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
220 let creds =
221 self.credentials
222 .provide_credentials()
223 .await
224 .map_err(|e| Generic {
225 store: "S3",
226 source: Box::new(e),
227 })?;
228 Ok(Arc::new(AwsCredential {
229 key_id: creds.access_key_id().to_string(),
230 secret_key: creds.secret_access_key().to_string(),
231 token: creds.session_token().map(ToString::to_string),
232 }))
233 }
234}
235
236pub fn get_oss_object_store_builder(
237 url: &Url,
238 aws_options: &AwsOptions,
239) -> Result<AmazonS3Builder> {
240 get_object_store_builder(url, aws_options, true)
241}
242
243pub fn get_cos_object_store_builder(
244 url: &Url,
245 aws_options: &AwsOptions,
246) -> Result<AmazonS3Builder> {
247 get_object_store_builder(url, aws_options, false)
248}
249
250fn get_object_store_builder(
251 url: &Url,
252 aws_options: &AwsOptions,
253 virtual_hosted_style_request: bool,
254) -> Result<AmazonS3Builder> {
255 let bucket_name = get_bucket_name(url)?;
256 let mut builder = AmazonS3Builder::from_env()
257 .with_virtual_hosted_style_request(virtual_hosted_style_request)
258 .with_bucket_name(bucket_name)
259 .with_region("do_not_care");
261
262 if let (Some(access_key_id), Some(secret_access_key)) =
263 (&aws_options.access_key_id, &aws_options.secret_access_key)
264 {
265 builder = builder
266 .with_access_key_id(access_key_id)
267 .with_secret_access_key(secret_access_key);
268 }
269
270 if let Some(endpoint) = &aws_options.endpoint {
271 builder = builder.with_endpoint(endpoint);
272 }
273
274 Ok(builder)
275}
276
277pub fn get_gcs_object_store_builder(
278 url: &Url,
279 gs_options: &GcpOptions,
280) -> Result<GoogleCloudStorageBuilder> {
281 let bucket_name = get_bucket_name(url)?;
282 let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
283
284 if let Some(service_account_path) = &gs_options.service_account_path {
285 builder = builder.with_service_account_path(service_account_path);
286 }
287
288 if let Some(service_account_key) = &gs_options.service_account_key {
289 builder = builder.with_service_account_key(service_account_key);
290 }
291
292 if let Some(application_credentials_path) = &gs_options.application_credentials_path {
293 builder = builder.with_application_credentials(application_credentials_path);
294 }
295
296 Ok(builder)
297}
298
299fn get_bucket_name(url: &Url) -> Result<&str> {
300 url.host_str().ok_or_else(|| {
301 exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
302 })
303}
304
/// Options for S3-compatible object stores, set through `aws.`-prefixed
/// configuration keys.
///
/// Every field is optional and defaults to `None`.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access key ID; only applied when `secret_access_key` is also set.
    pub access_key_id: Option<String>,
    /// Secret access key; only applied when `access_key_id` is also set.
    pub secret_access_key: Option<String>,
    /// Session token; the S3 builder applies it only alongside an explicit
    /// key pair.
    pub session_token: Option<String>,
    /// Region handed to the S3 builder.
    pub region: Option<String>,
    /// Custom endpoint; also settable through the `oss` and `cos` key aliases.
    pub endpoint: Option<String>,
    /// Whether plain-HTTP connections are permitted.
    pub allow_http: Option<bool>,
    /// Whether request signing is skipped; also settable through the `nosign`
    /// key alias.
    pub skip_signature: Option<bool>,
}
326
327impl ExtensionOptions for AwsOptions {
328 fn as_any(&self) -> &dyn Any {
329 self
330 }
331
332 fn as_any_mut(&mut self) -> &mut dyn Any {
333 self
334 }
335
336 fn cloned(&self) -> Box<dyn ExtensionOptions> {
337 Box::new(self.clone())
338 }
339
340 fn set(&mut self, key: &str, value: &str) -> Result<()> {
341 let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342 let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343 match key {
344 "access_key_id" => {
345 self.access_key_id.set(rem, value)?;
346 }
347 "secret_access_key" => {
348 self.secret_access_key.set(rem, value)?;
349 }
350 "session_token" => {
351 self.session_token.set(rem, value)?;
352 }
353 "region" => {
354 self.region.set(rem, value)?;
355 }
356 "oss" | "cos" | "endpoint" => {
357 self.endpoint.set(rem, value)?;
358 }
359 "allow_http" => {
360 self.allow_http.set(rem, value)?;
361 }
362 "skip_signature" | "nosign" => {
363 self.skip_signature.set(rem, value)?;
364 }
365 _ => {
366 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367 }
368 }
369 Ok(())
370 }
371
372 fn entries(&self) -> Vec<ConfigEntry> {
373 struct Visitor(Vec<ConfigEntry>);
374
375 impl Visit for Visitor {
376 fn some<V: Display>(
377 &mut self,
378 key: &str,
379 value: V,
380 description: &'static str,
381 ) {
382 self.0.push(ConfigEntry {
383 key: key.to_string(),
384 value: Some(value.to_string()),
385 description,
386 })
387 }
388
389 fn none(&mut self, key: &str, description: &'static str) {
390 self.0.push(ConfigEntry {
391 key: key.to_string(),
392 value: None,
393 description,
394 })
395 }
396 }
397
398 let mut v = Visitor(vec![]);
399 self.access_key_id.visit(&mut v, "access_key_id", "");
400 self.secret_access_key
401 .visit(&mut v, "secret_access_key", "");
402 self.session_token.visit(&mut v, "session_token", "");
403 self.region.visit(&mut v, "region", "");
404 self.endpoint.visit(&mut v, "endpoint", "");
405 self.allow_http.visit(&mut v, "allow_http", "");
406 v.0
407 }
408}
409
impl ConfigExtension for AwsOptions {
    /// Key prefix for these options, e.g. `aws.region`.
    const PREFIX: &'static str = "aws";
}
413
/// Options for Google Cloud Storage, set through `gcp.`-prefixed
/// configuration keys.
///
/// Every field is optional and defaults to `None`.
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Passed to `GoogleCloudStorageBuilder::with_service_account_path`.
    pub service_account_path: Option<String>,
    /// Passed to `GoogleCloudStorageBuilder::with_service_account_key`.
    pub service_account_key: Option<String>,
    /// Passed to `GoogleCloudStorageBuilder::with_application_credentials`.
    pub application_credentials_path: Option<String>,
}
424
425impl ExtensionOptions for GcpOptions {
426 fn as_any(&self) -> &dyn Any {
427 self
428 }
429
430 fn as_any_mut(&mut self) -> &mut dyn Any {
431 self
432 }
433
434 fn cloned(&self) -> Box<dyn ExtensionOptions> {
435 Box::new(self.clone())
436 }
437
438 fn set(&mut self, key: &str, value: &str) -> Result<()> {
439 let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
440 match rem {
441 "service_account_path" => {
442 self.service_account_path.set(rem, value)?;
443 }
444 "service_account_key" => {
445 self.service_account_key.set(rem, value)?;
446 }
447 "application_credentials_path" => {
448 self.application_credentials_path.set(rem, value)?;
449 }
450 _ => {
451 return config_err!("Config value \"{}\" not found on GcpOptions", rem);
452 }
453 }
454 Ok(())
455 }
456
457 fn entries(&self) -> Vec<ConfigEntry> {
458 struct Visitor(Vec<ConfigEntry>);
459
460 impl Visit for Visitor {
461 fn some<V: Display>(
462 &mut self,
463 key: &str,
464 value: V,
465 description: &'static str,
466 ) {
467 self.0.push(ConfigEntry {
468 key: key.to_string(),
469 value: Some(value.to_string()),
470 description,
471 })
472 }
473
474 fn none(&mut self, key: &str, description: &'static str) {
475 self.0.push(ConfigEntry {
476 key: key.to_string(),
477 value: None,
478 description,
479 })
480 }
481 }
482
483 let mut v = Visitor(vec![]);
484 self.service_account_path
485 .visit(&mut v, "service_account_path", "");
486 self.service_account_key
487 .visit(&mut v, "service_account_key", "");
488 self.application_credentials_path.visit(
489 &mut v,
490 "application_credentials_path",
491 "",
492 );
493 v.0
494 }
495}
496
impl ConfigExtension for GcpOptions {
    /// Key prefix for these options, e.g. `gcp.service_account_path`.
    const PREFIX: &'static str = "gcp";
}
500
501pub(crate) async fn get_object_store(
502 state: &SessionState,
503 scheme: &str,
504 url: &Url,
505 table_options: &TableOptions,
506 resolve_region: bool,
507) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
508 let store: Arc<dyn ObjectStore> = match scheme {
509 "s3" => {
510 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
511 return exec_err!(
512 "Given table options incompatible with the 's3' scheme"
513 );
514 };
515 let builder =
516 get_s3_object_store_builder(url, options, resolve_region).await?;
517 Arc::new(builder.build()?)
518 }
519 "oss" => {
520 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
521 return exec_err!(
522 "Given table options incompatible with the 'oss' scheme"
523 );
524 };
525 let builder = get_oss_object_store_builder(url, options)?;
526 Arc::new(builder.build()?)
527 }
528 "cos" => {
529 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
530 return exec_err!(
531 "Given table options incompatible with the 'cos' scheme"
532 );
533 };
534 let builder = get_cos_object_store_builder(url, options)?;
535 Arc::new(builder.build()?)
536 }
537 "gs" | "gcs" => {
538 let Some(options) = table_options.extensions.get::<GcpOptions>() else {
539 return exec_err!(
540 "Given table options incompatible with the 'gs'/'gcs' scheme"
541 );
542 };
543 let builder = get_gcs_object_store_builder(url, options)?;
544 Arc::new(builder.build()?)
545 }
546 "http" | "https" => Arc::new(
547 HttpBuilder::new()
548 .with_client_options(ClientOptions::new().with_allow_http(true))
549 .with_url(url.origin().ascii_serialization())
550 .build()?,
551 ),
552 _ => {
553 state
555 .runtime_env()
556 .object_store_registry
557 .get_store(url)
558 .map_err(|_| {
559 exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
560 })?
561 }
562 };
563 Ok(store)
564}
565
/// Tests for the builder functions in this file. They inspect the resulting
/// builder configuration via `get_config_value` rather than contacting any
/// service.
#[cfg(test)]
mod tests {
    // NOTE(review): presumably this trait supplies
    // `register_table_options_extension_from_scheme` on `SessionContext` — confirm.
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    /// With no `aws.*` options in the SQL, the builder's key, secret, region
    /// and endpoint are expected to mirror the `AWS_*` environment variables.
    /// Skipped (returns `Ok`) unless `check_aws_envs` succeeds.
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required environment variables are missing: skip the test.
            eprintln!("{e}");
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // Point the AWS SDK at repository-relative config/credential paths.
        // NOTE(review): presumably this keeps the SDK from reading the
        // developer's real AWS files — confirm whether these paths exist.
        //
        // SAFETY: `set_var` is unsafe because it mutates process-global state.
        // NOTE(review): other tests may run concurrently in this process;
        // this relies on none of them racing on the environment — confirm.
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
            std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // Expected values come straight from the environment.
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        // Falls back to the region returned by the test stub of
        // `resolve_bucket_region` when `AWS_REGION` is unset.
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // NOTE(review): `check_aws_envs` has already required both key
        // variables, so the `"true"` arm looks unreachable here — confirm.
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    /// Options given explicitly in the `CREATE EXTERNAL TABLE` statement must
    /// show up unchanged in the builder configuration.
    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        // Note: the session token value is interpolated without surrounding
        // quotes, unlike the other option values.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' {session_token}, \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        // Each (config key, expected value) pair is checked against the builder.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        // `skip_signature` was not requested, so the builder reports "false".
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    /// An `http://` endpoint is rejected with a dedicated message unless
    /// `aws.allow_http` is set to `true`.
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        // Only the first line is compared; any further lines of the error
        // text are ignored.
        assert_eq!(
            err.to_string().lines().next().unwrap_or_default(),
            "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"
        );

        // Same statement with `aws.allow_http` enabled: building must succeed.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    /// With no region in `AwsOptions`, the builder is expected to end up with
    /// the region returned by the test stub of `resolve_bucket_region`.
    /// Skipped unless `check_aws_envs` succeeds.
    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required environment variables are missing: skip the test.
            eprintln!("{e}");
            return Ok(());
        }
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";
        // SAFETY: `set_var` is unsafe because it mutates process-global state.
        // NOTE(review): relies on no concurrent environment access from other
        // tests in this process — confirm.
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None,
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// With `resolve_region = true`, the resolved region must replace the
    /// region configured in `AwsOptions`. Skipped unless `check_aws_envs`
    /// succeeds.
    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled()
    -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required environment variables are missing: skip the test.
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        // Value returned by the test stub of `resolve_bucket_region`.
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()),
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// Key pair and `aws.oss.endpoint` given in SQL must reach the OSS builder.
    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // Each (config key, expected value) pair is checked against the builder.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// All three `gcp.*` options given in SQL must reach the GCS builder.
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // Each (config key, expected value) pair is checked against the builder.
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans `sql`, which must be a `CREATE EXTERNAL TABLE` statement, and
    /// returns the context's default table options with the statement's
    /// `OPTIONS` applied on top.
    ///
    /// Panics when planning fails, when the plan is of another kind, or when
    /// an option cannot be applied.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }

    /// Returns an execution error unless every listed `AWS_*` environment
    /// variable can be read; callers use that error as the signal to skip.
    async fn check_aws_envs() -> Result<()> {
        let aws_envs = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
            "AWS_ALLOW_HTTP",
        ];
        for aws_env in aws_envs {
            std::env::var(aws_env).map_err(|_| {
                exec_datafusion_err!("aws envs not set, skipping s3 tests")
            })?;
        }
        Ok(())
    }
}