1use std::any::Any;
19use std::fmt::{Debug, Display};
20use std::sync::Arc;
21
22use datafusion::common::config::{
23 ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
24};
25use datafusion::common::{config_err, exec_datafusion_err, exec_err};
26use datafusion::error::{DataFusionError, Result};
27use datafusion::execution::context::SessionState;
28
29use async_trait::async_trait;
30use aws_config::BehaviorVersion;
31use aws_credential_types::provider::ProvideCredentials;
32use object_store::aws::{AmazonS3Builder, AwsCredential};
33use object_store::gcp::GoogleCloudStorageBuilder;
34use object_store::http::HttpBuilder;
35use object_store::{ClientOptions, CredentialProvider, ObjectStore};
36use url::Url;
37
38pub async fn get_s3_object_store_builder(
39 url: &Url,
40 aws_options: &AwsOptions,
41) -> Result<AmazonS3Builder> {
42 let AwsOptions {
43 access_key_id,
44 secret_access_key,
45 session_token,
46 region,
47 endpoint,
48 allow_http,
49 } = aws_options;
50
51 let bucket_name = get_bucket_name(url)?;
52 let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
53
54 if let (Some(access_key_id), Some(secret_access_key)) =
55 (access_key_id, secret_access_key)
56 {
57 builder = builder
58 .with_access_key_id(access_key_id)
59 .with_secret_access_key(secret_access_key);
60
61 if let Some(session_token) = session_token {
62 builder = builder.with_token(session_token);
63 }
64 } else {
65 let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
66 if let Some(region) = config.region() {
67 builder = builder.with_region(region.to_string());
68 }
69
70 let credentials = config
71 .credentials_provider()
72 .ok_or_else(|| {
73 DataFusionError::ObjectStore(object_store::Error::Generic {
74 store: "S3",
75 source: "Failed to get S3 credentials from the environment".into(),
76 })
77 })?
78 .clone();
79
80 let credentials = Arc::new(S3CredentialProvider { credentials });
81 builder = builder.with_credentials(credentials);
82 }
83
84 if let Some(region) = region {
85 builder = builder.with_region(region);
86 }
87
88 if let Some(endpoint) = endpoint {
89 if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
92 if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
93 return config_err!(
94 "Invalid endpoint: {endpoint}. \
95 HTTP is not allowed for S3 endpoints. \
96 To allow HTTP, set 'aws.allow_http' to true"
97 );
98 }
99 }
100
101 builder = builder.with_endpoint(endpoint);
102 }
103
104 if let Some(allow_http) = allow_http {
105 builder = builder.with_allow_http(*allow_http);
106 }
107
108 Ok(builder)
109}
110
111#[derive(Debug)]
112struct S3CredentialProvider {
113 credentials: aws_credential_types::provider::SharedCredentialsProvider,
114}
115
116#[async_trait]
117impl CredentialProvider for S3CredentialProvider {
118 type Credential = AwsCredential;
119
120 async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
121 let creds = self.credentials.provide_credentials().await.map_err(|e| {
122 object_store::Error::Generic {
123 store: "S3",
124 source: Box::new(e),
125 }
126 })?;
127 Ok(Arc::new(AwsCredential {
128 key_id: creds.access_key_id().to_string(),
129 secret_key: creds.secret_access_key().to_string(),
130 token: creds.session_token().map(ToString::to_string),
131 }))
132 }
133}
134
135pub fn get_oss_object_store_builder(
136 url: &Url,
137 aws_options: &AwsOptions,
138) -> Result<AmazonS3Builder> {
139 get_object_store_builder(url, aws_options, true)
140}
141
142pub fn get_cos_object_store_builder(
143 url: &Url,
144 aws_options: &AwsOptions,
145) -> Result<AmazonS3Builder> {
146 get_object_store_builder(url, aws_options, false)
147}
148
149fn get_object_store_builder(
150 url: &Url,
151 aws_options: &AwsOptions,
152 virtual_hosted_style_request: bool,
153) -> Result<AmazonS3Builder> {
154 let bucket_name = get_bucket_name(url)?;
155 let mut builder = AmazonS3Builder::from_env()
156 .with_virtual_hosted_style_request(virtual_hosted_style_request)
157 .with_bucket_name(bucket_name)
158 .with_region("do_not_care");
160
161 if let (Some(access_key_id), Some(secret_access_key)) =
162 (&aws_options.access_key_id, &aws_options.secret_access_key)
163 {
164 builder = builder
165 .with_access_key_id(access_key_id)
166 .with_secret_access_key(secret_access_key);
167 }
168
169 if let Some(endpoint) = &aws_options.endpoint {
170 builder = builder.with_endpoint(endpoint);
171 }
172
173 Ok(builder)
174}
175
176pub fn get_gcs_object_store_builder(
177 url: &Url,
178 gs_options: &GcpOptions,
179) -> Result<GoogleCloudStorageBuilder> {
180 let bucket_name = get_bucket_name(url)?;
181 let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
182
183 if let Some(service_account_path) = &gs_options.service_account_path {
184 builder = builder.with_service_account_path(service_account_path);
185 }
186
187 if let Some(service_account_key) = &gs_options.service_account_key {
188 builder = builder.with_service_account_key(service_account_key);
189 }
190
191 if let Some(application_credentials_path) = &gs_options.application_credentials_path {
192 builder = builder.with_application_credentials(application_credentials_path);
193 }
194
195 Ok(builder)
196}
197
198fn get_bucket_name(url: &Url) -> Result<&str> {
199 url.host_str().ok_or_else(|| {
200 DataFusionError::Execution(format!(
201 "Not able to parse bucket name from url: {}",
202 url.as_str()
203 ))
204 })
205}
206
207#[derive(Default, Debug, Clone)]
209pub struct AwsOptions {
210 pub access_key_id: Option<String>,
212 pub secret_access_key: Option<String>,
214 pub session_token: Option<String>,
216 pub region: Option<String>,
218 pub endpoint: Option<String>,
220 pub allow_http: Option<bool>,
222}
223
224impl ExtensionOptions for AwsOptions {
225 fn as_any(&self) -> &dyn Any {
226 self
227 }
228
229 fn as_any_mut(&mut self) -> &mut dyn Any {
230 self
231 }
232
233 fn cloned(&self) -> Box<dyn ExtensionOptions> {
234 Box::new(self.clone())
235 }
236
237 fn set(&mut self, key: &str, value: &str) -> Result<()> {
238 let (_key, aws_key) = key.split_once('.').unwrap_or((key, ""));
239 let (key, rem) = aws_key.split_once('.').unwrap_or((aws_key, ""));
240 match key {
241 "access_key_id" => {
242 self.access_key_id.set(rem, value)?;
243 }
244 "secret_access_key" => {
245 self.secret_access_key.set(rem, value)?;
246 }
247 "session_token" => {
248 self.session_token.set(rem, value)?;
249 }
250 "region" => {
251 self.region.set(rem, value)?;
252 }
253 "oss" | "cos" | "endpoint" => {
254 self.endpoint.set(rem, value)?;
255 }
256 "allow_http" => {
257 self.allow_http.set(rem, value)?;
258 }
259 _ => {
260 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
261 }
262 }
263 Ok(())
264 }
265
266 fn entries(&self) -> Vec<ConfigEntry> {
267 struct Visitor(Vec<ConfigEntry>);
268
269 impl Visit for Visitor {
270 fn some<V: Display>(
271 &mut self,
272 key: &str,
273 value: V,
274 description: &'static str,
275 ) {
276 self.0.push(ConfigEntry {
277 key: key.to_string(),
278 value: Some(value.to_string()),
279 description,
280 })
281 }
282
283 fn none(&mut self, key: &str, description: &'static str) {
284 self.0.push(ConfigEntry {
285 key: key.to_string(),
286 value: None,
287 description,
288 })
289 }
290 }
291
292 let mut v = Visitor(vec![]);
293 self.access_key_id.visit(&mut v, "access_key_id", "");
294 self.secret_access_key
295 .visit(&mut v, "secret_access_key", "");
296 self.session_token.visit(&mut v, "session_token", "");
297 self.region.visit(&mut v, "region", "");
298 self.endpoint.visit(&mut v, "endpoint", "");
299 self.allow_http.visit(&mut v, "allow_http", "");
300 v.0
301 }
302}
303
304impl ConfigExtension for AwsOptions {
305 const PREFIX: &'static str = "aws";
306}
307
308#[derive(Debug, Clone, Default)]
310pub struct GcpOptions {
311 pub service_account_path: Option<String>,
313 pub service_account_key: Option<String>,
315 pub application_credentials_path: Option<String>,
317}
318
319impl ExtensionOptions for GcpOptions {
320 fn as_any(&self) -> &dyn Any {
321 self
322 }
323
324 fn as_any_mut(&mut self) -> &mut dyn Any {
325 self
326 }
327
328 fn cloned(&self) -> Box<dyn ExtensionOptions> {
329 Box::new(self.clone())
330 }
331
332 fn set(&mut self, key: &str, value: &str) -> Result<()> {
333 let (_key, rem) = key.split_once('.').unwrap_or((key, ""));
334 match rem {
335 "service_account_path" => {
336 self.service_account_path.set(rem, value)?;
337 }
338 "service_account_key" => {
339 self.service_account_key.set(rem, value)?;
340 }
341 "application_credentials_path" => {
342 self.application_credentials_path.set(rem, value)?;
343 }
344 _ => {
345 return config_err!("Config value \"{}\" not found on GcpOptions", rem);
346 }
347 }
348 Ok(())
349 }
350
351 fn entries(&self) -> Vec<ConfigEntry> {
352 struct Visitor(Vec<ConfigEntry>);
353
354 impl Visit for Visitor {
355 fn some<V: Display>(
356 &mut self,
357 key: &str,
358 value: V,
359 description: &'static str,
360 ) {
361 self.0.push(ConfigEntry {
362 key: key.to_string(),
363 value: Some(value.to_string()),
364 description,
365 })
366 }
367
368 fn none(&mut self, key: &str, description: &'static str) {
369 self.0.push(ConfigEntry {
370 key: key.to_string(),
371 value: None,
372 description,
373 })
374 }
375 }
376
377 let mut v = Visitor(vec![]);
378 self.service_account_path
379 .visit(&mut v, "service_account_path", "");
380 self.service_account_key
381 .visit(&mut v, "service_account_key", "");
382 self.application_credentials_path.visit(
383 &mut v,
384 "application_credentials_path",
385 "",
386 );
387 v.0
388 }
389}
390
391impl ConfigExtension for GcpOptions {
392 const PREFIX: &'static str = "gcp";
393}
394
395pub(crate) async fn get_object_store(
396 state: &SessionState,
397 scheme: &str,
398 url: &Url,
399 table_options: &TableOptions,
400) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
401 let store: Arc<dyn ObjectStore> = match scheme {
402 "s3" => {
403 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
404 return exec_err!(
405 "Given table options incompatible with the 's3' scheme"
406 );
407 };
408 let builder = get_s3_object_store_builder(url, options).await?;
409 Arc::new(builder.build()?)
410 }
411 "oss" => {
412 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
413 return exec_err!(
414 "Given table options incompatible with the 'oss' scheme"
415 );
416 };
417 let builder = get_oss_object_store_builder(url, options)?;
418 Arc::new(builder.build()?)
419 }
420 "cos" => {
421 let Some(options) = table_options.extensions.get::<AwsOptions>() else {
422 return exec_err!(
423 "Given table options incompatible with the 'cos' scheme"
424 );
425 };
426 let builder = get_cos_object_store_builder(url, options)?;
427 Arc::new(builder.build()?)
428 }
429 "gs" | "gcs" => {
430 let Some(options) = table_options.extensions.get::<GcpOptions>() else {
431 return exec_err!(
432 "Given table options incompatible with the 'gs'/'gcs' scheme"
433 );
434 };
435 let builder = get_gcs_object_store_builder(url, options)?;
436 Arc::new(builder.build()?)
437 }
438 "http" | "https" => Arc::new(
439 HttpBuilder::new()
440 .with_client_options(ClientOptions::new().with_allow_http(true))
441 .with_url(url.origin().ascii_serialization())
442 .build()?,
443 ),
444 _ => {
445 state
447 .runtime_env()
448 .object_store_registry
449 .get_store(url)
450 .map_err(|_| {
451 exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
452 })?
453 }
454 };
455 Ok(store)
456}
457
#[cfg(test)]
mod tests {
    use crate::cli_context::CliSessionContext;

    use super::*;

    use datafusion::common::plan_err;
    use datafusion::{
        datasource::listing::ListingTableUrl,
        logical_expr::{DdlStatement, LogicalPlan},
        prelude::SessionContext,
    };

    use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

    /// Plans `sql` (which must be a `CREATE EXTERNAL TABLE`), registers the
    /// options extension for `scheme`, and returns the session's default table
    /// options altered with the statement's `OPTIONS`.
    async fn table_options_for(
        ctx: &SessionContext,
        sql: &str,
        scheme: &str,
    ) -> Result<TableOptions> {
        let plan = ctx.state().create_logical_plan(sql).await?;
        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan else {
            return plan_err!("LogicalPlan is not a CreateExternalTable");
        };
        ctx.register_table_options_extension_from_scheme(scheme);
        let mut table_options = ctx.state().default_table_options();
        table_options.alter_with_string_hash_map(&cmd.options)?;
        Ok(table_options)
    }

    #[tokio::test]
    async fn s3_object_store_builder() -> Result<()> {
        let access_key_id = "FAKE_access_key_id";
        let secret_access_key = "FAKE_secret_access_key";
        let region = "fake_us-east-2";
        let endpoint = "endpoint33";
        let session_token = "FAKE_session_token";
        let location = "s3://bucket/path/FAKE/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        // Every option value is single-quoted so it is parsed as a string literal.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.region' '{region}', \
            'aws.session_token' '{session_token}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );

        let ctx = SessionContext::new();
        let table_options = table_options_for(&ctx, &sql, scheme).await?;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder =
            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Region, region),
            (AmazonS3ConfigKey::Endpoint, endpoint),
            (AmazonS3ConfigKey::Token, session_token),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    #[tokio::test]
    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "http://endpoint33";
        let location = "s3://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let ctx = SessionContext::new();

        // An http:// endpoint without 'aws.allow_http' must be rejected.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}'\
            ) LOCATION '{location}'"
        );
        let table_options = table_options_for(&ctx, &sql, scheme).await?;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
            .await
            .unwrap_err();

        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");

        // The same endpoint is accepted once 'aws.allow_http' is enabled.
        let sql = format!(
            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
            ('aws.access_key_id' '{access_key_id}', \
            'aws.secret_access_key' '{secret_access_key}', \
            'aws.endpoint' '{endpoint}',\
            'aws.allow_http' 'true'\
            ) LOCATION '{location}'"
        );
        let table_options = table_options_for(&ctx, &sql, scheme).await?;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;

        Ok(())
    }

    #[tokio::test]
    async fn oss_object_store_builder() -> Result<()> {
        let access_key_id = "fake_access_key_id";
        let secret_access_key = "fake_secret_access_key";
        let endpoint = "fake_endpoint";
        let location = "oss://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        let table_options = table_options_for(&ctx, &sql, scheme).await?;
        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
        let config = [
            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
            (AmazonS3ConfigKey::Endpoint, endpoint),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }

    #[tokio::test]
    async fn gcs_object_store_builder() -> Result<()> {
        let service_account_path = "fake_service_account_path";
        let service_account_key =
            "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
        let application_credentials_path = "fake_application_credentials_path";
        let location = "gcs://bucket/path/file.parquet";

        let table_url = ListingTableUrl::parse(location)?;
        let scheme = table_url.scheme();
        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

        let ctx = SessionContext::new();
        let table_options = table_options_for(&ctx, &sql, scheme).await?;
        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
        let config = [
            (GoogleConfigKey::ServiceAccount, service_account_path),
            (GoogleConfigKey::ServiceAccountKey, service_account_key),
            (
                GoogleConfigKey::ApplicationCredentials,
                application_credentials_path,
            ),
        ];
        for (key, value) in config {
            assert_eq!(value, builder.get_config_value(&key).unwrap());
        }

        Ok(())
    }
}