Skip to main content

polars_io/cloud/
options.rs

1#[cfg(feature = "aws")]
2use std::io::Read;
3#[cfg(feature = "aws")]
4use std::path::Path;
5use std::str::FromStr;
6#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
7use std::sync::Arc;
8use std::sync::LazyLock;
9
10#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
11use object_store::ClientOptions;
12#[cfg(feature = "aws")]
13use object_store::aws::AmazonS3Builder;
14#[cfg(feature = "aws")]
15pub use object_store::aws::AmazonS3ConfigKey;
16#[cfg(feature = "azure")]
17pub use object_store::azure::AzureConfigKey;
18#[cfg(feature = "azure")]
19use object_store::azure::MicrosoftAzureBuilder;
20#[cfg(feature = "gcp")]
21use object_store::gcp::GoogleCloudStorageBuilder;
22#[cfg(feature = "gcp")]
23pub use object_store::gcp::GoogleConfigKey;
24use polars_error::*;
25#[cfg(feature = "aws")]
26use polars_utils::cache::LruCache;
27use polars_utils::pl_path::{CloudScheme, PlRefPath};
28use polars_utils::total_ord::TotalOrdWrap;
29#[cfg(feature = "http")]
30use reqwest::header::HeaderMap;
31#[cfg(feature = "serde")]
32use serde::{Deserialize, Serialize};
33
34#[cfg(feature = "cloud")]
35use super::credential_provider::PlCredentialProvider;
36#[cfg(feature = "cloud")]
37use super::dns::get_dns_cache_ttl;
38#[cfg(feature = "cloud")]
39use crate::cloud::ObjectStoreErrorContext;
40#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
41use crate::cloud::dns::CachingResolver;
42#[cfg(feature = "file_cache")]
43use crate::file_cache::get_env_file_cache_ttl;
44#[cfg(feature = "aws")]
45use crate::pl_async::with_concurrency_budget;
46
#[cfg(feature = "aws")]
/// Process-wide cache mapping an S3 bucket name to its region, bounded to 32 entries.
///
/// `CloudOptions::build_aws` looks the bucket up here before issuing a `HEAD` request to discover
/// the region. After a successful request it stores the `x-amz-bucket-region` response header
/// here, so later builds for the same bucket can skip the request while the entry is cached.
/// The eviction policy is that of `polars_utils::cache::LruCache` (presumably LRU, going by the
/// name).
static BUCKET_REGION: LazyLock<
    std::sync::Mutex<LruCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>>,
> = LazyLock::new(|| std::sync::Mutex::new(LruCache::with_capacity(32)));
51
/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the object_store API.
/// 2. be serializable, since `CloudConfig` derives `Serialize`/`Deserialize` when the `serde`
///    feature is enabled.
/// 3. not actually use HashMap since that type is disallowed in Polars for performance reasons.
///
/// Currently this type is a vector of `(config key, config value)` pairs.
// `dead_code` is allowed presumably because nothing uses this alias when none of the cloud
// features are enabled (every `CloudConfig` variant is feature-gated).
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
60
/// Provider-specific connection settings stored in [`CloudOptions`].
///
/// Only one provider's settings are held at a time. `build_aws`, `build_azure` and `build_gcp`
/// panic with "impl error: cloud type mismatch" when the stored variant belongs to a different
/// provider.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub(crate) enum CloudConfig {
    /// Typed options forwarded to `AmazonS3Builder::with_config`.
    #[cfg(feature = "aws")]
    Aws(
        // The config key type has no schema of its own; it is described as plain string pairs.
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AmazonS3ConfigKey>,
    ),
    /// Typed options forwarded to `MicrosoftAzureBuilder::with_config`.
    #[cfg(feature = "azure")]
    Azure(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<AzureConfigKey>,
    ),
    /// Typed options forwarded to `GoogleCloudStorageBuilder::with_config`.
    #[cfg(feature = "gcp")]
    Gcp(
        #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<(String, String)>"))]
        Configs<GoogleConfigKey>,
    ),
    /// `(name, value)` pairs installed as default headers on the store built by `build_http`.
    #[cfg(feature = "http")]
    Http { headers: Vec<(String, String)> },
}
83
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Options to connect to various cloud providers.
///
/// Typically obtained from [`CloudOptions::default`] or `CloudOptions::from_untyped_config`.
/// It is then refined with the `with_*` methods and turned into an object store by the
/// matching `build_*` method.
pub struct CloudOptions {
    /// TTL for the local file cache, initialised from `get_env_file_cache_ttl()` in the default
    /// options (unit not visible here — presumably seconds, TODO confirm in `crate::file_cache`).
    #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
    /// Provider-specific settings; `None` leaves everything to the environment and defaults.
    pub(crate) config: Option<CloudConfig>,
    /// Retry/backoff policy. Unset fields fall back to defaults when converted into an
    /// `object_store::RetryConfig`.
    #[cfg_attr(feature = "serde", serde(default))]
    pub retry_config: CloudRetryConfig,
    #[cfg(feature = "cloud")]
    /// Note: In most cases you will want to access this via [`CloudOptions::initialized_credential_provider`]
    /// rather than directly.
    pub(crate) credential_provider: Option<PlCredentialProvider>,
}
99
100impl Default for CloudOptions {
101    fn default() -> Self {
102        Self::default_static_ref().clone()
103    }
104}
105
106impl CloudOptions {
107    pub fn default_static_ref() -> &'static Self {
108        static DEFAULT: LazyLock<CloudOptions> = LazyLock::new(|| CloudOptions {
109            #[cfg(feature = "file_cache")]
110            file_cache_ttl: get_env_file_cache_ttl(),
111            config: None,
112            retry_config: CloudRetryConfig::default(),
113            #[cfg(feature = "cloud")]
114            credential_provider: None,
115        });
116
117        &DEFAULT
118    }
119}
120
/// User-facing retry/backoff settings for cloud requests.
///
/// Every field is optional. A `None` field is resolved when the config is converted into an
/// `object_store::RetryConfig`; the default for each field is listed below and can be
/// overridden through the named environment variable.
#[derive(Clone, Copy, Default, Debug, PartialEq, Hash, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CloudRetryConfig {
    /// Maximum number of retries (default 2, `POLARS_CLOUD_MAX_RETRIES`).
    pub max_retries: Option<usize>,
    /// Overall retry time limit (default 10 s, `POLARS_CLOUD_RETRY_TIMEOUT_MS`).
    pub retry_timeout: Option<std::time::Duration>,
    /// Initial backoff delay (default 100 ms, `POLARS_CLOUD_RETRY_INIT_BACKOFF_MS`).
    pub retry_init_backoff: Option<std::time::Duration>,
    /// Upper bound on the backoff delay (default 15 s, `POLARS_CLOUD_RETRY_MAX_BACKOFF_MS`).
    pub retry_max_backoff: Option<std::time::Duration>,
    /// Backoff growth factor (default 2.0, `POLARS_CLOUD_RETRY_BASE_MULTIPLIER`). Wrapped in
    /// `TotalOrdWrap`, presumably so the struct can derive `Eq`/`Hash` despite holding an `f64`.
    pub retry_base_multiplier: Option<TotalOrdWrap<f64>>,
}
131
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
impl From<CloudRetryConfig> for object_store::RetryConfig {
    /// Resolve a [`CloudRetryConfig`] into an `object_store` retry policy.
    ///
    /// Each `None` field is filled from a process-wide default. That default is computed on
    /// first use from the following environment variables (built-in fallback in parentheses):
    /// - `POLARS_CLOUD_RETRY_INIT_BACKOFF_MS` (100)
    /// - `POLARS_CLOUD_RETRY_MAX_BACKOFF_MS` (15000)
    /// - `POLARS_CLOUD_RETRY_BASE_MULTIPLIER` (2.0)
    /// - `POLARS_CLOUD_MAX_RETRIES` (2)
    /// - `POLARS_CLOUD_RETRY_TIMEOUT_MS` (10000)
    ///
    /// In verbose mode the resolved config is printed to stderr.
    ///
    /// # Panics
    /// Panics if one of those variables is set to a value that cannot be parsed.
    fn from(value: CloudRetryConfig) -> Self {
        use std::time::Duration;

        use polars_core::config::verbose;

        /// Read `name` from the environment; an unset (or non-unicode) variable yields `default`.
        fn parse_env_var<T: FromStr>(default: T, name: &'static str) -> T {
            let Ok(raw) = std::env::var(name) else {
                return default;
            };
            match raw.parse::<T>() {
                Ok(parsed) => parsed,
                Err(_) => panic!("invalid value for {name}: {raw}"),
            }
        }

        // Only forced when some field of `value` is `None`, so a fully specified config never
        // reads (or panics on) the environment.
        static DEFAULTS: LazyLock<object_store::RetryConfig> = LazyLock::new(|| {
            let init_backoff_ms = parse_env_var(100, "POLARS_CLOUD_RETRY_INIT_BACKOFF_MS");
            let max_backoff_ms = parse_env_var(15 * 1000, "POLARS_CLOUD_RETRY_MAX_BACKOFF_MS");
            let base = parse_env_var(2., "POLARS_CLOUD_RETRY_BASE_MULTIPLIER");
            let max_retries = parse_env_var(2, "POLARS_CLOUD_MAX_RETRIES");
            let retry_timeout_ms = parse_env_var(10 * 1000, "POLARS_CLOUD_RETRY_TIMEOUT_MS");

            object_store::RetryConfig {
                backoff: object_store::BackoffConfig {
                    init_backoff: Duration::from_millis(init_backoff_ms),
                    max_backoff: Duration::from_millis(max_backoff_ms),
                    base,
                },
                max_retries,
                retry_timeout: Duration::from_millis(retry_timeout_ms),
            }
        });

        let init_backoff = match value.retry_init_backoff {
            Some(explicit) => explicit,
            None => DEFAULTS.backoff.init_backoff,
        };
        let max_backoff = match value.retry_max_backoff {
            Some(explicit) => explicit,
            None => DEFAULTS.backoff.max_backoff,
        };
        let base = match value.retry_base_multiplier {
            Some(wrapped) => wrapped.0,
            None => DEFAULTS.backoff.base,
        };
        let max_retries = match value.max_retries {
            Some(explicit) => explicit,
            None => DEFAULTS.max_retries,
        };
        let retry_timeout = match value.retry_timeout {
            Some(explicit) => explicit,
            None => DEFAULTS.retry_timeout,
        };

        let resolved = object_store::RetryConfig {
            backoff: object_store::BackoffConfig {
                init_backoff,
                max_backoff,
                base,
            },
            max_retries,
            retry_timeout,
        };

        if verbose() {
            eprintln!("object-store retry config: {:?}", &resolved)
        }

        resolved
    }
}
192
/// Build a [`HeaderMap`] from `(name, value)` string pairs.
///
/// Pairs are applied in order with `HeaderMap::insert`, so a repeated header name keeps only
/// its last value.
///
/// # Errors
/// Returns a compute error for the first name or value that is not a valid HTTP header
/// name/value (the name is checked before its value).
#[cfg(feature = "http")]
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
    headers: &[(S, S)],
) -> PolarsResult<HeaderMap> {
    use reqwest::header::{HeaderName, HeaderValue};

    headers.iter().try_fold(
        HeaderMap::with_capacity(headers.len()),
        |mut map, (name, value)| -> PolarsResult<HeaderMap> {
            let name = HeaderName::from_str(name.as_ref()).map_err(to_compute_err)?;
            let value = HeaderValue::from_str(value.as_ref()).map_err(to_compute_err)?;
            map.insert(name, value);
            Ok(map)
        },
    )
}
210
211#[allow(dead_code)]
212/// Parse an untype configuration hashmap to a typed configuration for the given configuration key type.
213fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
214    config: I,
215) -> PolarsResult<Configs<T>>
216where
217    T: FromStr + Eq + std::hash::Hash,
218{
219    Ok(config
220        .into_iter()
221        // Silently ignores custom upstream storage_options
222        .filter_map(|(key, val)| {
223            T::from_str(key.as_ref().to_ascii_lowercase().as_str())
224                .ok()
225                .map(|typed_key| (typed_key, val.into()))
226        })
227        .collect::<Configs<T>>())
228}
229
/// The storage backend a path belongs to, derived from its URL scheme by
/// [`CloudType::from_cloud_scheme`].
#[derive(Debug, Clone, PartialEq)]
pub enum CloudType {
    /// Amazon S3 (the `CloudScheme::S3` / `CloudScheme::S3a` schemes).
    Aws,
    /// Azure storage (`CloudScheme::Abfs`, `Abfss`, `Adl`, `Az`, `Azure`).
    Azure,
    /// URI with 'file:' scheme
    File,
    /// Google cloud platform
    Gcp,
    /// Plain HTTP(S) (`CloudScheme::Http` / `CloudScheme::Https`).
    Http,
    /// HuggingFace
    Hf,
}
242
243impl CloudType {
244    pub fn from_cloud_scheme(scheme: CloudScheme) -> Self {
245        match scheme {
246            CloudScheme::Abfs
247            | CloudScheme::Abfss
248            | CloudScheme::Adl
249            | CloudScheme::Az
250            | CloudScheme::Azure => Self::Azure,
251
252            CloudScheme::File | CloudScheme::FileNoHostname => Self::File,
253
254            CloudScheme::Gcs | CloudScheme::Gs => Self::Gcp,
255
256            CloudScheme::Hf => Self::Hf,
257
258            CloudScheme::Http | CloudScheme::Https => Self::Http,
259
260            CloudScheme::S3 | CloudScheme::S3a => Self::Aws,
261        }
262    }
263}
264
265pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
266
/// Shared HTTP client options for every object-store backend.
///
/// - Response bodies have no download time limit.
/// - The connect timeout is `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` (a positive integer),
///   defaulting to 5 minutes.
/// - The polars [`USER_AGENT`] is sent.
/// - Plain `http` is allowed.
///
/// # Panics
/// Panics if `POLARS_HTTP_CONNECT_TIMEOUT_SECONDS` is set but is not a positive integer.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
    use std::num::NonZeroU64;
    use std::time::Duration;

    use reqwest::header::HeaderValue;

    let connect_timeout_secs: u64 = match std::env::var("POLARS_HTTP_CONNECT_TIMEOUT_SECONDS") {
        Err(_) => 5 * 60,
        Ok(x) => match x.parse::<NonZeroU64>() {
            Ok(secs) => secs.get(),
            Err(_) => panic!("invalid value for POLARS_HTTP_CONNECT_TIMEOUT_SECONDS: {x}"),
        },
    };

    // NOTE: the caching DNS resolver is currently not installed:
    // .with_dns_resolver(Arc::new(CachingResolver::new(get_dns_cache_ttl())))
    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout(Duration::from_secs(connect_timeout_secs))
        .with_user_agent(HeaderValue::from_static(USER_AGENT))
        .with_allow_http(true)
}
293
#[cfg(feature = "aws")]
/// Best-effort fill of still-unset `builder` keys from INI-style files such as `~/.aws/config`.
///
/// `items` pairs a file path (passed through `resolve_homedir`, so a leading `~` works) with
/// `(regex, key)` entries. For every key that has no value on `builder` yet, the first capture
/// group of the first match of its regex is used. The value is trimmed, and empty values are
/// ignored.
///
/// Keys that already have a value are never overwritten, and a file is not opened when all of
/// its keys are already set. An unreadable file or a non-matching pattern only skips that
/// file/key; the remaining keys and files are still processed.
///
/// Returns `None` if any file that had to be consulted could not be read as UTF-8 text, and
/// `Some(())` otherwise.
fn read_config(
    builder: &mut AmazonS3Builder,
    items: &[(&Path, &[(&str, AmazonS3ConfigKey)])],
) -> Option<()> {
    use crate::path_utils::resolve_homedir;

    /// Read a whole file as UTF-8 text, or `None` if it cannot be opened or decoded.
    fn read_text_file(path: impl AsRef<Path>) -> Option<String> {
        let mut content = String::new();
        std::fs::File::open(path)
            .ok()?
            .read_to_string(&mut content)
            .ok()?;
        Some(content)
    }

    let mut all_files_read = true;

    for (path, keys) in items {
        if keys
            .iter()
            .all(|(_, key)| builder.get_config_value(key).is_some())
        {
            continue;
        }

        let Some(content) = read_text_file(resolve_homedir(path)) else {
            // Missing/unreadable file: skip it but keep processing the other files.
            all_files_read = false;
            continue;
        };

        for (pattern, key) in keys.iter() {
            if builder.get_config_value(key).is_some() {
                continue;
            }

            // Patterns are compile-time constants supplied by the caller, so a failure to
            // compile is a programming error rather than bad input.
            let reg = polars_utils::regex_cache::compile_regex(pattern).unwrap();
            let Some(value) = reg
                .captures(&content)
                .and_then(|cap| cap.get(1))
                .map(|m| m.as_str().trim())
                .filter(|v| !v.is_empty())
            else {
                // This key is absent from the file; later keys may still be present.
                continue;
            };

            *builder = std::mem::take(builder).with_config(*key, value);
        }
    }

    all_files_read.then_some(())
}
326
327impl CloudOptions {
328    pub fn with_retry_config(mut self, retry_config: CloudRetryConfig) -> Self {
329        self.retry_config = retry_config;
330        self
331    }
332
333    #[cfg(feature = "cloud")]
334    pub fn with_credential_provider(
335        mut self,
336        credential_provider: Option<PlCredentialProvider>,
337    ) -> Self {
338        self.credential_provider = credential_provider;
339        self
340    }
341
342    /// Set the configuration for AWS connections. This is the preferred API from rust.
343    #[cfg(feature = "aws")]
344    pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
345        mut self,
346        configs: I,
347    ) -> Self {
348        self.config = Some(CloudConfig::Aws(
349            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
350        ));
351        self
352    }
353
354    /// Build the [`object_store::ObjectStore`] implementation for AWS.
355    #[cfg(feature = "aws")]
356    pub async fn build_aws(
357        &self,
358        url: PlRefPath,
359        clear_cached_credentials: bool,
360    ) -> PolarsResult<impl object_store::ObjectStore> {
361        use super::credential_provider::IntoCredentialProvider;
362
363        let opt_credential_provider =
364            self.initialized_credential_provider(clear_cached_credentials)?;
365
366        let mut builder = AmazonS3Builder::from_env()
367            .with_client_options(get_client_options())
368            .with_url(url.clone().to_string());
369
370        if let Some(credential_provider) = &opt_credential_provider {
371            let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
372                credential_provider
373                    .storage_update_options()?
374                    .into_iter()
375                    .map(|(k, v)| (k, v.to_string())),
376            )?;
377
378            for (key, value) in storage_update_options {
379                builder = builder.with_config(key, value);
380            }
381        }
382
383        read_config(
384            &mut builder,
385            &[(
386                Path::new("~/.aws/config"),
387                &[("region\\s*=\\s*([^\r\n]*)", AmazonS3ConfigKey::Region)],
388            )],
389        );
390
391        read_config(
392            &mut builder,
393            &[(
394                Path::new("~/.aws/credentials"),
395                &[
396                    (
397                        "aws_access_key_id\\s*=\\s*([^\\r\\n]*)",
398                        AmazonS3ConfigKey::AccessKeyId,
399                    ),
400                    (
401                        "aws_secret_access_key\\s*=\\s*([^\\r\\n]*)",
402                        AmazonS3ConfigKey::SecretAccessKey,
403                    ),
404                    (
405                        "aws_session_token\\s*=\\s*([^\\r\\n]*)",
406                        AmazonS3ConfigKey::Token,
407                    ),
408                ],
409            )],
410        );
411
412        if let Some(options) = &self.config {
413            let CloudConfig::Aws(options) = options else {
414                panic!("impl error: cloud type mismatch")
415            };
416            for (key, value) in options {
417                builder = builder.with_config(*key, value);
418            }
419        }
420
421        if builder
422            .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
423            .is_none()
424            && builder
425                .get_config_value(&AmazonS3ConfigKey::Region)
426                .is_none()
427        {
428            let bucket = crate::cloud::CloudLocation::new(url.clone(), false)?.bucket;
429            let region = {
430                let mut bucket_region = BUCKET_REGION.lock().unwrap();
431                bucket_region.get(bucket.as_str()).cloned()
432            };
433
434            match region {
435                Some(region) => {
436                    builder = builder.with_config(AmazonS3ConfigKey::Region, region.as_str())
437                },
438                None => {
439                    if builder
440                        .get_config_value(&AmazonS3ConfigKey::Endpoint)
441                        .is_some()
442                    {
443                        // Set a default value if the endpoint is not aws.
444                        // See: #13042
445                        builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
446                    } else {
447                        polars_warn!(
448                            "'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."
449                        );
450                        let result = with_concurrency_budget(1, || async {
451                            reqwest::Client::builder()
452                                .user_agent(USER_AGENT)
453                                .build()
454                                .unwrap()
455                                .head(format!("https://{bucket}.s3.amazonaws.com"))
456                                .send()
457                                .await
458                                .map_err(to_compute_err)
459                        })
460                        .await?;
461                        if let Some(region) = result.headers().get("x-amz-bucket-region") {
462                            let region =
463                                std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
464                            let mut bucket_region = BUCKET_REGION.lock().unwrap();
465                            bucket_region.insert(bucket, region.into());
466                            builder = builder.with_config(AmazonS3ConfigKey::Region, region)
467                        }
468                    }
469                },
470            };
471        };
472
473        let builder = builder.with_retry(self.retry_config.into());
474
475        let opt_credential_provider = match opt_credential_provider {
476            #[cfg(feature = "python")]
477            Some(PlCredentialProvider::Python(object)) => {
478                if pyo3::Python::attach(|py| {
479                    let Ok(func_object) = object
480                        .unwrap_as_provider_ref()
481                        .getattr(py, "_can_use_as_provider")
482                    else {
483                        return PolarsResult::Ok(true);
484                    };
485
486                    Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
487                })? {
488                    Some(PlCredentialProvider::Python(object))
489                } else {
490                    None
491                }
492            },
493
494            v => v,
495        };
496
497        let builder = if let Some(credential_provider) = opt_credential_provider {
498            builder.with_credentials(credential_provider.into_aws_provider())
499        } else {
500            builder
501        };
502
503        let out = builder
504            // .with_checksum_algorithm(object_store::aws::Checksum::CRC64NVME)
505            .with_unsigned_payload(true)
506            .build()
507            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
508
509        Ok(out)
510    }
511
512    /// Set the configuration for Azure connections. This is the preferred API from rust.
513    #[cfg(feature = "azure")]
514    pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
515        mut self,
516        configs: I,
517    ) -> Self {
518        self.config = Some(CloudConfig::Azure(
519            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
520        ));
521        self
522    }
523
524    /// Build the [`object_store::ObjectStore`] implementation for Azure.
525    #[cfg(feature = "azure")]
526    pub fn build_azure(
527        &self,
528        url: PlRefPath,
529        clear_cached_credentials: bool,
530    ) -> PolarsResult<impl object_store::ObjectStore> {
531        use super::credential_provider::IntoCredentialProvider;
532        use crate::cloud::ObjectStoreErrorContext;
533
534        let verbose = polars_core::config::verbose();
535
536        // The credential provider `self.credentials` is prioritized if it is set. We also need
537        // `from_env()` as it may source environment configured storage account name.
538        let mut builder =
539            MicrosoftAzureBuilder::from_env().with_client_options(get_client_options());
540
541        if let Some(options) = &self.config {
542            let CloudConfig::Azure(options) = options else {
543                panic!("impl error: cloud type mismatch")
544            };
545            for (key, value) in options.iter() {
546                builder = builder.with_config(*key, value);
547            }
548        }
549
550        let builder = builder
551            .with_url(url.to_string())
552            .with_retry(self.retry_config.into());
553
554        let builder =
555            if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
556                if verbose {
557                    eprintln!(
558                        "[CloudOptions::build_azure]: Using credential provider {:?}",
559                        &v
560                    );
561                }
562                builder.with_credentials(v.into_azure_provider())
563            } else {
564                builder
565            };
566
567        let out = builder
568            .build()
569            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
570
571        Ok(out)
572    }
573
574    /// Set the configuration for GCP connections. This is the preferred API from rust.
575    #[cfg(feature = "gcp")]
576    pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
577        mut self,
578        configs: I,
579    ) -> Self {
580        self.config = Some(CloudConfig::Gcp(
581            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
582        ));
583        self
584    }
585
586    /// Build the [`object_store::ObjectStore`] implementation for GCP.
587    #[cfg(feature = "gcp")]
588    pub fn build_gcp(
589        &self,
590        url: PlRefPath,
591        clear_cached_credentials: bool,
592    ) -> PolarsResult<impl object_store::ObjectStore> {
593        use super::credential_provider::IntoCredentialProvider;
594
595        let credential_provider = self.initialized_credential_provider(clear_cached_credentials)?;
596
597        let builder = if credential_provider.is_none() {
598            GoogleCloudStorageBuilder::from_env()
599        } else {
600            GoogleCloudStorageBuilder::new()
601        };
602
603        let mut builder = builder.with_client_options(get_client_options());
604
605        if let Some(options) = &self.config {
606            let CloudConfig::Gcp(options) = options else {
607                panic!("impl error: cloud type mismatch")
608            };
609            for (key, value) in options.iter() {
610                builder = builder.with_config(*key, value);
611            }
612        }
613
614        let builder = builder
615            .with_url(url.to_string())
616            .with_retry(self.retry_config.into());
617
618        let builder = if let Some(v) = credential_provider {
619            builder.with_credentials(v.into_gcp_provider())
620        } else {
621            builder
622        };
623
624        let out = builder
625            .build()
626            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
627
628        Ok(out)
629    }
630
631    #[cfg(feature = "http")]
632    pub fn build_http(&self, url: PlRefPath) -> PolarsResult<impl object_store::ObjectStore> {
633        let out = object_store::http::HttpBuilder::new()
634            .with_url(url.to_string())
635            .with_client_options({
636                let mut opts = super::get_client_options();
637                if let Some(CloudConfig::Http { headers }) = &self.config {
638                    opts = opts.with_default_headers(try_build_http_header_map_from_items_slice(
639                        headers.as_slice(),
640                    )?);
641                }
642                opts
643            })
644            .build()
645            .map_err(|e| ObjectStoreErrorContext::new(url).attach_err_info(e))?;
646
647        Ok(out)
648    }
649
650    /// Parse a configuration from a Hashmap. This is the interface from Python.
651    #[allow(unused_variables)]
652    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
653        scheme: Option<CloudScheme>,
654        config: I,
655    ) -> PolarsResult<Self> {
656        match scheme.map_or(CloudType::File, CloudType::from_cloud_scheme) {
657            CloudType::Aws => {
658                #[cfg(feature = "aws")]
659                {
660                    parse_untyped_config::<AmazonS3ConfigKey, _>(config)
661                        .map(|aws| Self::default().with_aws(aws))
662                }
663                #[cfg(not(feature = "aws"))]
664                {
665                    polars_bail!(ComputeError: "'aws' feature is not enabled");
666                }
667            },
668            CloudType::Azure => {
669                #[cfg(feature = "azure")]
670                {
671                    parse_untyped_config::<AzureConfigKey, _>(config)
672                        .map(|azure| Self::default().with_azure(azure))
673                }
674                #[cfg(not(feature = "azure"))]
675                {
676                    polars_bail!(ComputeError: "'azure' feature is not enabled");
677                }
678            },
679            CloudType::File => Ok(Self::default()),
680            CloudType::Http => Ok(Self::default()),
681            CloudType::Gcp => {
682                #[cfg(feature = "gcp")]
683                {
684                    parse_untyped_config::<GoogleConfigKey, _>(config)
685                        .map(|gcp| Self::default().with_gcp(gcp))
686                }
687                #[cfg(not(feature = "gcp"))]
688                {
689                    polars_bail!(ComputeError: "'gcp' feature is not enabled");
690                }
691            },
692            CloudType::Hf => {
693                #[cfg(feature = "http")]
694                {
695                    use polars_core::config;
696
697                    use crate::path_utils::resolve_homedir;
698
699                    let mut this = Self::default();
700                    let mut token = None;
701                    let verbose = config::verbose();
702
703                    for (i, (k, v)) in config.into_iter().enumerate() {
704                        let (k, v) = (k.as_ref(), v.into());
705
706                        if i == 0 && k == "token" {
707                            if verbose {
708                                eprintln!("HF token sourced from storage_options");
709                            }
710                            token = Some(v);
711                        } else {
712                            polars_bail!(ComputeError: "unknown configuration key for HF: {}", k)
713                        }
714                    }
715
716                    token = token
717                        .or_else(|| {
718                            let v = std::env::var("HF_TOKEN").ok();
719                            if v.is_some() && verbose {
720                                eprintln!("HF token sourced from HF_TOKEN env var");
721                            }
722                            v
723                        })
724                        .or_else(|| {
725                            let hf_home = std::env::var("HF_HOME");
726                            let hf_home = hf_home.as_deref();
727                            let hf_home = hf_home.unwrap_or("~/.cache/huggingface");
728                            let hf_home = resolve_homedir(hf_home);
729                            let cached_token_path = hf_home.join("token");
730
731                            let v = std::string::String::from_utf8(
732                                std::fs::read(&cached_token_path).ok()?,
733                            )
734                            .ok()
735                            .filter(|x| !x.is_empty());
736
737                            if v.is_some() && verbose {
738                                eprintln!("HF token sourced from {:?}", cached_token_path);
739                            }
740
741                            v
742                        });
743
744                    if let Some(v) = token {
745                        this.config = Some(CloudConfig::Http {
746                            headers: vec![("Authorization".into(), format!("Bearer {v}"))],
747                        })
748                    }
749
750                    Ok(this)
751                }
752                #[cfg(not(feature = "http"))]
753                {
754                    polars_bail!(ComputeError: "'http' feature is not enabled");
755                }
756            },
757        }
758    }
759
760    /// Python passes a credential provider builder that needs to be called to get the actual credential
761    /// provider.
762    #[cfg(feature = "cloud")]
763    fn initialized_credential_provider(
764        &self,
765        clear_cached_credentials: bool,
766    ) -> PolarsResult<Option<PlCredentialProvider>> {
767        if let Some(v) = self.credential_provider.clone() {
768            v.try_into_initialized(clear_cached_credentials)
769        } else {
770            Ok(None)
771        }
772    }
773}
774
#[cfg(feature = "cloud")]
#[cfg(test)]
mod tests {
    use hashbrown::HashMap;

    use super::parse_untyped_config;

    /// Known keys are recognised case-insensitively and keep their value. Keys the target type
    /// does not know (`aws_s3_allow_unsafe_rename` here) are dropped.
    #[cfg(feature = "aws")]
    #[test]
    fn test_parse_untyped_config() {
        use object_store::aws::AmazonS3ConfigKey;

        for secret_key_name in ["aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"] {
            let aws_config = [
                (secret_key_name, "a_key"),
                ("aws_s3_allow_unsafe_rename", "true"),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>();
            let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
                .expect("Parsing keys shouldn't have thrown an error");

            assert_eq!(
                aws_keys.len(),
                1,
                "unexpected keys parsed for {secret_key_name}"
            );
            assert_eq!(aws_keys[0].0, AmazonS3ConfigKey::SecretAccessKey);
            assert_eq!(aws_keys[0].1, "a_key");
        }
    }
}