pub mod instrumented;

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::{
    error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
};
use datafusion::{
    common::{
        config::ConfigEntry, config::ConfigExtension, config::ConfigField,
        config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
        exec_datafusion_err, exec_err,
    },
    error::{DataFusionError, Result},
    execution::context::SessionState,
};
use log::debug;
use object_store::{
    aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
    gcp::GoogleCloudStorageBuilder,
    http::HttpBuilder,
    ClientOptions, CredentialProvider,
    Error::Generic,
    ObjectStore,
};
use std::{
    any::Any,
    error::Error,
    fmt::{Debug, Display},
    sync::Arc,
};
use url::Url;

#[cfg(not(test))]
use object_store::aws::resolve_bucket_region;

#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    Ok("eu-central-1".to_string())
}

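/// Builds an [`AmazonS3Builder`] for `url`, preferring credentials supplied in
/// `aws_options` and otherwise falling back to the AWS SDK's default provider
/// chain (or unsigned requests if no credentials can be found).
///
/// Usage sketch (hypothetical bucket and path; errors elided):
///
/// ```ignore
/// let url = Url::parse("s3://my-bucket/data/file.parquet").unwrap();
/// let store = get_s3_object_store_builder(&url, &AwsOptions::default(), false)
///     .await?
///     .build()?;
/// ```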
pub async fn get_s3_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
    resolve_region: bool,
) -> Result<AmazonS3Builder> {
    let AwsOptions {
        access_key_id,
        secret_access_key,
        session_token,
        region,
        endpoint,
        allow_http,
        skip_signature,
    } = aws_options;

    let bucket_name = get_bucket_name(url)?;
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

    if let (Some(access_key_id), Some(secret_access_key)) =
        (access_key_id, secret_access_key)
    {
        debug!("Using explicitly provided S3 access_key_id and secret_access_key");
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);

        if let Some(session_token) = session_token {
            builder = builder.with_token(session_token);
        }
    } else {
        debug!("Using AWS S3 SDK to determine credentials");
        let CredentialsFromConfig {
            region,
            credentials,
        } = CredentialsFromConfig::try_new().await?;
        if let Some(region) = region {
            builder = builder.with_region(region);
        }
        if let Some(credentials) = credentials {
            let credentials = Arc::new(S3CredentialProvider { credentials });
            builder = builder.with_credentials(credentials);
        } else {
            debug!("No credentials found, defaulting to skip signature");
            builder = builder.with_skip_signature(true);
        }
    }

    if let Some(region) = region {
        builder = builder.with_region(region);
    }

    // Look up the bucket's region if none is configured, or if explicit
    // resolution was requested.
    if builder
        .get_config_value(&AmazonS3ConfigKey::Region)
        .is_none()
        || resolve_region
    {
        let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
        builder = builder.with_region(region);
    }

    if let Some(endpoint) = endpoint {
        // If the endpoint parses as a URL, reject plain HTTP unless it was
        // explicitly allowed.
        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
                return config_err!(
                    "Invalid endpoint: {endpoint}. \
                    HTTP is not allowed for S3 endpoints. \
                    To allow HTTP, set 'aws.allow_http' to true"
                );
            }
        }

        builder = builder.with_endpoint(endpoint);
    }

    if let Some(allow_http) = allow_http {
        builder = builder.with_allow_http(*allow_http);
    }

    if let Some(skip_signature) = skip_signature {
        builder = builder.with_skip_signature(*skip_signature);
    }

    Ok(builder)
}

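/// Region and credentials discovered via the AWS SDK's default configuration
/// chain.
///
/// `credentials` is `None` when the chain could not produce any credentials,
/// in which case the caller falls back to unsigned (anonymous) requests.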
struct CredentialsFromConfig {
    region: Option<String>,
    credentials: Option<SharedCredentialsProvider>,
}

impl CredentialsFromConfig {
    pub async fn try_new() -> Result<Self> {
        // Load region and credentials through the AWS SDK's default chain.
        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        let region = config.region().map(|r| r.to_string());

        let credentials = config
            .credentials_provider()
            .ok_or_else(|| {
                DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: "Failed to get S3 credentials from aws_config".into(),
                }))
            })?
            .clone();

        // Check that the provider can actually produce credentials; if it
        // cannot, return `None` so the caller can skip request signing.
        let credentials = match credentials.provide_credentials().await {
            Ok(_) => Some(credentials),
            Err(CredentialsError::CredentialsNotLoaded(_)) => {
                debug!("Could not use AWS SDK to get credentials");
                None
            }
            // Other errors are fatal; include the underlying source, if any.
            Err(e) => {
                let source_message = if let Some(source) = e.source() {
                    format!(": {source}")
                } else {
                    String::new()
                };

                let message = format!(
                    "Error getting credentials from provider: {e}{source_message}",
                );

                return Err(DataFusionError::ObjectStore(Box::new(Generic {
                    store: "S3",
                    source: message.into(),
                })));
            }
        };
        Ok(Self {
            region,
            credentials,
        })
    }
}

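/// Bridges an AWS SDK [`SharedCredentialsProvider`] to the
/// [`CredentialProvider`] trait used by `object_store`'s S3 builder.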
#[derive(Debug)]
struct S3CredentialProvider {
    credentials: aws_credential_types::provider::SharedCredentialsProvider,
}

#[async_trait]
impl CredentialProvider for S3CredentialProvider {
    type Credential = AwsCredential;

    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
        let creds =
            self.credentials
                .provide_credentials()
                .await
                .map_err(|e| Generic {
                    store: "S3",
                    source: Box::new(e),
                })?;
        Ok(Arc::new(AwsCredential {
            key_id: creds.access_key_id().to_string(),
            secret_key: creds.secret_access_key().to_string(),
            token: creds.session_token().map(ToString::to_string),
        }))
    }
}

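/// Builds an [`AmazonS3Builder`] for `oss://` URLs, using virtual-hosted-style
/// requests. See [`get_cos_object_store_builder`] for the path-style variant
/// used for `cos://` URLs.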
pub fn get_oss_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, true)
}

pub fn get_cos_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    get_object_store_builder(url, aws_options, false)
}

fn get_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
    virtual_hosted_style_request: bool,
) -> Result<AmazonS3Builder> {
    let bucket_name = get_bucket_name(url)?;
    let mut builder = AmazonS3Builder::from_env()
        .with_virtual_hosted_style_request(virtual_hosted_style_request)
        .with_bucket_name(bucket_name)
        // These services ignore the region, but the builder requires a value.
        .with_region("do_not_care");

    if let (Some(access_key_id), Some(secret_access_key)) =
        (&aws_options.access_key_id, &aws_options.secret_access_key)
    {
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);
    }

    if let Some(endpoint) = &aws_options.endpoint {
        builder = builder.with_endpoint(endpoint);
    }

    Ok(builder)
}

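/// Builds a [`GoogleCloudStorageBuilder`] for `url` from the given
/// [`GcpOptions`], on top of any environment-based configuration.
///
/// Usage sketch (hypothetical bucket and path; errors elided):
///
/// ```ignore
/// let url = Url::parse("gs://my-bucket/data/file.parquet").unwrap();
/// let store = get_gcs_object_store_builder(&url, &GcpOptions::default())?.build()?;
/// ```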
pub fn get_gcs_object_store_builder(
    url: &Url,
    gs_options: &GcpOptions,
) -> Result<GoogleCloudStorageBuilder> {
    let bucket_name = get_bucket_name(url)?;
    let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

    if let Some(service_account_path) = &gs_options.service_account_path {
        builder = builder.with_service_account_path(service_account_path);
    }

    if let Some(service_account_key) = &gs_options.service_account_key {
        builder = builder.with_service_account_key(service_account_key);
    }

    if let Some(application_credentials_path) = &gs_options.application_credentials_path {
        builder = builder.with_application_credentials(application_credentials_path);
    }

    Ok(builder)
}

fn get_bucket_name(url: &Url) -> Result<&str> {
    url.host_str().ok_or_else(|| {
        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
    })
}

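/// Options for connecting to Amazon S3 and S3-compatible services.
///
/// Registered as a [`ConfigExtension`] under the `aws` prefix, so the fields
/// can be supplied as table options such as `aws.access_key_id`,
/// `aws.secret_access_key`, `aws.session_token`, `aws.region`, `aws.endpoint`,
/// `aws.allow_http`, and `aws.skip_signature` (see the tests below for SQL
/// examples).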
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// AWS access key ID
    pub access_key_id: Option<String>,
    /// AWS secret access key
    pub secret_access_key: Option<String>,
    /// AWS session token
    pub session_token: Option<String>,
    /// AWS region
    pub region: Option<String>,
    /// Custom endpoint (also used for OSS and COS)
    pub endpoint: Option<String>,
    /// Allow plain HTTP when talking to the endpoint
    pub allow_http: Option<bool>,
    /// Skip fetching credentials and do not sign requests
    pub skip_signature: Option<bool>,
}

impl ExtensionOptions for AwsOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
        let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
        match key {
            "access_key_id" => {
                self.access_key_id.set(rem, value)?;
            }
            "secret_access_key" => {
                self.secret_access_key.set(rem, value)?;
            }
            "session_token" => {
                self.session_token.set(rem, value)?;
            }
            "region" => {
                self.region.set(rem, value)?;
            }
            "oss" | "cos" | "endpoint" => {
                self.endpoint.set(rem, value)?;
            }
            "allow_http" => {
                self.allow_http.set(rem, value)?;
            }
            "skip_signature" | "nosign" => {
                self.skip_signature.set(rem, value)?;
            }
            _ => {
                return config_err!("Config value \"{}\" not found on AwsOptions", key);
            }
        }
        Ok(())
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.access_key_id.visit(&mut v, "access_key_id", "");
        self.secret_access_key
            .visit(&mut v, "secret_access_key", "");
        self.session_token.visit(&mut v, "session_token", "");
        self.region.visit(&mut v, "region", "");
        self.endpoint.visit(&mut v, "endpoint", "");
        self.allow_http.visit(&mut v, "allow_http", "");
        self.skip_signature.visit(&mut v, "skip_signature", "");
        v.0
    }
}

impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}

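/// Options for connecting to Google Cloud Storage.
///
/// Registered as a [`ConfigExtension`] under the `gcp` prefix, so the fields
/// can be supplied as table options such as `gcp.service_account_path`,
/// `gcp.service_account_key`, and `gcp.application_credentials_path`.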
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Path to a service account file
    pub service_account_path: Option<String>,
    /// Serialized service account key
    pub service_account_key: Option<String>,
    /// Path to an application credentials file
    pub application_credentials_path: Option<String>,
}

impl ExtensionOptions for GcpOptions {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
        match rem {
            "service_account_path" => {
                self.service_account_path.set(rem, value)?;
            }
            "service_account_key" => {
                self.service_account_key.set(rem, value)?;
            }
            "application_credentials_path" => {
                self.application_credentials_path.set(rem, value)?;
            }
            _ => {
                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
            }
        }
        Ok(())
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.service_account_path
            .visit(&mut v, "service_account_path", "");
        self.service_account_key
            .visit(&mut v, "service_account_key", "");
        self.application_credentials_path.visit(
            &mut v,
            "application_credentials_path",
            "",
        );
        v.0
    }
}

impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}

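/// Builds an [`ObjectStore`] for `url` by dispatching on `scheme`: `s3`,
/// `oss`, and `cos` require [`AwsOptions`]; `gs`/`gcs` require [`GcpOptions`];
/// `http`/`https` build a plain HTTP store; any other scheme falls back to the
/// stores registered in the session's runtime environment.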
pub(crate) async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
    resolve_region: bool,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    let store: Arc<dyn ObjectStore> = match scheme {
        "s3" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 's3' scheme"
                );
            };
            let builder =
                get_s3_object_store_builder(url, options, resolve_region).await?;
            Arc::new(builder.build()?)
        }
        "oss" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'oss' scheme"
                );
            };
            let builder = get_oss_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "cos" => {
            let Some(options) = table_options.extensions.get::<AwsOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'cos' scheme"
                );
            };
            let builder = get_cos_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "gs" | "gcs" => {
            let Some(options) = table_options.extensions.get::<GcpOptions>() else {
                return exec_err!(
                    "Given table options incompatible with the 'gs'/'gcs' scheme"
                );
            };
            let builder = get_gcs_object_store_builder(url, options)?;
            Arc::new(builder.build()?)
        }
        "http" | "https" => Arc::new(
            HttpBuilder::new()
                .with_client_options(ClientOptions::new().with_allow_http(true))
                .with_url(url.origin().ascii_serialization())
                .build()?,
        ),
        _ => {
            // Fall back to any store already registered for this URL in the
            // session's object store registry.
            state
                .runtime_env()
                .object_store_registry
                .get_store(url)
                .map_err(|_| {
                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                })?
        }
    };
    Ok(store)
}

#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            eprintln!("{e}");
            // Skip this test when the required AWS environment variables are not set.
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // Point the AWS SDK at the test configuration files.
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' {session_token}, \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");

        // Setting `aws.allow_http` to true makes the same endpoint acceptable.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            eprintln!("{e}");
            return Ok(());
        }
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None,
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
    ) -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()),
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }

    async fn check_aws_envs() -> Result<()> {
        let aws_envs = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
            "AWS_ALLOW_HTTP",
        ];
        for aws_env in aws_envs {
            std::env::var(aws_env).map_err(|_| {
                exec_datafusion_err!("aws envs not set, skipping s3 tests")
            })?;
        }
        Ok(())
    }
}