1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::pl_path::{CloudScheme, PlRefPath};
8use polars_utils::pl_str::PlSmallStr;
9use regex::Regex;
10
11use super::CloudOptions;
12
13pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
19 let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
21
22 let mut pos: usize = if let Some(after_last_slash) =
26 memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
27 path.as_bytes()[..i]
28 .iter()
29 .rposition(|x| *x == b'/')
30 .map_or(0, |x| 1 + x)
31 }) {
32 replacements.push((after_last_slash, 0, &[]));
34 after_last_slash
35 } else {
36 usize::MAX
37 };
38
39 while pos < path.len() {
40 match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
41 None => break,
42 Some(i) => pos += i,
43 }
44
45 let (len, replace): (usize, &[u8]) = match &path[pos..] {
46 v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
50 (3, b"(.*/)?" as _)
52 },
53 v if v.starts_with("**") => {
54 polars_bail!(ComputeError: "invalid ** glob pattern")
55 },
56 v if v.starts_with('*') => (1, b"[^/]*" as _),
57 v if v.starts_with('.') => (1, b"\\." as _),
59 _ => {
60 pos += 1;
61 continue;
62 },
63 };
64
65 replacements.push((pos, len, replace));
66 pos += len;
67 }
68
69 if replacements.is_empty() {
70 return Ok((Cow::Borrowed(path), None));
71 }
72
73 let prefix = Cow::Borrowed(&path[..replacements[0].0]);
74
75 let mut pos = replacements[0].0;
76 let mut expansion = Vec::with_capacity(path.len() - pos);
77 expansion.push(b'^');
78
79 for (offset, len, replace) in replacements {
80 expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
81 expansion.extend_from_slice(replace);
82 pos = offset + len;
83 }
84
85 if pos < path.len() {
86 expansion.extend_from_slice(&path.as_bytes()[pos..]);
87 }
88
89 expansion.push(b'$');
90
91 Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
92}
93
/// A parsed storage location, optionally carrying a glob expansion.
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
    /// The URI scheme as a string (e.g. `"s3"`, `"file"`, `"https"`).
    pub scheme: &'static str,
    /// The bucket (authority) part of the URI; empty for local `file` paths.
    pub bucket: PlSmallStr,
    /// The literal key prefix inside the bucket. This is the full key when no
    /// glob expansion is present.
    pub prefix: String,
    /// Regular expression for the part of the key after `prefix`, or `None`
    /// when the path had no glob characters (or globbing was disabled).
    pub expansion: Option<PlSmallStr>,
}
106
107impl CloudLocation {
108 pub fn new(path: PlRefPath, glob: bool) -> PolarsResult<Self> {
109 if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
110 return Ok(CloudLocation {
112 scheme: scheme.as_str(),
113 ..Default::default()
114 });
115 }
116
117 let path_is_local = matches!(
118 path.scheme(),
119 None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
120 );
121
122 let (bucket, key) = path
123 .strip_scheme_split_authority()
124 .ok_or(Cow::Borrowed(
125 "could not extract bucket/key (path did not contain '/')",
126 ))
127 .and_then(|x @ (bucket, _)| {
128 let bucket_is_empty = bucket.is_empty();
129
130 if path_is_local && !bucket_is_empty {
131 Err(Cow::Owned(format!(
132 "unsupported: non-empty hostname for 'file:' URI: '{bucket}'",
133 )))
134 } else if bucket_is_empty && !path_is_local {
135 Err(Cow::Borrowed("empty bucket name"))
136 } else {
137 Ok(x)
138 }
139 })
140 .map_err(|failed_reason| {
141 polars_err!(
142 ComputeError:
143 "failed to create CloudLocation: {} (path: '{}')",
144 failed_reason,
145 path,
146 )
147 })?;
148
149 let key = if path_is_local {
150 key
151 } else {
152 key.strip_prefix('/').unwrap_or(key)
153 };
154
155 let (prefix, expansion) = if glob {
156 let (prefix, expansion) = extract_prefix_expansion(key)?;
157
158 assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
159
160 (prefix, expansion.map(|x| x.into()))
161 } else {
162 (key.into(), None)
163 };
164
165 Ok(CloudLocation {
166 scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
167 bucket: PlSmallStr::from_str(bucket),
168 prefix: prefix.into_owned(),
169 expansion,
170 })
171 }
172}
173
174fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
176 format!("{scheme}://{bucket}/{key}")
177}
178
/// Matches keys against a literal prefix plus an optional regular expression
/// for the remainder of the key.
pub(crate) struct Matcher {
    /// Literal prefix every matching key must start with.
    prefix: String,
    /// Compiled expression applied to the part of the key after `prefix`.
    /// `None` means any key with the right prefix matches.
    re: Option<Regex>,
}
185
186impl Matcher {
187 pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
189 let re = expansion
191 .map(polars_utils::regex_cache::compile_regex)
192 .transpose()?;
193 Ok(Matcher { prefix, re })
194 }
195
196 pub(crate) fn is_matching(&self, key: &str) -> bool {
197 if !key.starts_with(self.prefix.as_str()) {
198 return false;
200 }
201 if self.re.is_none() {
202 return true;
203 }
204 let last = &key[self.prefix.len()..];
205 self.re.as_ref().unwrap().is_match(last.as_ref())
206 }
207}
208
209pub async fn glob(
211 url: PlRefPath,
212 cloud_options: Option<&CloudOptions>,
213) -> PolarsResult<Vec<String>> {
214 let (
217 CloudLocation {
218 scheme,
219 bucket,
220 prefix,
221 expansion,
222 },
223 store,
224 ) = super::build_object_store(url, cloud_options, true).await?;
225 let matcher = &Matcher::new(
226 if scheme == "file" {
227 prefix[1..].into()
229 } else {
230 prefix.clone()
231 },
232 expansion.as_deref(),
233 )?;
234
235 let path = Path::from(prefix.as_str());
236 let path = Some(&path);
237
238 let mut locations = store
239 .try_exec_rebuild_on_err(|store| {
240 let st = store.clone();
241
242 async {
243 let store = st;
244 store
245 .list(path)
246 .try_filter_map(|x| async move {
247 let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
248 .then_some(x.location);
249 Ok(out)
250 })
251 .try_collect::<Vec<_>>()
252 .await
253 .map_err(to_compute_err)
254 }
255 })
256 .await?;
257
258 locations.sort_unstable();
259 Ok(locations
260 .into_iter()
261 .map(|l| full_url(scheme, &bucket, l))
262 .collect::<Vec<_>>())
263}
264
// Unit tests for location parsing, glob-to-regex translation and key matching.
#[cfg(test)]
mod test {
    use super::*;

    /// Parsing of plain and glob paths for `s3` and both `file:` URI forms.
    #[test]
    fn test_cloud_location() {
        // No glob characters: the whole key is the prefix.
        assert_eq!(
            CloudLocation::new(PlRefPath::new("s3://a/b"), true).unwrap(),
            CloudLocation {
                scheme: "s3",
                bucket: "a".into(),
                prefix: "b".into(),
                expansion: None,
            }
        );
        // Glob in the last component: prefix ends after the last slash.
        assert_eq!(
            CloudLocation::new(PlRefPath::new("s3://a/b/*.c"), true).unwrap(),
            CloudLocation {
                scheme: "s3",
                bucket: "a".into(),
                prefix: "b/".into(),
                expansion: Some("^[^/]*\\.c$".into()),
            }
        );
        // `file://` with an empty hostname keeps the leading slash of the key.
        assert_eq!(
            CloudLocation::new(PlRefPath::new("file:///a/b"), true).unwrap(),
            CloudLocation {
                scheme: "file",
                bucket: "".into(),
                prefix: "/a/b".into(),
                expansion: None,
            }
        );
        // `file:` without `//` parses to the same location.
        assert_eq!(
            CloudLocation::new(PlRefPath::new("file:/a/b"), true).unwrap(),
            CloudLocation {
                scheme: "file",
                bucket: "".into(),
                prefix: "/a/b".into(),
                expansion: None,
            }
        );
    }

    /// Prefix / regex pairs produced for various glob patterns.
    #[test]
    fn test_extract_prefix_expansion() {
        // `**` must be followed by `/` or the end of the path.
        assert!(extract_prefix_expansion("**url").is_err());
        // Without `*` or `[` nothing is expanded, dots included.
        assert_eq!(
            extract_prefix_expansion("a/b.c").unwrap(),
            ("a/b.c".into(), None)
        );
        assert_eq!(
            extract_prefix_expansion("a/**").unwrap(),
            ("a/".into(), Some("^(.*/)?$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/b").unwrap(),
            ("a/".into(), Some("^(.*/)?b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/*b").unwrap(),
            ("a/".into(), Some("^(.*/)?[^/]*b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/**/data/*b").unwrap(),
            ("a/".into(), Some("^(.*/)?data/[^/]*b$".into()))
        );
        assert_eq!(
            extract_prefix_expansion("a/*b").unwrap(),
            ("a/".into(), Some("^[^/]*b$".into()))
        );
    }

    /// A single `*` matches within one path component only.
    #[test]
    fn test_matcher_file_name() {
        let cloud_location =
            CloudLocation::new(PlRefPath::new("s3://bucket/folder/*.parquet"), true).unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Regular match.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // The dot in the file name is required.
        assert!(!a.is_matching(Path::from("folder/1parquet").as_ref()));
        // Intermediate folders are not allowed.
        assert!(!a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    /// `**/` matches zero or more intermediate folders.
    #[test]
    fn test_matcher_folders() {
        let cloud_location =
            CloudLocation::new(PlRefPath::new("s3://bucket/folder/**/*.parquet"), true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
        // Directly inside the folder.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Inside a nested folder.
        assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let cloud_location =
            CloudLocation::new(PlRefPath::new("s3://bucket/folder/**/data/*.parquet"), true)
                .unwrap();
        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // The literal `data/` component is required.
        assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // `data/` directly under the folder.
        assert!(a.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // `data/` below an intermediate folder.
        assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    /// With `glob = false`, glob characters stay part of the literal prefix.
    #[test]
    fn test_cloud_location_no_glob() {
        let cloud_location = CloudLocation::new(PlRefPath::new("s3://bucket/[*"), false).unwrap();
        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "[*".into(),
                expansion: None,
            },
        )
    }

    /// Percent-encoded sequences are kept as-is in the prefix.
    #[test]
    fn test_cloud_location_percentages() {
        use super::CloudLocation;

        let path = "s3://bucket/%25";
        let cloud_location = CloudLocation::new(PlRefPath::new(path), true).unwrap();

        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "%25".into(),
                expansion: None,
            }
        );

        // For http(s) only the scheme is populated.
        let path = "https://pola.rs/%25";
        let cloud_location = CloudLocation::new(PlRefPath::new(path), true).unwrap();

        assert_eq!(
            cloud_location,
            CloudLocation {
                scheme: "https",
                bucket: "".into(),
                prefix: "".into(),
                expansion: None,
            }
        );
    }

    /// Regression test: `**/name` must not match files whose name merely ends
    /// with `name`, and a dot in the pattern must match a literal dot followed
    /// by the exact extension.
    #[test]
    fn test_glob_wildcard_21736() {
        let path = "s3://bucket/folder/**/data.parquet";
        let cloud_location = CloudLocation::new(PlRefPath::new(path), true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // `**/` only expands to whole folders, never to a file-name prefix.
        assert!(!a.is_matching("folder/_data.parquet"));

        assert!(a.is_matching("folder/data.parquet"));
        assert!(a.is_matching("folder/abc/data.parquet"));
        assert!(a.is_matching("folder/abc/def/data.parquet"));

        let path = "s3://bucket/folder/data_*.parquet";
        let cloud_location = CloudLocation::new(PlRefPath::new(path), true).unwrap();

        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();

        // A different extension must not match.
        assert!(!a.is_matching("folder/data_1.ipc"))
    }
}
440}