1use async_trait::async_trait;
19use aws_config::BehaviorVersion;
20use aws_credential_types::provider::{
21 error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
22};
23use datafusion::{
24 common::{
25 config::ConfigEntry, config::ConfigExtension, config::ConfigField,
26 config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
27 exec_datafusion_err, exec_err,
28 },
29 error::{DataFusionError, Result},
30 execution::context::SessionState,
31};
32use log::debug;
33use object_store::{
34 aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
35 gcp::GoogleCloudStorageBuilder,
36 http::HttpBuilder,
37 ClientOptions, CredentialProvider,
38 Error::Generic,
39 ObjectStore,
40};
41use std::{
42 any::Any,
43 error::Error,
44 fmt::{Debug, Display},
45 sync::Arc,
46};
47use url::Url;
48
49#[cfg(not(test))]
50use object_store::aws::resolve_bucket_region;
51
/// Test-only replacement for `object_store::aws::resolve_bucket_region`.
///
/// Both arguments are ignored and the fixed region `eu-central-1` is always
/// returned, so the tests in this file get a deterministic answer.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let fixed_region = String::from("eu-central-1");
    Ok(fixed_region)
}
60
61pub async fn get_s3_object_store_builder(
62 url: &Url,
63 aws_options: &AwsOptions,
64 resolve_region: bool,
65) -> Result<AmazonS3Builder> {
66 let AwsOptions {
67 access_key_id,
68 secret_access_key,
69 session_token,
70 region,
71 endpoint,
72 allow_http,
73 skip_signature,
74 } = aws_options;
75
76 let bucket_name = get_bucket_name(url)?;
77 let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
78
79 if let (Some(access_key_id), Some(secret_access_key)) =
80 (access_key_id, secret_access_key)
81 {
82 debug!("Using explicitly provided S3 access_key_id and secret_access_key");
83 builder = builder
84 .with_access_key_id(access_key_id)
85 .with_secret_access_key(secret_access_key);
86
87 if let Some(session_token) = session_token {
88 builder = builder.with_token(session_token);
89 }
90 } else {
91 debug!("Using AWS S3 SDK to determine credentials");
92 let CredentialsFromConfig {
93 region,
94 credentials,
95 } = CredentialsFromConfig::try_new().await?;
96 if let Some(region) = region {
97 builder = builder.with_region(region);
98 }
99 if let Some(credentials) = credentials {
100 let credentials = Arc::new(S3CredentialProvider { credentials });
101 builder = builder.with_credentials(credentials);
102 } else {
103 debug!("No credentials found, defaulting to skip signature ");
104 builder = builder.with_skip_signature(true);
105 }
106 }
107
108 if let Some(region) = region {
109 builder = builder.with_region(region);
110 }
111
112 if builder
114 .get_config_value(&AmazonS3ConfigKey::Region)
115 .is_none()
116 || resolve_region
117 {
118 let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
119 builder = builder.with_region(region);
120 }
121
122 if let Some(endpoint) = endpoint {
123 if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
126 if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
127 return config_err!(
128 "Invalid endpoint: {endpoint}. \
129 HTTP is not allowed for S3 endpoints. \
130 To allow HTTP, set 'aws.allow_http' to true"
131 );
132 }
133 }
134
135 builder = builder.with_endpoint(endpoint);
136 }
137
138 if let Some(allow_http) = allow_http {
139 builder = builder.with_allow_http(*allow_http);
140 }
141
142 if let Some(skip_signature) = skip_signature {
143 builder = builder.with_skip_signature(*skip_signature);
144 }
145
146 Ok(builder)
147}
148
149struct CredentialsFromConfig {
151 region: Option<String>,
152 credentials: Option<SharedCredentialsProvider>,
153}
154
155impl CredentialsFromConfig {
156 pub async fn try_new() -> Result<Self> {
158 let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
159 let region = config.region().map(|r| r.to_string());
160
161 let credentials = config
162 .credentials_provider()
163 .ok_or_else(|| {
164 DataFusionError::ObjectStore(Box::new(Generic {
165 store: "S3",
166 source: "Failed to get S3 credentials aws_config".into(),
167 }))
168 })?
169 .clone();
170
171 let credentials = match credentials.provide_credentials().await {
175 Ok(_) => Some(credentials),
176 Err(CredentialsError::CredentialsNotLoaded(_)) => {
177 debug!("Could not use AWS SDK to get credentials");
178 None
179 }
180 Err(e) => {
183 let source_message = if let Some(source) = e.source() {
185 format!(": {source}")
186 } else {
187 String::new()
188 };
189
190 let message = format!(
191 "Error getting credentials from provider: {e}{source_message}",
192 );
193
194 return Err(DataFusionError::ObjectStore(Box::new(Generic {
195 store: "S3",
196 source: message.into(),
197 })));
198 }
199 };
200 Ok(Self {
201 region,
202 credentials,
203 })
204 }
205}
206
207#[derive(Debug)]
208struct S3CredentialProvider {
209 credentials: aws_credential_types::provider::SharedCredentialsProvider,
210}
211
212#[async_trait]
213impl CredentialProvider for S3CredentialProvider {
214 type Credential = AwsCredential;
215
216 async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
217 let creds =
218 self.credentials
219 .provide_credentials()
220 .await
221 .map_err(|e| Generic {
222 store: "S3",
223 source: Box::new(e),
224 })?;
225 Ok(Arc::new(AwsCredential {
226 key_id: creds.access_key_id().to_string(),
227 secret_key: creds.secret_access_key().to_string(),
228 token: creds.session_token().map(ToString::to_string),
229 }))
230 }
231}
232
233pub fn get_oss_object_store_builder(
234 url: &Url,
235 aws_options: &AwsOptions,
236) -> Result<AmazonS3Builder> {
237 get_object_store_builder(url, aws_options, true)
238}
239
240pub fn get_cos_object_store_builder(
241 url: &Url,
242 aws_options: &AwsOptions,
243) -> Result<AmazonS3Builder> {
244 get_object_store_builder(url, aws_options, false)
245}
246
247fn get_object_store_builder(
248 url: &Url,
249 aws_options: &AwsOptions,
250 virtual_hosted_style_request: bool,
251) -> Result<AmazonS3Builder> {
252 let bucket_name = get_bucket_name(url)?;
253 let mut builder = AmazonS3Builder::from_env()
254 .with_virtual_hosted_style_request(virtual_hosted_style_request)
255 .with_bucket_name(bucket_name)
256 .with_region("do_not_care");
258
259 if let (Some(access_key_id), Some(secret_access_key)) =
260 (&aws_options.access_key_id, &aws_options.secret_access_key)
261 {
262 builder = builder
263 .with_access_key_id(access_key_id)
264 .with_secret_access_key(secret_access_key);
265 }
266
267 if let Some(endpoint) = &aws_options.endpoint {
268 builder = builder.with_endpoint(endpoint);
269 }
270
271 Ok(builder)
272}
273
274pub fn get_gcs_object_store_builder(
275 url: &Url,
276 gs_options: &GcpOptions,
277) -> Result<GoogleCloudStorageBuilder> {
278 let bucket_name = get_bucket_name(url)?;
279 let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
280
281 if let Some(service_account_path) = &gs_options.service_account_path {
282 builder = builder.with_service_account_path(service_account_path);
283 }
284
285 if let Some(service_account_key) = &gs_options.service_account_key {
286 builder = builder.with_service_account_key(service_account_key);
287 }
288
289 if let Some(application_credentials_path) = &gs_options.application_credentials_path {
290 builder = builder.with_application_credentials(application_credentials_path);
291 }
292
293 Ok(builder)
294}
295
296fn get_bucket_name(url: &Url) -> Result<&str> {
297 url.host_str().ok_or_else(|| {
298 DataFusionError::Execution(format!(
299 "Not able to parse bucket name from url: {}",
300 url.as_str()
301 ))
302 })
303}
304
305#[derive(Default, Debug, Clone)]
307pub struct AwsOptions {
308 pub access_key_id: Option<String>,
310 pub secret_access_key: Option<String>,
312 pub session_token: Option<String>,
314 pub region: Option<String>,
316 pub endpoint: Option<String>,
318 pub allow_http: Option<bool>,
320 pub skip_signature: Option<bool>,
325}
326
327impl ExtensionOptions for AwsOptions {
328 fn as_any(&self) -> &dyn Any {
329 self
330 }
331
332 fn as_any_mut(&mut self) -> &mut dyn Any {
333 self
334 }
335
336 fn cloned(&self) -> Box<dyn ExtensionOptions> {
337 Box::new(self.clone())
338 }
339
340 fn set(&mut self, key: &str, value: &str) -> Result<()> {
341 let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342 let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343 match key {
344 "access_key_id" => {
345 self.access_key_id.set(rem, value)?;
346 }
347 "secret_access_key" => {
348 self.secret_access_key.set(rem, value)?;
349 }
350 "session_token" => {
351 self.session_token.set(rem, value)?;
352 }
353 "region" => {
354 self.region.set(rem, value)?;
355 }
356 "oss" | "cos" | "endpoint" => {
357 self.endpoint.set(rem, value)?;
358 }
359 "allow_http" => {
360 self.allow_http.set(rem, value)?;
361 }
362 "skip_signature" | "nosign" => {
363 self.skip_signature.set(rem, value)?;
364 }
365 _ => {
366 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367 }
368 }
369 Ok(())
370 }
371
372 fn entries(&self) -> Vec<ConfigEntry> {
373 struct Visitor(Vec<ConfigEntry>);
374
375 impl Visit for Visitor {
376 fn some<V: Display>(
377 &mut self,
378 key: &str,
379 value: V,
380 description: &'static str,
381 ) {
382 self.0.push(ConfigEntry {
383 key: key.to_string(),
384 value: Some(value.to_string()),
385 description,
386 })
387 }
388
389 fn none(&mut self, key: &str, description: &'static str) {
390 self.0.push(ConfigEntry {
391 key: key.to_string(),
392 value: None,
393 description,
394 })
395 }
396 }
397
398 let mut v = Visitor(vec![]);
399 self.access_key_id.visit(&mut v, "access_key_id", "");
400 self.secret_access_key
401 .visit(&mut v, "secret_access_key", "");
402 self.session_token.visit(&mut v, "session_token", "");
403 self.region.visit(&mut v, "region", "");
404 self.endpoint.visit(&mut v, "endpoint", "");
405 self.allow_http.visit(&mut v, "allow_http", "");
406 v.0
407 }
408}
409
410impl ConfigExtension for AwsOptions {
411 const PREFIX: &'static str = "aws";
412}
413
414#[derive(Debug, Clone, Default)]
416pub struct GcpOptions {
417 pub service_account_path: Option<String>,
419 pub service_account_key: Option<String>,
421 pub application_credentials_path: Option<String>,
423}
424
425impl ExtensionOptions for GcpOptions {
426 fn as_any(&self) -> &dyn Any {
427 self
428 }
429
430 fn as_any_mut(&mut self) -> &mut dyn Any {
431 self
432 }
433
434 fn cloned(&self) -> Box<dyn ExtensionOptions> {
435 Box::new(self.clone())
436 }
437
438 fn set(&mut self, key: &str, value: &str) -> Result<()> {
439 let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
440 match rem {
441 "service_account_path" => {
442 self.service_account_path.set(rem, value)?;
443 }
444 "service_account_key" => {
445 self.service_account_key.set(rem, value)?;
446 }
447 "application_credentials_path" => {
448 self.application_credentials_path.set(rem, value)?;
449 }
450 _ => {
451 return config_err!("Config value \"{}\" not found on GcpOptions", rem);
452 }
453 }
454 Ok(())
455 }
456
457 fn entries(&self) -> Vec<ConfigEntry> {
458 struct Visitor(Vec<ConfigEntry>);
459
460 impl Visit for Visitor {
461 fn some<V: Display>(
462 &mut self,
463 key: &str,
464 value: V,
465 description: &'static str,
466 ) {
467 self.0.push(ConfigEntry {
468 key: key.to_string(),
469 value: Some(value.to_string()),
470 description,
471 })
472 }
473
474 fn none(&mut self, key: &str, description: &'static str) {
475 self.0.push(ConfigEntry {
476 key: key.to_string(),
477 value: None,
478 description,
479 })
480 }
481 }
482
483 let mut v = Visitor(vec![]);
484 self.service_account_path
485 .visit(&mut v, "service_account_path", "");
486 self.service_account_key
487 .visit(&mut v, "service_account_key", "");
488 self.application_credentials_path.visit(
489 &mut v,
490 "application_credentials_path",
491 "",
492 );
493 v.0
494 }
495}
496
497impl ConfigExtension for GcpOptions {
498 const PREFIX: &'static str = "gcp";
499}
500
501pub(crate) async fn get_object_store(
502 state: &SessionState,
503 scheme: &str,
504 url: &Url,
505 table_options: &TableOptions,
506 resolve_region: bool,
507) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
508 let store: Arc<dyn ObjectStore> = match scheme {
509 "s3" => {
510 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
511 return exec_err!(
512 "Given table options incompatible with the 's3' scheme"
513 );
514 };
515 let builder =
516 get_s3_object_store_builder(url, options, resolve_region).await?;
517 Arc::new(builder.build()?)
518 }
519 "oss" => {
520 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
521 return exec_err!(
522 "Given table options incompatible with the 'oss' scheme"
523 );
524 };
525 let builder = get_oss_object_store_builder(url, options)?;
526 Arc::new(builder.build()?)
527 }
528 "cos" => {
529 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
530 return exec_err!(
531 "Given table options incompatible with the 'cos' scheme"
532 );
533 };
534 let builder = get_cos_object_store_builder(url, options)?;
535 Arc::new(builder.build()?)
536 }
537 "gs" | "gcs" => {
538 let Some(options) = table_options.extensions.get::<GcpOptions>() else {
539 return exec_err!(
540 "Given table options incompatible with the 'gs'/'gcs' scheme"
541 );
542 };
543 let builder = get_gcs_object_store_builder(url, options)?;
544 Arc::new(builder.build()?)
545 }
546 "http" | "https" => Arc::new(
547 HttpBuilder::new()
548 .with_client_options(ClientOptions::new().with_allow_http(true))
549 .with_url(url.origin().ascii_serialization())
550 .build()?,
551 ),
552 _ => {
553 state
555 .runtime_env()
556 .object_store_registry
557 .get_store(url)
558 .map_err(|_| {
559 exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
560 })?
561 }
562 };
563 Ok(store)
564}
565
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    /// Without explicit options the builder reflects the environment.
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        let location = "s3://bucket/path/FAKE/file.parquet";
        // Point the AWS SDK at fixed paths, presumably so the developer's own
        // AWS profile does not influence the outcome.
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");

        // No options are given in the statement.
        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // Expected values come from the environment the test runs in.
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Signing is skipped exactly when no credentials are available.
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    /// Explicit options in the statement end up in the builder.
    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        // "fake" is used for the region to avoid tests from making any calls.
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        // Every option value is quoted, including the session token.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' '{session_token}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        // Each provided option must be readable back from the builder.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        // Explicit credentials were supplied, so requests are signed.
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    /// An `http` endpoint is rejected unless `aws.allow_http` is set.
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");

        // The same statement succeeds once HTTP is explicitly allowed.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        // Only success matters; the builder itself is not inspected.
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    /// With no region configured, the resolved bucket region is used.
    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";
        // Point the AWS SDK at a fixed config path, presumably so a locally
        // configured region cannot leak into the test.
        std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None,
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        // The region comes from the test stub of `resolve_bucket_region`.
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// With `resolve_region` enabled, the resolved region replaces the
    /// configured one.
    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
    ) -> Result<()> {
        let original_region = "us-east-1";
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()),
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        // The configured region must have been replaced by the resolved one.
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// Options given for the `oss` scheme end up in the builder.
    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // Each provided option must be readable back from the builder.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Options given for the `cos` scheme, including the `aws.cos.endpoint`
    /// alias, end up in the builder.
    #[test]
    fn cos_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "cos://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        // Options are set directly so the test does not depend on how the
        // session maps schemes to option extensions.
        let mut aws_options = AwsOptions::default();
        aws_options.set("aws.access_key_id", access_key_id)?;
        aws_options.set("aws.secret_access_key", secret_access_key)?;
        aws_options.set("aws.cos.endpoint", endpoint)?;

        let builder = get_cos_object_store_builder(table_url.as_ref(), &aws_options)?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Options given for the `gcs` scheme end up in the builder.
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        // Each provided option must be readable back from the builder.
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans `sql`, which must be a `CREATE EXTERNAL TABLE` statement, and
    /// returns the session's default table options with the statement's
    /// `OPTIONS` applied.
    ///
    /// Panics when planning fails, when the plan is not a
    /// `CreateExternalTable`, or when an option cannot be applied.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }
}