1pub mod instrumented;
19
20use async_trait::async_trait;
21use aws_config::BehaviorVersion;
22use aws_credential_types::provider::{
23 ProvideCredentials, SharedCredentialsProvider, error::CredentialsError,
24};
25use datafusion::{
26 common::{
27 config::ConfigEntry, config::ConfigExtension, config::ConfigField,
28 config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
29 exec_datafusion_err, exec_err,
30 },
31 error::{DataFusionError, Result},
32 execution::context::SessionState,
33};
34use log::debug;
35use object_store::{
36 ClientOptions, CredentialProvider,
37 Error::Generic,
38 ObjectStore,
39 aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
40 gcp::GoogleCloudStorageBuilder,
41 http::HttpBuilder,
42};
43use std::{
44 any::Any,
45 error::Error,
46 fmt::{Debug, Display},
47 sync::Arc,
48};
49use url::Url;
50
51#[cfg(not(test))]
52use object_store::aws::resolve_bucket_region;
53
/// Test-only stand-in for `object_store::aws::resolve_bucket_region`.
///
/// Ignores its arguments and always reports `eu-central-1`, so tests get a
/// deterministic region without performing any lookup.
#[cfg(test)]
async fn resolve_bucket_region(
    _bucket: &str,
    _client_options: &ClientOptions,
) -> object_store::Result<String> {
    let fixed_region = String::from("eu-central-1");
    Ok(fixed_region)
}
62
63pub async fn get_s3_object_store_builder(
64 url: &Url,
65 aws_options: &AwsOptions,
66 resolve_region: bool,
67) -> Result<AmazonS3Builder> {
68 Box::pin(get_s3_object_store_builder_inner(
71 url,
72 aws_options,
73 resolve_region,
74 ))
75 .await
76}
77
78async fn get_s3_object_store_builder_inner(
79 url: &Url,
80 aws_options: &AwsOptions,
81 resolve_region: bool,
82) -> Result<AmazonS3Builder> {
83 let AwsOptions {
84 access_key_id,
85 secret_access_key,
86 session_token,
87 region,
88 endpoint,
89 allow_http,
90 skip_signature,
91 } = aws_options;
92
93 let bucket_name = get_bucket_name(url)?;
94 let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
95
96 if let (Some(access_key_id), Some(secret_access_key)) =
97 (access_key_id, secret_access_key)
98 {
99 debug!("Using explicitly provided S3 access_key_id and secret_access_key");
100 builder = builder
101 .with_access_key_id(access_key_id)
102 .with_secret_access_key(secret_access_key);
103
104 if let Some(session_token) = session_token {
105 builder = builder.with_token(session_token);
106 }
107 } else {
108 debug!("Using AWS S3 SDK to determine credentials");
109 let CredentialsFromConfig {
110 region,
111 credentials,
112 } = CredentialsFromConfig::try_new().await?;
113 if let Some(region) = region {
114 builder = builder.with_region(region);
115 }
116 if let Some(credentials) = credentials {
117 let credentials = Arc::new(S3CredentialProvider { credentials });
118 builder = builder.with_credentials(credentials);
119 } else {
120 debug!("No credentials found, defaulting to skip signature ");
121 builder = builder.with_skip_signature(true);
122 }
123 }
124
125 if let Some(region) = region {
126 builder = builder.with_region(region);
127 }
128
129 if builder
131 .get_config_value(&AmazonS3ConfigKey::Region)
132 .is_none()
133 || resolve_region
134 {
135 let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
136 builder = builder.with_region(region);
137 }
138
139 if let Some(endpoint) = endpoint {
140 if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
143 && !matches!(allow_http, Some(true))
144 && endpoint_url.scheme() == "http"
145 {
146 return config_err!(
147 "Invalid endpoint: {endpoint}. \
148 HTTP is not allowed for S3 endpoints. \
149 To allow HTTP, set 'aws.allow_http' to true"
150 );
151 }
152
153 builder = builder.with_endpoint(endpoint);
154 }
155
156 if let Some(allow_http) = allow_http {
157 builder = builder.with_allow_http(*allow_http);
158 }
159
160 if let Some(skip_signature) = skip_signature {
161 builder = builder.with_skip_signature(*skip_signature);
162 }
163
164 Ok(builder)
165}
166
/// Region and credentials provider obtained from the AWS SDK's default
/// configuration loading (see `CredentialsFromConfig::try_new`).
struct CredentialsFromConfig {
    /// Region from the SDK configuration, if one is configured.
    region: Option<String>,
    /// The SDK's credentials provider. This is `None` when the provider reported
    /// that no credentials could be loaded.
    credentials: Option<SharedCredentialsProvider>,
}
172
173impl CredentialsFromConfig {
174 pub async fn try_new() -> Result<Self> {
176 let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
177 let region = config.region().map(|r| r.to_string());
178
179 let credentials = config
180 .credentials_provider()
181 .ok_or_else(|| {
182 DataFusionError::ObjectStore(Box::new(Generic {
183 store: "S3",
184 source: "Failed to get S3 credentials aws_config".into(),
185 }))
186 })?
187 .clone();
188
189 let credentials = match credentials.provide_credentials().await {
193 Ok(_) => Some(credentials),
194 Err(CredentialsError::CredentialsNotLoaded(_)) => {
195 debug!("Could not use AWS SDK to get credentials");
196 None
197 }
198 Err(e) => {
201 let source_message = if let Some(source) = e.source() {
203 format!(": {source}")
204 } else {
205 String::new()
206 };
207
208 let message = format!(
209 "Error getting credentials from provider: {e}{source_message}",
210 );
211
212 return Err(DataFusionError::ObjectStore(Box::new(Generic {
213 store: "S3",
214 source: message.into(),
215 })));
216 }
217 };
218 Ok(Self {
219 region,
220 credentials,
221 })
222 }
223}
224
225#[derive(Debug)]
226struct S3CredentialProvider {
227 credentials: SharedCredentialsProvider,
228}
229
230#[async_trait]
231impl CredentialProvider for S3CredentialProvider {
232 type Credential = AwsCredential;
233
234 async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
235 let creds =
236 self.credentials
237 .provide_credentials()
238 .await
239 .map_err(|e| Generic {
240 store: "S3",
241 source: Box::new(e),
242 })?;
243 Ok(Arc::new(AwsCredential {
244 key_id: creds.access_key_id().to_string(),
245 secret_key: creds.secret_access_key().to_string(),
246 token: creds.session_token().map(ToString::to_string),
247 }))
248 }
249}
250
251pub fn get_oss_object_store_builder(
252 url: &Url,
253 aws_options: &AwsOptions,
254) -> Result<AmazonS3Builder> {
255 get_object_store_builder(url, aws_options, true)
256}
257
258pub fn get_cos_object_store_builder(
259 url: &Url,
260 aws_options: &AwsOptions,
261) -> Result<AmazonS3Builder> {
262 get_object_store_builder(url, aws_options, false)
263}
264
265fn get_object_store_builder(
266 url: &Url,
267 aws_options: &AwsOptions,
268 virtual_hosted_style_request: bool,
269) -> Result<AmazonS3Builder> {
270 let bucket_name = get_bucket_name(url)?;
271 let mut builder = AmazonS3Builder::from_env()
272 .with_virtual_hosted_style_request(virtual_hosted_style_request)
273 .with_bucket_name(bucket_name)
274 .with_region("do_not_care");
276
277 if let (Some(access_key_id), Some(secret_access_key)) =
278 (&aws_options.access_key_id, &aws_options.secret_access_key)
279 {
280 builder = builder
281 .with_access_key_id(access_key_id)
282 .with_secret_access_key(secret_access_key);
283 }
284
285 if let Some(endpoint) = &aws_options.endpoint {
286 builder = builder.with_endpoint(endpoint);
287 }
288
289 Ok(builder)
290}
291
292pub fn get_gcs_object_store_builder(
293 url: &Url,
294 gs_options: &GcpOptions,
295) -> Result<GoogleCloudStorageBuilder> {
296 let bucket_name = get_bucket_name(url)?;
297 let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
298
299 if let Some(service_account_path) = &gs_options.service_account_path {
300 builder = builder.with_service_account_path(service_account_path);
301 }
302
303 if let Some(service_account_key) = &gs_options.service_account_key {
304 builder = builder.with_service_account_key(service_account_key);
305 }
306
307 if let Some(application_credentials_path) = &gs_options.application_credentials_path {
308 builder = builder.with_application_credentials(application_credentials_path);
309 }
310
311 Ok(builder)
312}
313
314fn get_bucket_name(url: &Url) -> Result<&str> {
315 url.host_str().ok_or_else(|| {
316 exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
317 })
318}
319
/// AWS-style connection options, set through `aws.*` table options.
///
/// The same options are consulted for the `s3` scheme and for the
/// S3-compatible `oss` and `cos` schemes.
#[derive(Default, Debug, Clone)]
pub struct AwsOptions {
    /// Access key ID (`aws.access_key_id`). Explicit credentials are used
    /// only when this and `secret_access_key` are both set.
    pub access_key_id: Option<String>,
    /// Secret access key paired with `access_key_id` (`aws.secret_access_key`).
    pub secret_access_key: Option<String>,
    /// Session token (`aws.session_token`). For `s3` it is applied only
    /// together with explicit `access_key_id`/`secret_access_key`.
    pub session_token: Option<String>,
    /// Region (`aws.region`). For `s3` this overrides any region from the AWS
    /// SDK configuration, and it is replaced by the bucket's looked-up region
    /// when region resolution is requested.
    pub region: Option<String>,
    /// Custom endpoint URL (`aws.endpoint`). It can also be set as
    /// `aws.oss.endpoint` or `aws.cos.endpoint`.
    pub endpoint: Option<String>,
    /// Whether plain-HTTP endpoints are allowed (`aws.allow_http`). For `s3`,
    /// an `http://` endpoint is rejected unless this is `Some(true)`.
    pub allow_http: Option<bool>,
    /// Whether to skip request signing (`aws.skip_signature`, alias `aws.nosign`).
    pub skip_signature: Option<bool>,
}
341
342impl ExtensionOptions for AwsOptions {
343 fn as_any(&self) -> &dyn Any {
344 self
345 }
346
347 fn as_any_mut(&mut self) -> &mut dyn Any {
348 self
349 }
350
351 fn cloned(&self) -> Box<dyn ExtensionOptions> {
352 Box::new(self.clone())
353 }
354
355 fn set(&mut self, key: &str, value: &str) -> Result<()> {
356 let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
357 let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
358 match key {
359 "access_key_id" => {
360 self.access_key_id.set(rem, value)?;
361 }
362 "secret_access_key" => {
363 self.secret_access_key.set(rem, value)?;
364 }
365 "session_token" => {
366 self.session_token.set(rem, value)?;
367 }
368 "region" => {
369 self.region.set(rem, value)?;
370 }
371 "oss" | "cos" | "endpoint" => {
372 self.endpoint.set(rem, value)?;
373 }
374 "allow_http" => {
375 self.allow_http.set(rem, value)?;
376 }
377 "skip_signature" | "nosign" => {
378 self.skip_signature.set(rem, value)?;
379 }
380 _ => {
381 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
382 }
383 }
384 Ok(())
385 }
386
387 fn entries(&self) -> Vec<ConfigEntry> {
388 struct Visitor(Vec<ConfigEntry>);
389
390 impl Visit for Visitor {
391 fn some<V: Display>(
392 &mut self,
393 key: &str,
394 value: V,
395 description: &'static str,
396 ) {
397 self.0.push(ConfigEntry {
398 key: key.to_string(),
399 value: Some(value.to_string()),
400 description,
401 })
402 }
403
404 fn none(&mut self, key: &str, description: &'static str) {
405 self.0.push(ConfigEntry {
406 key: key.to_string(),
407 value: None,
408 description,
409 })
410 }
411 }
412
413 let mut v = Visitor(vec![]);
414 self.access_key_id.visit(&mut v, "access_key_id", "");
415 self.secret_access_key
416 .visit(&mut v, "secret_access_key", "");
417 self.session_token.visit(&mut v, "session_token", "");
418 self.region.visit(&mut v, "region", "");
419 self.endpoint.visit(&mut v, "endpoint", "");
420 self.allow_http.visit(&mut v, "allow_http", "");
421 v.0
422 }
423}
424
/// Registers [`AwsOptions`] under the `aws` namespace, so its options are
/// addressed as `aws.<option>` (e.g. `aws.region`).
impl ConfigExtension for AwsOptions {
    const PREFIX: &'static str = "aws";
}
428
/// Google Cloud Storage credential options, set through `gcp.*` table options.
#[derive(Debug, Clone, Default)]
pub struct GcpOptions {
    /// Path to a service account file (`gcp.service_account_path`).
    pub service_account_path: Option<String>,
    /// Service account key passed as a value (`gcp.service_account_key`).
    /// The tests pass a JSON document here.
    pub service_account_key: Option<String>,
    /// Path to an application credentials file
    /// (`gcp.application_credentials_path`).
    pub application_credentials_path: Option<String>,
}
439
440impl ExtensionOptions for GcpOptions {
441 fn as_any(&self) -> &dyn Any {
442 self
443 }
444
445 fn as_any_mut(&mut self) -> &mut dyn Any {
446 self
447 }
448
449 fn cloned(&self) -> Box<dyn ExtensionOptions> {
450 Box::new(self.clone())
451 }
452
453 fn set(&mut self, key: &str, value: &str) -> Result<()> {
454 let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
455 match rem {
456 "service_account_path" => {
457 self.service_account_path.set(rem, value)?;
458 }
459 "service_account_key" => {
460 self.service_account_key.set(rem, value)?;
461 }
462 "application_credentials_path" => {
463 self.application_credentials_path.set(rem, value)?;
464 }
465 _ => {
466 return config_err!("Config value \"{}\" not found on GcpOptions", rem);
467 }
468 }
469 Ok(())
470 }
471
472 fn entries(&self) -> Vec<ConfigEntry> {
473 struct Visitor(Vec<ConfigEntry>);
474
475 impl Visit for Visitor {
476 fn some<V: Display>(
477 &mut self,
478 key: &str,
479 value: V,
480 description: &'static str,
481 ) {
482 self.0.push(ConfigEntry {
483 key: key.to_string(),
484 value: Some(value.to_string()),
485 description,
486 })
487 }
488
489 fn none(&mut self, key: &str, description: &'static str) {
490 self.0.push(ConfigEntry {
491 key: key.to_string(),
492 value: None,
493 description,
494 })
495 }
496 }
497
498 let mut v = Visitor(vec![]);
499 self.service_account_path
500 .visit(&mut v, "service_account_path", "");
501 self.service_account_key
502 .visit(&mut v, "service_account_key", "");
503 self.application_credentials_path.visit(
504 &mut v,
505 "application_credentials_path",
506 "",
507 );
508 v.0
509 }
510}
511
/// Registers [`GcpOptions`] under the `gcp` namespace, so its options are
/// addressed as `gcp.<option>` (e.g. `gcp.service_account_path`).
impl ConfigExtension for GcpOptions {
    const PREFIX: &'static str = "gcp";
}
515
516pub(crate) async fn get_object_store(
517 state: &SessionState,
518 scheme: &str,
519 url: &Url,
520 table_options: &TableOptions,
521 resolve_region: bool,
522) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
523 let store: Arc<dyn ObjectStore> = match scheme {
524 "s3" => {
525 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
526 return exec_err!(
527 "Given table options incompatible with the 's3' scheme"
528 );
529 };
530 let builder =
531 get_s3_object_store_builder(url, options, resolve_region).await?;
532 Arc::new(builder.build()?)
533 }
534 "oss" => {
535 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
536 return exec_err!(
537 "Given table options incompatible with the 'oss' scheme"
538 );
539 };
540 let builder = get_oss_object_store_builder(url, options)?;
541 Arc::new(builder.build()?)
542 }
543 "cos" => {
544 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
545 return exec_err!(
546 "Given table options incompatible with the 'cos' scheme"
547 );
548 };
549 let builder = get_cos_object_store_builder(url, options)?;
550 Arc::new(builder.build()?)
551 }
552 "gs" | "gcs" => {
553 let Some(options) = table_options.extensions.get::<GcpOptions>() else {
554 return exec_err!(
555 "Given table options incompatible with the 'gs'/'gcs' scheme"
556 );
557 };
558 let builder = get_gcs_object_store_builder(url, options)?;
559 Arc::new(builder.build()?)
560 }
561 "http" | "https" => Arc::new(
562 HttpBuilder::new()
563 .with_client_options(ClientOptions::new().with_allow_http(true))
564 .with_url(url.origin().ascii_serialization())
565 .build()?,
566 ),
567 _ => {
568 state
570 .runtime_env()
571 .object_store_registry
572 .get_store(url)
573 .map_err(|_| {
574 exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
575 })?
576 }
577 };
578 Ok(store)
579}
580
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    #[tokio::test]
    async fn s3_object_store_builder_default() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required AWS environment variables are missing: report and skip.
            eprintln!("{e}");
            return Ok(());
        }

        let location = "s3://bucket/path/FAKE/file.parquet";
        // NOTE(review): `set_var` is only sound while no other thread reads or
        // writes the process environment. `cargo test` runs tests in parallel,
        // so environment-mutating tests may need to be serialized.
        unsafe {
            // Point the AWS SDK at the test config and credentials files.
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
            std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql =
            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        // No explicit options were given, so values come from the environment.
        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
        let expected_region = Some(
            std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
        );
        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();

        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
            expected_access_key_id
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
            expected_secret_access_key
        );
        // Signing is expected to be skipped only when no keys are available.
        let expected_skip_signature =
            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
                Some(String::from("true"))
            } else {
                Some(String::from("false"))
            };
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            expected_region
        );
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
            expected_endpoint
        );
        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            expected_skip_signature
        );
        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        // Fake values: the builder is only inspected, never used to connect.
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        // Every option value is a quoted string literal, so none of them
        // depends on how bare identifiers are parsed.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' '{session_token}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
        // Every explicit option must be reflected verbatim in the builder.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }
        // Explicit credentials mean requests are signed.
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
            Some("false".into())
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);

        let table_options = get_table_options(&ctx, &sql).await;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
            .await
            .unwrap_err();

        assert_eq!(
            err.to_string().lines().next().unwrap_or_default(),
            "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"
        );

        // Explicitly allowing HTTP makes the same endpoint acceptable.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required AWS environment variables are missing: report and skip.
            eprintln!("{e}");
            return Ok(());
        }
        let location = "s3://test-bucket/path/file.parquet";
        // NOTE(review): see the note on environment mutation in
        // `s3_object_store_builder_default`; the same concern applies here.
        unsafe {
            std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
        }

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: None,
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;

        // A region must be present even though none was configured explicitly.
        assert!(
            builder
                .get_config_value(&AmazonS3ConfigKey::Region)
                .is_some()
        );

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled()
    -> Result<()> {
        if let Err(DataFusionError::Execution(e)) = check_aws_envs().await {
            // Required AWS environment variables are missing: report and skip.
            eprintln!("{e}");
            return Ok(());
        }

        let original_region = "us-east-1";
        // The test build's `resolve_bucket_region` always reports this region.
        let expected_region = "eu-central-1";
        let location = "s3://test-bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let aws_options = AwsOptions {
            region: Some(original_region.to_string()),
            ..Default::default()
        };

        let builder =
            get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;

        // With `resolve_region = true` the resolved region replaces the explicit one.
        assert_eq!(
            builder.get_config_value(&AmazonS3ConfigKey::Region),
            Some(expected_region.to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        // `aws.oss.endpoint` must land in the generic endpoint setting.
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        ctx.register_table_options_extension_from_scheme(scheme);
        let table_options = get_table_options(&ctx, &sql).await;

        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    /// Plans `sql` (expected to be a `CREATE EXTERNAL TABLE`) and returns the
    /// session's default table options altered by the statement's OPTIONS.
    ///
    /// Panics if planning fails, the plan is not a `CreateExternalTable`, or
    /// the options cannot be applied.
    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
        let mut plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
            panic!("plan is not a CreateExternalTable");
        };

        let mut table_options = ctx.state().default_table_options();
        table_options
            .alter_with_string_hash_map(&cmd.options)
            .unwrap();
        table_options
    }

    /// Returns an execution error if any AWS variable required by the
    /// environment-dependent tests is unset; callers use it to skip.
    async fn check_aws_envs() -> Result<()> {
        let aws_envs = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_REGION",
            "AWS_ALLOW_HTTP",
        ];
        for aws_env in aws_envs {
            std::env::var(aws_env).map_err(|_| {
                exec_datafusion_err!("aws envs not set, skipping s3 tests")
            })?;
        }
        Ok(())
    }
}