1#[cfg(feature = "aws")]
2use std::io::Read;
3#[cfg(feature = "aws")]
4use std::path::Path;
5use std::str::FromStr;
6#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
7use std::sync::Arc;
8use std::sync::LazyLock;
9
10#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
11use object_store::ClientOptions;
12#[cfg(feature = "aws")]
13use object_store::aws::AmazonS3Builder;
14#[cfg(feature = "aws")]
15pub use object_store::aws::AmazonS3ConfigKey;
16#[cfg(feature = "azure")]
17pub use object_store::azure::AzureConfigKey;
18#[cfg(feature = "azure")]
19use object_store::azure::MicrosoftAzureBuilder;
20#[cfg(feature = "gcp")]
21use object_store::gcp::GoogleCloudStorageBuilder;
22#[cfg(feature = "gcp")]
23pub use object_store::gcp::GoogleConfigKey;
24use polars_error::*;
25#[cfg(feature = "aws")]
26use polars_utils::cache::LruCache;
27use polars_utils::pl_path::{CloudScheme, PlRefPath};
28use polars_utils::total_ord::TotalOrdWrap;
29#[cfg(feature = "http")]
30use reqwest::header::HeaderMap;
31#[cfg(feature = "serde")]
32use serde::{Deserialize, Serialize};
33
34#[cfg(feature = "cloud")]
35use super::credential_provider::PlCredentialProvider;
36#[cfg(feature = "cloud")]
37use super::dns::get_dns_cache_ttl;
38#[cfg(feature = "cloud")]
39use crate::cloud::ObjectStoreErrorContext;
40#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
41use crate::cloud::dns::CachingResolver;
42#[cfg(feature = "file_cache")]
43use crate::file_cache::get_env_file_cache_ttl;
44#[cfg(feature = "aws")]
45use crate::pl_async::with_concurrency_budget;
46
/// Process-wide LRU cache (capacity 32) from S3 bucket name to bucket region.
///
/// `CloudOptions::build_aws` reads it before attempting region discovery and
/// inserts the region it discovers from the `x-amz-bucket-region` response header.
#[cfg(feature = "aws")]
static BUCKET_REGION: LazyLock<
    std::sync::Mutex<LruCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>>,
> = LazyLock::new(|| std::sync::Mutex::new(LruCache::with_capacity(32)));
51
/// A list of `(typed config key, string value)` pairs for one storage backend.
// `dead_code` is allowed because every `CloudConfig` variant using this alias is
// feature-gated, so the alias can end up unused in builds without a backend feature.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
60
/// Backend-specific configuration stored inside [`CloudOptions`].
///
/// Every variant is compiled in only when its cargo feature is enabled.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub(crate) enum CloudConfig {
    /// Typed S3 key/value pairs, applied by `CloudOptions::build_aws`.
    #[cfg(feature = "aws")]
    Aws(
        // For schema generation the typed keys are described as plain strings.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AmazonS3ConfigKey>,
    ),
    /// Typed Azure key/value pairs, applied by `CloudOptions::build_azure`.
    #[cfg(feature = "azure")]
    Azure(
        // For schema generation the typed keys are described as plain strings.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AzureConfigKey>,
    ),
    /// Typed GCS key/value pairs, applied by `CloudOptions::build_gcp`.
    #[cfg(feature = "gcp")]
    Gcp(
        // For schema generation the typed keys are described as plain strings.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<GoogleConfigKey>,
    ),
    /// `(name, value)` header pairs; `CloudOptions::build_http` installs them as
    /// default headers on the HTTP client.
    #[cfg(feature = "http")]
    Http { headers: Vec<(String, String)> },
}
83
/// Options used to build an object store for a cloud or HTTP location.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudOptions {
    /// File-cache time-to-live; the default value comes from `get_env_file_cache_ttl()`.
    // NOTE(review): the unit is not visible in this file — presumably seconds; confirm.
    #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
    /// Backend-specific configuration, or `None` when nothing has been configured.
    pub(crate) config: Option<CloudConfig>,
    /// Retry/backoff overrides; deserializes to its `Default` when the field is absent.
    #[cfg_attr(feature = "serde", serde(default))]
    pub retry_config: CloudRetryConfig,
    /// Optional credential provider; the `build_aws`/`build_azure`/`build_gcp`
    /// methods pass it through `try_into_initialized` before using it.
    #[cfg(feature = "cloud")]
    pub(crate) credential_provider: Option<PlCredentialProvider>,
}
99
100impl Default for CloudOptions {
101 fn default() -> Self {
102 Self::default_static_ref().clone()
103 }
104}
105
106impl CloudOptions {
107 pub fn default_static_ref() -> &'static Self {
108 static DEFAULT: LazyLock<CloudOptions> = LazyLock::new(|| CloudOptions {
109 #[cfg(feature = "file_cache")]
110 file_cache_ttl: get_env_file_cache_ttl(),
111 config: None,
112 retry_config: CloudRetryConfig::default(),
113 #[cfg(feature = "cloud")]
114 credential_provider: None,
115 });
116
117 &DEFAULT
118 }
119}
120
/// User-supplied overrides for object-store retry behaviour.
///
/// Every field is optional. When converted into `object_store::RetryConfig`, a
/// `None` field is replaced by a default that is read once from the environment
/// variable named on that field (or by the stated fallback if the variable is unset).
#[derive(Clone, Copy, Default, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudRetryConfig {
    /// Default: `POLARS_CLOUD_MAX_RETRIES`, else 2.
    pub max_retries: Option<usize>,
    /// Default: `POLARS_CLOUD_RETRY_TIMEOUT_MS` (milliseconds), else 10 000 ms.
    pub retry_timeout: Option<std::time::Duration>,
    /// Default: `POLARS_CLOUD_RETRY_INIT_BACKOFF_MS` (milliseconds), else 100 ms.
    pub retry_init_backoff: Option<std::time::Duration>,
    /// Default: `POLARS_CLOUD_RETRY_MAX_BACKOFF_MS` (milliseconds), else 15 000 ms.
    pub retry_max_backoff: Option<std::time::Duration>,
    /// Default: `POLARS_CLOUD_RETRY_BASE_MULTIPLIER`, else 2.0.
    pub retry_base_multiplier: Option<TotalOrdWrap<f64>>,
}
131
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
impl From<CloudRetryConfig> for object_store::RetryConfig {
    /// Resolves a [`CloudRetryConfig`] into a concrete `object_store::RetryConfig`.
    ///
    /// Fields left as `None` fall back to process-wide defaults that are parsed
    /// from `POLARS_CLOUD_*` environment variables on first use.
    ///
    /// # Panics
    ///
    /// Panics if a default is needed and one of those environment variables is
    /// set to a value that does not parse.
    fn from(value: CloudRetryConfig) -> Self {
        use std::time::Duration;

        use polars_core::config::verbose;

        /// Returns `default` when `name` cannot be read from the environment,
        /// otherwise the parsed value; panics when the value does not parse.
        fn parse_env_var<T: FromStr>(default: T, name: &'static str) -> T {
            match std::env::var(name) {
                Ok(x) => match x.parse::<T>() {
                    Ok(parsed) => parsed,
                    Err(_) => panic!("invalid value for {name}: {x}"),
                },
                Err(_) => default,
            }
        }

        // Environment-backed defaults, computed once and only if some field needs them.
        static DEFAULTS: LazyLock<object_store::RetryConfig> = LazyLock::new(|| {
            let init_backoff =
                Duration::from_millis(parse_env_var(100, "POLARS_CLOUD_RETRY_INIT_BACKOFF_MS"));
            let max_backoff = Duration::from_millis(parse_env_var(
                15 * 1000,
                "POLARS_CLOUD_RETRY_MAX_BACKOFF_MS",
            ));
            let base = parse_env_var(2., "POLARS_CLOUD_RETRY_BASE_MULTIPLIER");
            let max_retries = parse_env_var(2, "POLARS_CLOUD_MAX_RETRIES");
            let retry_timeout =
                Duration::from_millis(parse_env_var(10 * 1000, "POLARS_CLOUD_RETRY_TIMEOUT_MS"));

            object_store::RetryConfig {
                backoff: object_store::BackoffConfig {
                    init_backoff,
                    max_backoff,
                    base,
                },
                max_retries,
                retry_timeout,
            }
        });

        let CloudRetryConfig {
            max_retries,
            retry_timeout,
            retry_init_backoff,
            retry_max_backoff,
            retry_base_multiplier,
        } = value;

        let backoff = object_store::BackoffConfig {
            init_backoff: match retry_init_backoff {
                Some(duration) => duration,
                None => DEFAULTS.backoff.init_backoff,
            },
            max_backoff: match retry_max_backoff {
                Some(duration) => duration,
                None => DEFAULTS.backoff.max_backoff,
            },
            base: match retry_base_multiplier {
                Some(multiplier) => multiplier.0,
                None => DEFAULTS.backoff.base,
            },
        };

        let resolved = object_store::RetryConfig {
            backoff,
            max_retries: match max_retries {
                Some(count) => count,
                None => DEFAULTS.max_retries,
            },
            retry_timeout: match retry_timeout {
                Some(duration) => duration,
                None => DEFAULTS.retry_timeout,
            },
        };

        if verbose() {
            eprintln!("object-store retry config: {:?}", &resolved);
        }

        resolved
    }
}
192
/// Converts `(name, value)` string pairs into a `HeaderMap`.
///
/// Pairs are inserted in order with `HeaderMap::insert`. The first name or value
/// that fails to parse aborts the conversion and is reported through
/// `to_compute_err`.
#[cfg(feature = "http")]
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
    headers: &[(S, S)],
) -> PolarsResult<HeaderMap> {
    use reqwest::header::{HeaderName, HeaderValue};

    headers.iter().try_fold(
        HeaderMap::with_capacity(headers.len()),
        |mut acc, (name, value)| -> PolarsResult<HeaderMap> {
            let name = HeaderName::from_str(name.as_ref()).map_err(to_compute_err)?;
            let value = HeaderValue::from_str(value.as_ref()).map_err(to_compute_err)?;
            acc.insert(name, value);
            Ok(acc)
        },
    )
}
210
211#[allow(dead_code)]
212fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
214 config: I,
215) -> PolarsResult<Configs<T>>
216where
217 T: FromStr + Eq + std::hash::Hash,
218{
219 Ok(config
220 .into_iter()
221 .filter_map(|(key, val)| {
223 T::from_str(key.as_ref().to_ascii_lowercase().as_str())
224 .ok()
225 .map(|typed_key| (typed_key, val.into()))
226 })
227 .collect::<Configs<T>>())
228}
229
/// The storage backend family a path belongs to, as derived from its URL scheme
/// by [`CloudType::from_cloud_scheme`].
#[derive(Debug, Clone, PartialEq)]
pub enum CloudType {
    /// `s3` / `s3a` schemes.
    Aws,
    /// `abfs` / `abfss` / `adl` / `az` / `azure` schemes.
    Azure,
    /// `file` schemes; also used by `from_untyped_config` when no scheme is given.
    File,
    /// `gcs` / `gs` schemes.
    Gcp,
    /// `http` / `https` schemes.
    Http,
    /// `hf` scheme.
    Hf,
}
242
243impl CloudType {
244 pub fn from_cloud_scheme(scheme: CloudScheme) -> Self {
245 match scheme {
246 CloudScheme::Abfs
247 | CloudScheme::Abfss
248 | CloudScheme::Adl
249 | CloudScheme::Az
250 | CloudScheme::Azure => Self::Azure,
251
252 CloudScheme::File | CloudScheme::FileNoHostname => Self::File,
253
254 CloudScheme::Gcs | CloudScheme::Gs => Self::Gcp,
255
256 CloudScheme::Hf => Self::Hf,
257
258 CloudScheme::Http | CloudScheme::Https => Self::Http,
259
260 CloudScheme::S3 | CloudScheme::S3a => Self::Aws,
261 }
262 }
263}
264
/// User-agent string sent with cloud/HTTP requests: `polars/<crate version>`,
/// where the version is this crate's `CARGO_PKG_VERSION` at compile time.
pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
266
/// Builds the `ClientOptions` shared by all object-store backends.
///
/// The request timeout is disabled, plain HTTP is allowed and [`USER_AGENT`] is
/// sent as the user agent. The connect timeout is taken from
/// `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` (a non-zero integer number of seconds)
/// and is 300 seconds when that variable cannot be read.
///
/// # Panics
///
/// Panics if `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` is set to a value that is not
/// a non-zero unsigned integer.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
    use std::num::NonZeroU64;

    use reqwest::header::HeaderValue;

    let connect_timeout_secs: u64 = match std::env::var("POLARS_HTTP_CONNECT_TIMEOUT_SECONDS") {
        Ok(x) => match x.parse::<NonZeroU64>() {
            Ok(secs) => secs.get(),
            Err(_) => panic!("invalid value for POLARS_HTTP_CONNECT_TIMEOUT_SECONDS: {x}"),
        },
        Err(_) => 5 * 60,
    };

    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout(std::time::Duration::from_secs(connect_timeout_secs))
        .with_user_agent(HeaderValue::from_static(USER_AGENT))
        .with_allow_http(true)
}
293
/// Best-effort fill of missing S3 builder settings from local config files.
///
/// `items` pairs a file path with `(regex, key)` entries. A file is read only
/// if at least one of its keys is still unset on `builder`. For each unset key
/// the first capture group of the regex's first match becomes the value. Keys
/// that are already set are never overwritten.
///
/// Returns `None` as soon as a file cannot be opened or read as UTF-8, or a
/// pattern does not match. Any remaining keys and files are then skipped, while
/// values applied before that point stay on `builder`. Returns `Some(())` when
/// every item was processed.
///
/// # Panics
///
/// Panics if a pattern is not a valid regular expression.
#[cfg(feature = "aws")]
fn read_config(
    builder: &mut AmazonS3Builder,
    items: &[(&Path, &[(&str, AmazonS3ConfigKey)])],
) -> Option<()> {
    use crate::path_utils::resolve_homedir;

    for (path, keys) in items {
        let has_unset_key = keys
            .iter()
            .any(|(_, key)| builder.get_config_value(key).is_none());
        if !has_unset_key {
            continue;
        }

        // `read_to_string` fails on I/O errors and on invalid UTF-8 alike.
        let mut content = String::new();
        std::fs::File::open(resolve_homedir(path))
            .ok()?
            .read_to_string(&mut content)
            .ok()?;

        for (pattern, key) in keys.iter() {
            if builder.get_config_value(key).is_some() {
                continue;
            }

            let regex = polars_utils::regex_cache::compile_regex(pattern).unwrap();
            let value = regex.captures(&content)?.get(1)?.as_str();
            // The builder's `with_config` consumes `self`, so swap it out and back in.
            *builder = std::mem::take(builder).with_config(*key, value);
        }
    }

    Some(())
}
326
327impl CloudOptions {
328 pub fn with_retry_config(mut self, retry_config: CloudRetryConfig) -> Self {
329 self.retry_config = retry_config;
330 self
331 }
332
333 #[cfg(feature = "cloud")]
334 pub fn with_credential_provider(
335 mut self,
336 credential_provider: Option<PlCredentialProvider>,
337 ) -> Self {
338 self.credential_provider = credential_provider;
339 self
340 }
341
342 #[cfg(feature = "aws")]
344 pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
345 mut self,
346 configs: I,
347 ) -> Self {
348 self.config = Some(CloudConfig::Aws(
349 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
350 ));
351 self
352 }
353
    /// Builds an S3 object store for `url`.
    ///
    /// Settings are applied to the builder in this order:
    /// 1. `AmazonS3Builder::from_env()`, the shared client options and the URL;
    /// 2. the credential provider's `storage_update_options()`, if a provider is set;
    /// 3. `~/.aws/config` (region) and `~/.aws/credentials` (key id, secret, session
    ///    token) via `read_config`, which only fills keys that are still unset;
    /// 4. the explicit options stored in `self.config`;
    /// 5. a region fallback when neither `DefaultRegion` nor `Region` is set (see below);
    /// 6. the retry configuration and, if usable, the credential provider.
    ///
    /// # Errors
    ///
    /// Returns an error if credential-provider initialization, the region-discovery
    /// request, or the final `build()` fails.
    ///
    /// # Panics
    ///
    /// Panics if `self.config` holds a non-AWS variant, if the `BUCKET_REGION`
    /// mutex is poisoned, if the region-discovery HTTP client cannot be
    /// constructed, or (with `python`) if `_can_use_as_provider` does not return
    /// a `bool`.
    #[cfg(feature = "aws")]
    pub async fn build_aws(
        &self,
        url: PlRefPath,
        clear_cached_credentials: bool,
    ) -> PolarsResult<impl object_store::ObjectStore> {
        use super::credential_provider::IntoCredentialProvider;

        let opt_credential_provider =
            self.initialized_credential_provider(clear_cached_credentials)?;

        let mut builder = AmazonS3Builder::from_env()
            .with_client_options(get_client_options())
            .with_url(url.clone().to_string());

        // Let the credential provider contribute storage options. Keys that are not
        // valid `AmazonS3ConfigKey`s are dropped by `parse_untyped_config`.
        if let Some(credential_provider) = &opt_credential_provider {
            let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
                credential_provider
                    .storage_update_options()?
                    .into_iter()
                    .map(|(k, v)| (k, v.to_string())),
            )?;

            for (key, value) in storage_update_options {
                builder = builder.with_config(key, value);
            }
        }

        // Best effort: the `Option` results are deliberately ignored, so a missing
        // or unparsable file simply leaves the builder unchanged.
        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/config"),
                &[("region\\s*=\\s*([^\r\n]*)", AmazonS3ConfigKey::Region)],
            )],
        );

        read_config(
            &mut builder,
            &[(
                Path::new("~/.aws/credentials"),
                &[
                    (
                        "aws_access_key_id\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::AccessKeyId,
                    ),
                    (
                        "aws_secret_access_key\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::SecretAccessKey,
                    ),
                    (
                        "aws_session_token\\s*=\\s*([^\\r\\n]*)",
                        AmazonS3ConfigKey::Token,
                    ),
                ],
            )],
        );

        // Explicit user options are applied after the environment and config files.
        // NOTE(review): presumably a later `with_config` overrides an earlier value
        // for the same key — confirm against object_store.
        if let Some(options) = &self.config {
            let CloudConfig::Aws(options) = options else {
                panic!("impl error: cloud type mismatch")
            };
            for (key, value) in options {
                builder = builder.with_config(*key, value);
            }
        }

        // Region fallback, only when no region has been configured by any source above.
        if builder
            .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
            .is_none()
            && builder
                .get_config_value(&AmazonS3ConfigKey::Region)
                .is_none()
        {
            let bucket = crate::cloud::CloudLocation::new(url.clone(), false)?.bucket;
            // The guard lives only inside this block, so the lock is released
            // before the `.await` below.
            let region = {
                let mut bucket_region = BUCKET_REGION.lock().unwrap();
                bucket_region.get(bucket.as_str()).cloned()
            };

            match region {
                // Region for this bucket was discovered earlier in this process.
                Some(region) => {
                    builder = builder.with_config(AmazonS3ConfigKey::Region, region.as_str())
                },
                None => {
                    if builder
                        .get_config_value(&AmazonS3ConfigKey::Endpoint)
                        .is_some()
                    {
                        // A custom endpoint is configured: use a fixed region instead
                        // of querying the AWS bucket host.
                        builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
                    } else {
                        polars_warn!(
                            "'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."
                        );
                        // HEAD the bucket's public host and read the region from the
                        // `x-amz-bucket-region` response header.
                        let result = with_concurrency_budget(1, || async {
                            reqwest::Client::builder()
                                .user_agent(USER_AGENT)
                                .build()
                                .unwrap()
                                .head(format!("https://{bucket}.s3.amazonaws.com"))
                                .send()
                                .await
                                .map_err(to_compute_err)
                        })
                        .await?;
                        // If the header is absent the region stays unset and nothing is cached.
                        if let Some(region) = result.headers().get("x-amz-bucket-region") {
                            let region =
                                std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
                            let mut bucket_region = BUCKET_REGION.lock().unwrap();
                            bucket_region.insert(bucket, region.into());
                            builder = builder.with_config(AmazonS3ConfigKey::Region, region)
                        }
                    }
                },
            };
        };

        let builder = builder.with_retry(self.retry_config.into());

        // A Python provider may opt out of being used as an object-store credential
        // provider through an optional `_can_use_as_provider()` method. If the
        // attribute lookup fails, the provider is kept.
        let opt_credential_provider = match opt_credential_provider {
            #[cfg(feature = "python")]
            Some(PlCredentialProvider::Python(object)) => {
                if pyo3::Python::attach(|py| {
                    let Ok(func_object) = object
                        .unwrap_as_provider_ref()
                        .getattr(py, "_can_use_as_provider")
                    else {
                        return PolarsResult::Ok(true);
                    };

                    Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
                })? {
                    Some(PlCredentialProvider::Python(object))
                } else {
                    None
                }
            },

            v => v,
        };

        let builder = if let Some(credential_provider) = opt_credential_provider {
            builder.with_credentials(credential_provider.into_aws_provider())
        } else {
            builder
        };

        let out = builder
            .with_unsigned_payload(true)
            .build()
            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;

        Ok(out)
    }
511
512 #[cfg(feature = "azure")]
514 pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
515 mut self,
516 configs: I,
517 ) -> Self {
518 self.config = Some(CloudConfig::Azure(
519 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
520 ));
521 self
522 }
523
524 #[cfg(feature = "azure")]
526 pub fn build_azure(
527 &self,
528 url: PlRefPath,
529 clear_cached_credentials: bool,
530 ) -> PolarsResult<impl object_store::ObjectStore> {
531 use super::credential_provider::IntoCredentialProvider;
532 use crate::cloud::ObjectStoreErrorContext;
533
534 let verbose = polars_core::config::verbose();
535
536 let mut builder =
539 MicrosoftAzureBuilder::from_env().with_client_options(get_client_options());
540
541 if let Some(options) = &self.config {
542 let CloudConfig::Azure(options) = options else {
543 panic!("impl error: cloud type mismatch")
544 };
545 for (key, value) in options.iter() {
546 builder = builder.with_config(*key, value);
547 }
548 }
549
550 let builder = builder
551 .with_url(url.to_string())
552 .with_retry(self.retry_config.into());
553
554 let builder =
555 if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
556 if verbose {
557 eprintln!(
558 "[CloudOptions::build_azure]: Using credential provider {:?}",
559 &v
560 );
561 }
562 builder.with_credentials(v.into_azure_provider())
563 } else {
564 builder
565 };
566
567 let out = builder
568 .build()
569 .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
570
571 Ok(out)
572 }
573
574 #[cfg(feature = "gcp")]
576 pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
577 mut self,
578 configs: I,
579 ) -> Self {
580 self.config = Some(CloudConfig::Gcp(
581 configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
582 ));
583 self
584 }
585
586 #[cfg(feature = "gcp")]
588 pub fn build_gcp(
589 &self,
590 url: PlRefPath,
591 clear_cached_credentials: bool,
592 ) -> PolarsResult<impl object_store::ObjectStore> {
593 use super::credential_provider::IntoCredentialProvider;
594
595 let credential_provider = self.initialized_credential_provider(clear_cached_credentials)?;
596
597 let builder = if credential_provider.is_none() {
598 GoogleCloudStorageBuilder::from_env()
599 } else {
600 GoogleCloudStorageBuilder::new()
601 };
602
603 let mut builder = builder.with_client_options(get_client_options());
604
605 if let Some(options) = &self.config {
606 let CloudConfig::Gcp(options) = options else {
607 panic!("impl error: cloud type mismatch")
608 };
609 for (key, value) in options.iter() {
610 builder = builder.with_config(*key, value);
611 }
612 }
613
614 let builder = builder
615 .with_url(url.to_string())
616 .with_retry(self.retry_config.into());
617
618 let builder = if let Some(v) = credential_provider {
619 builder.with_credentials(v.into_gcp_provider())
620 } else {
621 builder
622 };
623
624 let out = builder
625 .build()
626 .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
627
628 Ok(out)
629 }
630
631 #[cfg(feature = "http")]
632 pub fn build_http(&self, url: PlRefPath) -> PolarsResult<impl object_store::ObjectStore> {
633 let out = object_store::http::HttpBuilder::new()
634 .with_url(url.to_string())
635 .with_client_options({
636 let mut opts = super::get_client_options();
637 if let Some(CloudConfig::Http { headers }) = &self.config {
638 opts = opts.with_default_headers(try_build_http_header_map_from_items_slice(
639 headers.as_slice(),
640 )?);
641 }
642 opts
643 })
644 .build()
645 .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
646
647 Ok(out)
648 }
649
650 #[allow(unused_variables)]
652 pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
653 scheme: Option<CloudScheme>,
654 config: I,
655 ) -> PolarsResult<Self> {
656 match scheme.map_or(CloudType::File, CloudType::from_cloud_scheme) {
657 CloudType::Aws => {
658 #[cfg(feature = "aws")]
659 {
660 parse_untyped_config::<AmazonS3ConfigKey, _>(config)
661 .map(|aws| Self::default().with_aws(aws))
662 }
663 #[cfg(not(feature = "aws"))]
664 {
665 polars_bail!(ComputeError: "'aws' feature is not enabled");
666 }
667 },
668 CloudType::Azure => {
669 #[cfg(feature = "azure")]
670 {
671 parse_untyped_config::<AzureConfigKey, _>(config)
672 .map(|azure| Self::default().with_azure(azure))
673 }
674 #[cfg(not(feature = "azure"))]
675 {
676 polars_bail!(ComputeError: "'azure' feature is not enabled");
677 }
678 },
679 CloudType::File => Ok(Self::default()),
680 CloudType::Http => Ok(Self::default()),
681 CloudType::Gcp => {
682 #[cfg(feature = "gcp")]
683 {
684 parse_untyped_config::<GoogleConfigKey, _>(config)
685 .map(|gcp| Self::default().with_gcp(gcp))
686 }
687 #[cfg(not(feature = "gcp"))]
688 {
689 polars_bail!(ComputeError: "'gcp' feature is not enabled");
690 }
691 },
692 CloudType::Hf => {
693 #[cfg(feature = "http")]
694 {
695 use polars_core::config;
696
697 use crate::path_utils::resolve_homedir;
698
699 let mut this = Self::default();
700 let mut token = None;
701 let verbose = config::verbose();
702
703 for (i, (k, v)) in config.into_iter().enumerate() {
704 let (k, v) = (k.as_ref(), v.into());
705
706 if i == 0 && k == "token" {
707 if verbose {
708 eprintln!("HF token sourced from storage_options");
709 }
710 token = Some(v);
711 } else {
712 polars_bail!(ComputeError: "unknown configuration key for HF: {}", k)
713 }
714 }
715
716 token = token
717 .or_else(|| {
718 let v = std::env::var("HF_TOKEN").ok();
719 if v.is_some() && verbose {
720 eprintln!("HF token sourced from HF_TOKEN env var");
721 }
722 v
723 })
724 .or_else(|| {
725 let hf_home = std::env::var("HF_HOME");
726 let hf_home = hf_home.as_deref();
727 let hf_home = hf_home.unwrap_or("~/.cache/huggingface");
728 let hf_home = resolve_homedir(hf_home);
729 let cached_token_path = hf_home.join("token");
730
731 let v = std::string::String::from_utf8(
732 std::fs::read(&cached_token_path).ok()?,
733 )
734 .ok()
735 .filter(|x| !x.is_empty());
736
737 if v.is_some() && verbose {
738 eprintln!("HF token sourced from {:?}", cached_token_path);
739 }
740
741 v
742 });
743
744 if let Some(v) = token {
745 this.config = Some(CloudConfig::Http {
746 headers: vec![("Authorization".into(), format!("Bearer {v}"))],
747 })
748 }
749
750 Ok(this)
751 }
752 #[cfg(not(feature = "http"))]
753 {
754 polars_bail!(ComputeError: "'http' feature is not enabled");
755 }
756 },
757 }
758 }
759
760 #[cfg(feature = "cloud")]
763 fn initialized_credential_provider(
764 &self,
765 clear_cached_credentials: bool,
766 ) -> PolarsResult<Option<PlCredentialProvider>> {
767 if let Some(v) = self.credential_provider.clone() {
768 v.try_into_initialized(clear_cached_credentials)
769 } else {
770 Ok(None)
771 }
772 }
773}
774
#[cfg(feature = "cloud")]
#[cfg(test)]
mod tests {
    use hashbrown::HashMap;

    use super::parse_untyped_config;

    /// Known keys are recognized regardless of ASCII case, and keys that are not
    /// valid `AmazonS3ConfigKey`s are dropped rather than reported as errors.
    #[cfg(feature = "aws")]
    #[test]
    fn test_parse_untyped_config() {
        use object_store::aws::AmazonS3ConfigKey;

        for secret_key_name in ["aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"] {
            let aws_config = [
                (secret_key_name, "a_key"),
                ("aws_s3_allow_unsafe_rename", "true"),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>();

            let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
                .expect("Parsing keys shouldn't have thrown an error");

            assert_eq!(
                aws_keys.first().unwrap().0,
                AmazonS3ConfigKey::SecretAccessKey
            );
            assert_eq!(aws_keys.len(), 1);
        }
    }
}