1use async_trait::async_trait;
19use aws_config::BehaviorVersion;
20use aws_credential_types::provider::{
21 error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
22};
23use datafusion::{
24 common::{
25 config::ConfigEntry, config::ConfigExtension, config::ConfigField,
26 config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
27 exec_datafusion_err, exec_err,
28 },
29 error::{DataFusionError, Result},
30 execution::context::SessionState,
31};
32use log::debug;
33use object_store::{
34 aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
35 gcp::GoogleCloudStorageBuilder,
36 http::HttpBuilder,
37 ClientOptions, CredentialProvider,
38 Error::Generic,
39 ObjectStore,
40};
41use std::{
42 any::Any,
43 error::Error,
44 fmt::{Debug, Display},
45 sync::Arc,
46};
47use url::Url;
48
49#[cfg(not(test))]
50use object_store::aws::resolve_bucket_region;
51
/// Test double for `object_store::aws::resolve_bucket_region`.
///
/// It ignores both arguments and immediately reports `eu-central-1`, so unit
/// tests never perform a network lookup.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let fixed_region = String::from("eu-central-1");
    Ok(fixed_region)
}
60
61pub async fn get_s3_object_store_builder(
62 url: &Url,
63 aws_options: &AwsOptions,
64 resolve_region: bool,
65) -> Result<AmazonS3Builder> {
66 let AwsOptions {
67 access_key_id,
68 secret_access_key,
69 session_token,
70 region,
71 endpoint,
72 allow_http,
73 skip_signature,
74 } = aws_options;
75
76 let bucket_name = get_bucket_name(url)?;
77 let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
78
79 if let (Some(access_key_id), Some(secret_access_key)) =
80 (access_key_id, secret_access_key)
81 {
82 debug!("Using explicitly provided S3 access_key_id and secret_access_key");
83 builder = builder
84 .with_access_key_id(access_key_id)
85 .with_secret_access_key(secret_access_key);
86
87 if let Some(session_token) = session_token {
88 builder = builder.with_token(session_token);
89 }
90 } else {
91 debug!("Using AWS S3 SDK to determine credentials");
92 let CredentialsFromConfig {
93 region,
94 credentials,
95 } = CredentialsFromConfig::try_new().await?;
96 if let Some(region) = region {
97 builder = builder.with_region(region);
98 }
99 if let Some(credentials) = credentials {
100 let credentials = Arc::new(S3CredentialProvider { credentials });
101 builder = builder.with_credentials(credentials);
102 } else {
103 debug!("No credentials found, defaulting to skip signature ");
104 builder = builder.with_skip_signature(true);
105 }
106 }
107
108 if let Some(region) = region {
109 builder = builder.with_region(region);
110 }
111
112 if builder
114 .get_config_value(&AmazonS3ConfigKey::Region)
115 .is_none()
116 || resolve_region
117 {
118 let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
119 builder = builder.with_region(region);
120 }
121
122 if let Some(endpoint) = endpoint {
123 if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
126 if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
127 return config_err!(
128 "Invalid endpoint: {endpoint}. \
129 HTTP is not allowed for S3 endpoints. \
130 To allow HTTP, set 'aws.allow_http' to true"
131 );
132 }
133 }
134
135 builder = builder.with_endpoint(endpoint);
136 }
137
138 if let Some(allow_http) = allow_http {
139 builder = builder.with_allow_http(*allow_http);
140 }
141
142 if let Some(skip_signature) = skip_signature {
143 builder = builder.with_skip_signature(*skip_signature);
144 }
145
146 Ok(builder)
147}
148
149struct CredentialsFromConfig {
151 region: Option<String>,
152 credentials: Option<SharedCredentialsProvider>,
153}
154
155impl CredentialsFromConfig {
156 pub async fn try_new() -> Result<Self> {
158 let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
159 let region = config.region().map(|r| r.to_string());
160
161 let credentials = config
162 .credentials_provider()
163 .ok_or_else(|| {
164 DataFusionError::ObjectStore(Box::new(Generic {
165 store: "S3",
166 source: "Failed to get S3 credentials aws_config".into(),
167 }))
168 })?
169 .clone();
170
171 let credentials = match credentials.provide_credentials().await {
175 Ok(_) => Some(credentials),
176 Err(CredentialsError::CredentialsNotLoaded(_)) => {
177 debug!("Could not use AWS SDK to get credentials");
178 None
179 }
180 Err(e) => {
183 let source_message = if let Some(source) = e.source() {
185 format!(": {source}")
186 } else {
187 String::new()
188 };
189
190 let message = format!(
191 "Error getting credentials from provider: {e}{source_message}",
192 );
193
194 return Err(DataFusionError::ObjectStore(Box::new(Generic {
195 store: "S3",
196 source: message.into(),
197 })));
198 }
199 };
200 Ok(Self {
201 region,
202 credentials,
203 })
204 }
205}
206
207#[derive(Debug)]
208struct S3CredentialProvider {
209 credentials: aws_credential_types::provider::SharedCredentialsProvider,
210}
211
212#[async_trait]
213impl CredentialProvider for S3CredentialProvider {
214 type Credential = AwsCredential;
215
216 async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
217 let creds =
218 self.credentials
219 .provide_credentials()
220 .await
221 .map_err(|e| Generic {
222 store: "S3",
223 source: Box::new(e),
224 })?;
225 Ok(Arc::new(AwsCredential {
226 key_id: creds.access_key_id().to_string(),
227 secret_key: creds.secret_access_key().to_string(),
228 token: creds.session_token().map(ToString::to_string),
229 }))
230 }
231}
232
233pub fn get_oss_object_store_builder(
234 url: &Url,
235 aws_options: &AwsOptions,
236) -> Result<AmazonS3Builder> {
237 get_object_store_builder(url, aws_options, true)
238}
239
240pub fn get_cos_object_store_builder(
241 url: &Url,
242 aws_options: &AwsOptions,
243) -> Result<AmazonS3Builder> {
244 get_object_store_builder(url, aws_options, false)
245}
246
247fn get_object_store_builder(
248 url: &Url,
249 aws_options: &AwsOptions,
250 virtual_hosted_style_request: bool,
251) -> Result<AmazonS3Builder> {
252 let bucket_name = get_bucket_name(url)?;
253 let mut builder = AmazonS3Builder::from_env()
254 .with_virtual_hosted_style_request(virtual_hosted_style_request)
255 .with_bucket_name(bucket_name)
256 .with_region("do_not_care");
258
259 if let (Some(access_key_id), Some(secret_access_key)) =
260 (&aws_options.access_key_id, &aws_options.secret_access_key)
261 {
262 builder = builder
263 .with_access_key_id(access_key_id)
264 .with_secret_access_key(secret_access_key);
265 }
266
267 if let Some(endpoint) = &aws_options.endpoint {
268 builder = builder.with_endpoint(endpoint);
269 }
270
271 Ok(builder)
272}
273
274pub fn get_gcs_object_store_builder(
275 url: &Url,
276 gs_options: &GcpOptions,
277) -> Result<GoogleCloudStorageBuilder> {
278 let bucket_name = get_bucket_name(url)?;
279 let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
280
281 if let Some(service_account_path) = &gs_options.service_account_path {
282 builder = builder.with_service_account_path(service_account_path);
283 }
284
285 if let Some(service_account_key) = &gs_options.service_account_key {
286 builder = builder.with_service_account_key(service_account_key);
287 }
288
289 if let Some(application_credentials_path) = &gs_options.application_credentials_path {
290 builder = builder.with_application_credentials(application_credentials_path);
291 }
292
293 Ok(builder)
294}
295
296fn get_bucket_name(url: &Url) -> Result<&str> {
297 url.host_str().ok_or_else(|| {
298 DataFusionError::Execution(format!(
299 "Not able to parse bucket name from url: {}",
300 url.as_str()
301 ))
302 })
303}
304
/// Options for S3-compatible object stores, set through `aws.*` configuration keys.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access key ID; only used when `secret_access_key` is also set.
    pub access_key_id: Option<String>,
    /// Secret access key; only used when `access_key_id` is also set.
    pub secret_access_key: Option<String>,
    /// Session token; `get_s3_object_store_builder` applies it alongside explicit keys.
    pub session_token: Option<String>,
    /// Region; for S3 it overrides any region reported by the AWS SDK config.
    pub region: Option<String>,
    /// Custom endpoint; also settable through the `oss` / `cos` keys.
    pub endpoint: Option<String>,
    /// Whether plain-HTTP endpoints are permitted.
    pub allow_http: Option<bool>,
    /// Whether to send unsigned requests; also settable through the `nosign` key.
    pub skip_signature: Option<bool>,
}
326
327impl ExtensionOptions for AwsOptions {
328 fn as_any(&self) -> &dyn Any {
329 self
330 }
331
332 fn as_any_mut(&mut self) -> &mut dyn Any {
333 self
334 }
335
336 fn cloned(&self) -> Box<dyn ExtensionOptions> {
337 Box::new(self.clone())
338 }
339
340 fn set(&mut self, key: &str, value: &str) -> Result<()> {
341 let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
342 let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
343 match key {
344 "access_key_id" => {
345 self.access_key_id.set(rem, value)?;
346 }
347 "secret_access_key" => {
348 self.secret_access_key.set(rem, value)?;
349 }
350 "session_token" => {
351 self.session_token.set(rem, value)?;
352 }
353 "region" => {
354 self.region.set(rem, value)?;
355 }
356 "oss" | "cos" | "endpoint" => {
357 self.endpoint.set(rem, value)?;
358 }
359 "allow_http" => {
360 self.allow_http.set(rem, value)?;
361 }
362 "skip_signature" | "nosign" => {
363 self.skip_signature.set(rem, value)?;
364 }
365 _ => {
366 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
367 }
368 }
369 Ok(())
370 }
371
372 fn entries(&self) -> Vec<ConfigEntry> {
373 struct Visitor(Vec<ConfigEntry>);
374
375 impl Visit for Visitor {
376 fn some<V: Display>(
377 &mut self,
378 key: &str,
379 value: V,
380 description: &'static str,
381 ) {
382 self.0.push(ConfigEntry {
383 key: key.to_string(),
384 value: Some(value.to_string()),
385 description,
386 })
387 }
388
389 fn none(&mut self, key: &str, description: &'static str) {
390 self.0.push(ConfigEntry {
391 key: key.to_string(),
392 value: None,
393 description,
394 })
395 }
396 }
397
398 let mut v = Visitor(vec![]);
399 self.access_key_id.visit(&mut v, "access_key_id", "");
400 self.secret_access_key
401 .visit(&mut v, "secret_access_key", "");
402 self.session_token.visit(&mut v, "session_token", "");
403 self.region.visit(&mut v, "region", "");
404 self.endpoint.visit(&mut v, "endpoint", "");
405 self.allow_http.visit(&mut v, "allow_http", "");
406 v.0
407 }
408}
409
/// Registers [`AwsOptions`] under the `aws` prefix, so keys such as
/// `aws.region` are routed to it.
impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}
413
/// Google Cloud Storage credential options, set through `gcp.*` configuration keys.
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Service account file path; passed to `with_service_account_path`.
    pub service_account_path: Option<String>,
    /// Service account key material; passed to `with_service_account_key`.
    pub service_account_key: Option<String>,
    /// Application credentials path; passed to `with_application_credentials`.
    pub application_credentials_path: Option<String>,
}
424
425impl ExtensionOptions for GcpOptions {
426 fn as_any(&self) -> &dyn Any {
427 self
428 }
429
430 fn as_any_mut(&mut self) -> &mut dyn Any {
431 self
432 }
433
434 fn cloned(&self) -> Box<dyn ExtensionOptions> {
435 Box::new(self.clone())
436 }
437
438 fn set(&mut self, key: &str, value: &str) -> Result<()> {
439 let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
440 match rem {
441 "service_account_path" => {
442 self.service_account_path.set(rem, value)?;
443 }
444 "service_account_key" => {
445 self.service_account_key.set(rem, value)?;
446 }
447 "application_credentials_path" => {
448 self.application_credentials_path.set(rem, value)?;
449 }
450 _ => {
451 return config_err!("Config value \"{}\" not found on GcpOptions", rem);
452 }
453 }
454 Ok(())
455 }
456
457 fn entries(&self) -> Vec<ConfigEntry> {
458 struct Visitor(Vec<ConfigEntry>);
459
460 impl Visit for Visitor {
461 fn some<V: Display>(
462 &mut self,
463 key: &str,
464 value: V,
465 description: &'static str,
466 ) {
467 self.0.push(ConfigEntry {
468 key: key.to_string(),
469 value: Some(value.to_string()),
470 description,
471 })
472 }
473
474 fn none(&mut self, key: &str, description: &'static str) {
475 self.0.push(ConfigEntry {
476 key: key.to_string(),
477 value: None,
478 description,
479 })
480 }
481 }
482
483 let mut v = Visitor(vec![]);
484 self.service_account_path
485 .visit(&mut v, "service_account_path", "");
486 self.service_account_key
487 .visit(&mut v, "service_account_key", "");
488 self.application_credentials_path.visit(
489 &mut v,
490 "application_credentials_path",
491 "",
492 );
493 v.0
494 }
495}
496
/// Registers [`GcpOptions`] under the `gcp` prefix, so keys such as
/// `gcp.service_account_path` are routed to it.
impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}
500
501pub(crate) async fn get_object_store(
502 state: &SessionState,
503 scheme: &str,
504 url: &Url,
505 table_options: &TableOptions,
506 resolve_region: bool,
507) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
508 let store: Arc<dyn ObjectStore> = match scheme {
509 "s3" => {
510 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
511 return exec_err!(
512 "Given table options incompatible with the 's3' scheme"
513 );
514 };
515 let builder =
516 get_s3_object_store_builder(url, options, resolve_region).await?;
517 Arc::new(builder.build()?)
518 }
519 "oss" => {
520 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
521 return exec_err!(
522 "Given table options incompatible with the 'oss' scheme"
523 );
524 };
525 let builder = get_oss_object_store_builder(url, options)?;
526 Arc::new(builder.build()?)
527 }
528 "cos" => {
529 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
530 return exec_err!(
531 "Given table options incompatible with the 'cos' scheme"
532 );
533 };
534 let builder = get_cos_object_store_builder(url, options)?;
535 Arc::new(builder.build()?)
536 }
537 "gs" | "gcs" => {
538 let Some(options) = table_options.extensions.get::<GcpOptions>() else {
539 return exec_err!(
540 "Given table options incompatible with the 'gs'/'gcs' scheme"
541 );
542 };
543 let builder = get_gcs_object_store_builder(url, options)?;
544 Arc::new(builder.build()?)
545 }
546 "http" | "https" => Arc::new(
547 HttpBuilder::new()
548 .with_client_options(ClientOptions::new().with_allow_http(true))
549 .with_url(url.origin().ascii_serialization())
550 .build()?,
551 ),
552 _ => {
553 state
555 .runtime_env()
556 .object_store_registry
557 .get_store(url)
558 .map_err(|_| {
559 exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
560 })?
561 }
562 };
563 Ok(store)
564}
565
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    /// With no explicit options the builder reflects the environment. The region
    /// falls back to the value returned by the test `resolve_bucket_region` stub.
    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // Expected values come from the environment when set.
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Signing is expected to be disabled only when no keys are in the environment.
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    /// Explicit `aws.*` options are all forwarded to the S3 builder.
    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        // Every option value is quoted so the SQL does not rely on the parser
        // accepting bare identifiers as values.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' '{session_token}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    /// An `http://` endpoint is rejected unless `aws.allow_http` is `true`.
    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        assert_eq!(
            err.to_string().lines().next().unwrap_or_default(),
            "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. \
             HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"
        );

        // The same endpoint succeeds once HTTP is explicitly allowed.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    /// With no region configured, the (stubbed) bucket-region lookup is used.
    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None,
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// `resolve_region = true` replaces even an explicitly configured region.
    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
    ) -> Result<()> {
        let original_region = "us-east-1";
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()),
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    /// `aws.oss.endpoint` is routed to the endpoint of the OSS builder.
    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// All `gcp.*` credential options are forwarded to the GCS builder.
    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans `sql` (which must be a `CREATE EXTERNAL TABLE`) and returns the
    /// session's default table options altered with the statement's `OPTIONS`.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }
}