1use std::sync::{Arc, LazyLock};
2
3use object_store::ObjectStore;
4use object_store::local::LocalFileSystem;
5use polars_core::config::{self, verbose, verbose_print_sensitive};
6use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7use polars_utils::aliases::PlHashMap;
8use polars_utils::pl_path::{PlPath, PlRefPath};
9use polars_utils::pl_str::PlSmallStr;
10use polars_utils::{format_pl_smallstr, pl_serialize};
11use tokio::sync::RwLock;
12
13use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16#[allow(clippy::type_complexity)]
20static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21 LazyLock::new(Default::default);
22
23#[allow(dead_code)]
24fn err_missing_feature(
25 feature: &str,
26 cloud_type: &CloudType,
27) -> PolarsResult<Arc<dyn ObjectStore>> {
28 polars_bail!(
29 ComputeError:
30 "feature '{}' must be enabled in order to use '{:?}' cloud urls",
31 feature,
32 cloud_type,
33 );
34}
35
36fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u8> {
38 let cloud_options = options.map(
40 |CloudOptions {
41 #[cfg(feature = "file_cache")]
43 file_cache_ttl,
44 config,
45 retry_config,
46 #[cfg(feature = "cloud")]
47 credential_provider,
48 }| {
49 CloudOptionsKey {
50 #[cfg(feature = "file_cache")]
51 file_cache_ttl: *file_cache_ttl,
52 config: config.clone(),
53 retry_config: *retry_config,
54 #[cfg(feature = "cloud")]
55 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56 }
57 },
58 );
59
60 let cache_key = CacheKey {
61 url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
62 cloud_options,
63 };
64
65 verbose_print_sensitive(|| {
66 format!(
67 "object store cache key for path at '{}': {:?}",
68 path, &cache_key
69 )
70 });
71
72 return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
73
74 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
75 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
76 struct CacheKey {
77 url_base: PlSmallStr,
78 cloud_options: Option<CloudOptionsKey>,
79 }
80
81 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
84 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
85 struct CloudOptionsKey {
86 #[cfg(feature = "file_cache")]
87 file_cache_ttl: u64,
88 config: Option<CloudConfig>,
89 retry_config: CloudRetryConfig,
90 #[cfg(feature = "cloud")]
91 credential_provider: usize,
92 }
93}
94
95pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
97 object_store::path::Path::parse(path).map_err(to_compute_err)
98}
99
/// Inputs needed to build (or rebuild) the object store for one location.
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    /// Location the store is built for; handed to the backend constructors and used to derive
    /// the cache key.
    path: PlRefPath,
    /// Selects which backend `build_impl` constructs.
    cloud_type: CloudType,
    /// Caller-supplied options; when `None`, `build_impl` falls back to
    /// `CloudOptions::default_static_ref()`.
    options: Option<CloudOptions>,
}
106
107impl PolarsObjectStoreBuilder {
108 pub(super) async fn build_impl(
109 &self,
110 clear_cached_credentials: bool,
112 ) -> PolarsResult<Arc<dyn ObjectStore>> {
113 let options = self
114 .options
115 .as_ref()
116 .unwrap_or_else(|| CloudOptions::default_static_ref());
117
118 if let Some(options) = &self.options
119 && verbose()
120 {
121 eprintln!(
122 "build object-store: file_cache_ttl: {}",
123 options.file_cache_ttl
124 )
125 }
126
127 let store = match self.cloud_type {
128 CloudType::Aws => {
129 #[cfg(feature = "aws")]
130 {
131 let store = options
132 .build_aws(self.path.clone(), clear_cached_credentials)
133 .await?;
134 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
135 }
136 #[cfg(not(feature = "aws"))]
137 return err_missing_feature("aws", &self.cloud_type);
138 },
139 CloudType::Gcp => {
140 #[cfg(feature = "gcp")]
141 {
142 let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
143
144 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
145 }
146 #[cfg(not(feature = "gcp"))]
147 return err_missing_feature("gcp", &self.cloud_type);
148 },
149 CloudType::Azure => {
150 {
151 #[cfg(feature = "azure")]
152 {
153 let store =
154 options.build_azure(self.path.clone(), clear_cached_credentials)?;
155 Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
156 }
157 }
158 #[cfg(not(feature = "azure"))]
159 return err_missing_feature("azure", &self.cloud_type);
160 },
161 CloudType::File => {
162 let local = LocalFileSystem::new();
163 Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
164 },
165 CloudType::Http => {
166 {
167 #[cfg(feature = "http")]
168 {
169 let store = options.build_http(self.path.as_str())?;
170 PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
171 }
172 }
173 #[cfg(not(feature = "http"))]
174 return err_missing_feature("http", &cloud_location.scheme);
175 },
176 CloudType::Hf => panic!("impl error: unresolved hf:// path"),
177 }?;
178
179 Ok(store)
180 }
181
182 pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
184 let opt_cache_key = match &self.cloud_type {
185 CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
186 Some(path_and_creds_to_key(&self.path, self.options.as_ref()))
187 },
188 CloudType::File | CloudType::Http | CloudType::Hf => None,
189 };
190
191 let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
192 let cache = OBJECT_STORE_CACHE.read().await;
193
194 if let Some(store) = cache.get(cache_key) {
195 return Ok(store.clone());
196 }
197
198 drop(cache);
199
200 let cache = OBJECT_STORE_CACHE.write().await;
201
202 if let Some(store) = cache.get(cache_key) {
203 return Ok(store.clone());
204 }
205
206 Some(cache)
207 } else {
208 None
209 };
210
211 let store = self.build_impl(false).await?;
212 let store = PolarsObjectStore::new_from_inner(store, self);
213
214 if let Some(mut cache) = opt_cache_write_guard {
215 if cache.len() >= 8 {
217 if config::verbose() {
218 eprintln!(
219 "build_object_store: clearing store cache (cache.len(): {})",
220 cache.len()
221 );
222 }
223 cache.clear()
224 }
225
226 cache.insert(opt_cache_key.unwrap(), store.clone());
227 }
228
229 Ok(store)
230 }
231
232 pub(crate) fn is_azure(&self) -> bool {
233 matches!(&self.cloud_type, CloudType::Azure)
234 }
235}
236
237pub async fn build_object_store(
239 path: PlRefPath,
240 #[cfg_attr(
241 not(any(feature = "aws", feature = "gcp", feature = "azure")),
242 allow(unused_variables)
243 )]
244 options: Option<&CloudOptions>,
245 glob: bool,
246) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
247 let path = path.to_absolute_path()?.into_owned();
248
249 let cloud_type = path
250 .scheme()
251 .map_or(CloudType::File, CloudType::from_cloud_scheme);
252 let cloud_location = CloudLocation::new(path.clone(), glob)?;
253
254 let store = PolarsObjectStoreBuilder {
255 path,
256 cloud_type,
257 options: options.cloned(),
258 }
259 .build()
260 .await?;
261
262 Ok((cloud_location, store))
263}
264
// Compile the test module only for test builds.
#[cfg(test)]
mod test {
    /// `object_path_from_str` must hand back the input text unchanged: the string "%25" is
    /// expected to come out as "%25".
    #[test]
    fn test_object_path_from_str() {
        use super::object_path_from_str;

        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}
275}