1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25 feature: &str,
26 cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28 polars_bail!(
29 ComputeError:
30 "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31 feature,
32 cloud_type,
33 );
34}
35
36fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> PolarsResult<Vec<u8>> {
38 #[cfg(feature = "cloud")]
41 let credential_cache_key = CacheKeyBytes(
42 options
43 .and_then(|o| o.credential_provider.as_ref())
44 .map(|x| x.stable_cache_key())
45 .transpose()?
46 .unwrap_or_default(),
47 );
48
49 let cloud_options = options
50 .map(
51 |CloudOptions {
52 #[cfg(feature = "file_cache")]
54 file_cache_ttl,
55 config,
56 retry_config,
57 #[cfg(feature = "cloud")]
58 credential_provider: _,
59 }|
60 -> PolarsResult<CloudOptionsKey> {
61 Ok(CloudOptionsKey {
62 #[cfg(feature = "file_cache")]
63 file_cache_ttl: *file_cache_ttl,
64 config: config.clone(),
65 retry_config: *retry_config,
66 #[cfg(feature = "cloud")]
67 credential_provider: credential_cache_key,
68 })
69 },
70 )
71 .transpose()?;
72
73 let cache_key = CacheKey {
74 url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
75 cloud_options,
76 };
77
78 verbose_print_sensitive(|| {
79 format!(
80 "object store cache key for path at '{}': {:?}",
81 path, &cache_key
82 )
83 });
84
85 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key);
86
87 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
88 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
89 struct CacheKey {
90 url_base: PlSmallStr,
91 cloud_options: Option<CloudOptionsKey>,
92 }
93
94 #[derive(Clone, PartialEq, Hash, Eq)]
95 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
96 struct CacheKeyBytes(Vec<u8>);
97
98 impl std::fmt::Debug for CacheKeyBytes {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 if self.0.is_empty() {
101 write!(f, "None")
102 } else {
103 for b in &self.0 {
104 write!(f, "{:02x}", b)?;
105 }
106 Ok(())
107 }
108 }
109 }
110
111 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
114 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
115 struct CloudOptionsKey {
116 #[cfg(feature = "file_cache")]
117 file_cache_ttl: u64,
118 config: Option<CloudConfig>,
119 retry_config: CloudRetryConfig,
120 #[cfg(feature = "cloud")]
121 credential_provider: CacheKeyBytes,
122 }
123}
124
125pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
127 object_store::path::Path::parse(path).map_err(to_compute_err)
128}
129
130#[derive(Debug, Clone)]
131pub(crate) struct PolarsObjectStoreBuilder {
132 path: PlRefPath,
133 cloud_type: CloudType,
134 options: Option<CloudOptions>,
135}
136
137impl PolarsObjectStoreBuilder {
138 pub(super) fn path(&self) -> &PlRefPath {
139 &self.path
140 }
141
142 pub(super) async fn build_impl(
143 &self,
144 clear_cached_credentials: bool,
146 ) -> PolarsResult<Arc<dyn ObjectStore>> {
147 let options = self
148 .options
149 .as_ref()
150 .unwrap_or_else(|| CloudOptions::default_static_ref());
151
152 if let Some(options) = &self.options
153 && verbose()
154 {
155 eprintln!(
156 "build object-store: file_cache_ttl: {}",
157 options.file_cache_ttl
158 )
159 }
160
161 let store = match self.cloud_type {
162 CloudType::Aws => {
163 #[cfg(feature = "aws")]
164 {
165 let store = options
166 .build_aws(self.path.clone(), clear_cached_credentials)
167 .await?;
168 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
169 }
170 #[cfg(not(feature = "aws"))]
171 return err_missing_feature("aws", &self.cloud_type);
172 },
173 CloudType::Gcp => {
174 #[cfg(feature = "gcp")]
175 {
176 let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
177
178 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
179 }
180 #[cfg(not(feature = "gcp"))]
181 return err_missing_feature("gcp", &self.cloud_type);
182 },
183 CloudType::Azure => {
184 {
185 #[cfg(feature = "azure")]
186 {
187 let store =
188 options.build_azure(self.path.clone(), clear_cached_credentials)?;
189 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
190 }
191 }
192 #[cfg(not(feature = "azure"))]
193 return err_missing_feature("azure", &self.cloud_type);
194 },
195 CloudType::File => {
196 let local = LocalFileSystem::new();
197 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
198 },
199 CloudType::Http => {
200 {
201 #[cfg(feature = "http")]
202 {
203 let store = options.build_http(self.path.clone())?;
204 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
205 }
206 }
207 #[cfg(not(feature = "http"))]
208 return err_missing_feature("http", &cloud_location.scheme);
209 },
210 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
211 }?;
212
213 Ok(store)
214 }
215
216 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
218 let opt_cache_key = match &self.cloud_type {
219 CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
220 Some(path_and_creds_to_key(&self.path, self.options.as_ref())?)
221 },
222 CloudType::File | CloudType::Http | CloudType::Hf => None,
223 };
224
225 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
226 let cache = OBJECT_STORE_CACHE.read().await;
227
228 if let Some(store) = cache.get(cache_key) {
229 return Ok(store.clone());
230 }
231
232 drop(cache);
233
234 let cache = OBJECT_STORE_CACHE.write().await;
235
236 if let Some(store) = cache.get(cache_key) {
237 return Ok(store.clone());
238 }
239
240 Some(cache)
241 } else {
242 None
243 };
244
245 let store = self.build_impl(false).await?;
246 let store = PolarsObjectStore::new_from_inner(store, self);
247
248 if let Some(mut cache) = opt_cache_write_guard {
249 if cache.len() >= 8 {
251 if config::verbose() {
252 eprintln!(
253 "build_object_store: clearing store cache (cache.len(): {})",
254 cache.len()
255 );
256 }
257 cache.clear()
258 }
259
260 cache.insert(opt_cache_key.unwrap(), store.clone());
261 }
262
263 Ok(store)
264 }
265
266 pub(crate) fn is_azure(&self) -> bool {
267 matches!(&self.cloud_type, CloudType::Azure)
268 }
269}
270
271pub async fn build_object_store(
273 path: PlRefPath,
274 #[cfg_attr(
275 not(any(feature = "aws", feature = "gcp", feature = "azure")),
276 allow(unused_variables)
277 )]
278 options: Option<&CloudOptions>,
279 glob: bool,
280) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
281 let path = path.to_absolute_path()?.into_owned();
282
283 let cloud_type = path
284 .scheme()
285 .map_or(CloudType::File, CloudType::from_cloud_scheme);
286 let cloud_location = CloudLocation::new(path.clone(), glob)?;
287
288 let store = PolarsObjectStoreBuilder {
289 path,
290 cloud_type,
291 options: options.cloned(),
292 }
293 .build()
294 .await?;
295
296 Ok((cloud_location, store))
297}
298
// Gated on `cfg(test)` so the module (and its import) is only compiled for test builds.
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// A percent-encoded sequence must come back unchanged after parsing.
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}