Skip to main content

polars_io/cloud/
object_store_setup.rs

1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16/// Object stores must be cached. Every object-store will do DNS lookups and
17/// get rate limited when querying the DNS (can take up to 5s).
18/// Other reasons are connection pools that must be shared between as much as possible.
19#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21    LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25    feature: &str,
26    cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28    polars_bail!(
29        ComputeError:
30        "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31        feature,
32        cloud_type,
33    );
34}
35
36/// Get the key of a url for object store registration.
37fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> PolarsResult<Vec<u8>> {
38    // We include credentials as they can expire, so users will send new credentials for the same url.
39
40    #[cfg(feature = "cloud")]
41    let credential_cache_key = CacheKeyBytes(
42        options
43            .and_then(|o| o.credential_provider.as_ref())
44            .map(|x| x.stable_cache_key())
45            .transpose()?
46            .unwrap_or_default(),
47    );
48
49    let cloud_options = options
50        .map(
51            |CloudOptions {
52                 // Destructure to ensure this breaks if anything changes.
53                 #[cfg(feature = "file_cache")]
54                 file_cache_ttl,
55                 config,
56                 retry_config,
57                 #[cfg(feature = "cloud")]
58                     credential_provider: _,
59             }|
60             -> PolarsResult<CloudOptionsKey> {
61                Ok(CloudOptionsKey {
62                    #[cfg(feature = "file_cache")]
63                    file_cache_ttl: *file_cache_ttl,
64                    config: config.clone(),
65                    retry_config: *retry_config,
66                    #[cfg(feature = "cloud")]
67                    credential_provider: credential_cache_key,
68                })
69            },
70        )
71        .transpose()?;
72
73    let cache_key = CacheKey {
74        url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
75        cloud_options,
76    };
77
78    verbose_print_sensitive(|| {
79        format!(
80            "object store cache key for path at '{}': {:?}",
81            path, &cache_key
82        )
83    });
84
85    return pl_serialize::serialize_to_bytes::<_, false>(&cache_key);
86
87    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
88    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
89    struct CacheKey {
90        url_base: PlSmallStr,
91        cloud_options: Option<CloudOptionsKey>,
92    }
93
94    #[derive(Clone, PartialEq, Hash, Eq)]
95    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
96    struct CacheKeyBytes(Vec<u8>);
97
98    impl std::fmt::Debug for CacheKeyBytes {
99        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100            if self.0.is_empty() {
101                write!(f, "None")
102            } else {
103                for b in &self.0 {
104                    write!(f, "{:02x}", b)?;
105                }
106                Ok(())
107            }
108        }
109    }
110
111    /// Variant of CloudOptions for serializing to a cache key. The credential
112    /// provider is replaced by the function address.
113    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
114    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
115    struct CloudOptionsKey {
116        #[cfg(feature = "file_cache")]
117        file_cache_ttl: u64,
118        config: Option<CloudConfig>,
119        retry_config: CloudRetryConfig,
120        #[cfg(feature = "cloud")]
121        credential_provider: CacheKeyBytes,
122    }
123}
124
125/// Construct an object_store `Path` from a string without any encoding/decoding.
126pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
127    object_store::path::Path::parse(path).map_err(to_compute_err)
128}
129
130#[derive(Debug, Clone)]
131pub(crate) struct PolarsObjectStoreBuilder {
132    path: PlRefPath,
133    cloud_type: CloudType,
134    options: Option<CloudOptions>,
135}
136
137impl PolarsObjectStoreBuilder {
138    pub(super) fn path(&self) -> &PlRefPath {
139        &self.path
140    }
141
142    pub(super) async fn build_impl(
143        &self,
144        // Whether to clear cached credentials for Python credential providers.
145        clear_cached_credentials: bool,
146    ) -> PolarsResult<Arc<dyn ObjectStore>> {
147        let options = self
148            .options
149            .as_ref()
150            .unwrap_or_else(|| CloudOptions::default_static_ref());
151
152        if let Some(options) = &self.options
153            && verbose()
154        {
155            eprintln!(
156                "build object-store: file_cache_ttl: {}",
157                options.file_cache_ttl
158            )
159        }
160
161        let store = match self.cloud_type {
162            CloudType::Aws => {
163                #[cfg(feature = "aws")]
164                {
165                    let store = options
166                        .build_aws(self.path.clone(), clear_cached_credentials)
167                        .await?;
168                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
169                }
170                #[cfg(not(feature = "aws"))]
171                return err_missing_feature("aws", &self.cloud_type);
172            },
173            CloudType::Gcp => {
174                #[cfg(feature = "gcp")]
175                {
176                    let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
177
178                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
179                }
180                #[cfg(not(feature = "gcp"))]
181                return err_missing_feature("gcp", &self.cloud_type);
182            },
183            CloudType::Azure => {
184                {
185                    #[cfg(feature = "azure")]
186                    {
187                        let store =
188                            options.build_azure(self.path.clone(), clear_cached_credentials)?;
189                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
190                    }
191                }
192                #[cfg(not(feature = "azure"))]
193                return err_missing_feature("azure", &self.cloud_type);
194            },
195            CloudType::File => {
196                let local = LocalFileSystem::new();
197                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
198            },
199            CloudType::Http => {
200                {
201                    #[cfg(feature = "http")]
202                    {
203                        let store = options.build_http(self.path.clone())?;
204                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
205                    }
206                }
207                #[cfg(not(feature = "http"))]
208                return err_missing_feature("http", &cloud_location.scheme);
209            },
210            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
211        }?;
212
213        Ok(store)
214    }
215
216    /// Note: Use `build_impl` for a non-caching version.
217    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
218        let opt_cache_key = match &self.cloud_type {
219            CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
220                Some(path_and_creds_to_key(&self.path, self.options.as_ref())?)
221            },
222            CloudType::File | CloudType::Http | CloudType::Hf => None,
223        };
224
225        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
226            let cache = OBJECT_STORE_CACHE.read().await;
227
228            if let Some(store) = cache.get(cache_key) {
229                return Ok(store.clone());
230            }
231
232            drop(cache);
233
234            let cache = OBJECT_STORE_CACHE.write().await;
235
236            if let Some(store) = cache.get(cache_key) {
237                return Ok(store.clone());
238            }
239
240            Some(cache)
241        } else {
242            None
243        };
244
245        let store = self.build_impl(false).await?;
246        let store = PolarsObjectStore::new_from_inner(store, self);
247
248        if let Some(mut cache) = opt_cache_write_guard {
249            // Clear the cache if we surpass a certain amount of buckets.
250            if cache.len() >= 8 {
251                if config::verbose() {
252                    eprintln!(
253                        "build_object_store: clearing store cache (cache.len(): {})",
254                        cache.len()
255                    );
256                }
257                cache.clear()
258            }
259
260            cache.insert(opt_cache_key.unwrap(), store.clone());
261        }
262
263        Ok(store)
264    }
265
266    pub(crate) fn is_azure(&self) -> bool {
267        matches!(&self.cloud_type, CloudType::Azure)
268    }
269}
270
271/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
272pub async fn build_object_store(
273    path: PlRefPath,
274    #[cfg_attr(
275        not(any(feature = "aws", feature = "gcp", feature = "azure")),
276        allow(unused_variables)
277    )]
278    options: Option<&CloudOptions>,
279    glob: bool,
280) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
281    let path = path.to_absolute_path()?.into_owned();
282
283    let cloud_type = path
284        .scheme()
285        .map_or(CloudType::File, CloudType::from_cloud_scheme);
286    let cloud_location = CloudLocation::new(path.clone(), glob)?;
287
288    let store = PolarsObjectStoreBuilder {
289        path,
290        cloud_type,
291        options: options.cloned(),
292    }
293    .build()
294    .await?;
295
296    Ok((cloud_location, store))
297}
298
#[cfg(test)]
mod tests {
    use super::object_path_from_str;

    /// `object_path_from_str` must not percent-decode: `%25` must stay `%25`
    /// rather than becoming `%`.
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}