Skip to main content

polars_io/cloud/
glob.rs

1use std::borrow::Cow;
2
3use futures::TryStreamExt;
4use object_store::path::Path;
5use polars_core::error::to_compute_err;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::pl_path::{CloudScheme, PlRefPath};
8use polars_utils::pl_str::PlSmallStr;
9use regex::Regex;
10
11use super::CloudOptions;
12
13/// Converts a glob to regex form.
14///
15/// # Returns
16/// 1. the prefix part (all path components until the first one with '*')
17/// 2. a regular expression representation of the rest.
18pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
19    // (offset, len, replacement)
20    let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
21
22    // The position after the last slash before glob characters begin.
23    // `a/b/c*/`
24    //      ^
25    let mut pos: usize = if let Some(after_last_slash) =
26        memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
27            path.as_bytes()[..i]
28                .iter()
29                .rposition(|x| *x == b'/')
30                .map_or(0, |x| 1 + x)
31        }) {
32        // First value is used as the starting point later.
33        replacements.push((after_last_slash, 0, &[]));
34        after_last_slash
35    } else {
36        usize::MAX
37    };
38
39    while pos < path.len() {
40        match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
41            None => break,
42            Some(i) => pos += i,
43        }
44
45        let (len, replace): (usize, &[u8]) = match &path[pos..] {
46            // Accept:
47            // - `**/`
48            // - `**` only if it is the end of the path
49            v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
50                // Wrapping in a capture group ensures we also match non-nested paths.
51                (3, b"(.*/)?" as _)
52            },
53            v if v.starts_with("**") => {
54                polars_bail!(ComputeError: "invalid ** glob pattern")
55            },
56            v if v.starts_with('*') => (1, b"[^/]*" as _),
57            // Dots need to be escaped in regex.
58            v if v.starts_with('.') => (1, b"\\." as _),
59            _ => {
60                pos += 1;
61                continue;
62            },
63        };
64
65        replacements.push((pos, len, replace));
66        pos += len;
67    }
68
69    if replacements.is_empty() {
70        return Ok((Cow::Borrowed(path), None));
71    }
72
73    let prefix = Cow::Borrowed(&path[..replacements[0].0]);
74
75    let mut pos = replacements[0].0;
76    let mut expansion = Vec::with_capacity(path.len() - pos);
77    expansion.push(b'^');
78
79    for (offset, len, replace) in replacements {
80        expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
81        expansion.extend_from_slice(replace);
82        pos = offset + len;
83    }
84
85    if pos < path.len() {
86        expansion.extend_from_slice(&path.as_bytes()[pos..]);
87    }
88
89    expansion.push(b'$');
90
91    Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
92}
93
94/// A location on cloud storage, may have wildcards.
95#[derive(PartialEq, Debug, Default)]
96pub struct CloudLocation {
97    /// The scheme (s3, ...).
98    pub scheme: &'static str,
99    /// The bucket name.
100    pub bucket: PlSmallStr,
101    /// The prefix inside the bucket, this will be the full key when wildcards are not used.
102    pub prefix: String,
103    /// The path components that need to be expanded.
104    pub expansion: Option<PlSmallStr>,
105}
106
107impl CloudLocation {
108    pub fn new(path: PlRefPath, glob: bool) -> PolarsResult<Self> {
109        if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
110            // Http/s does not use this
111            return Ok(CloudLocation {
112                scheme: scheme.as_str(),
113                ..Default::default()
114            });
115        }
116
117        let path_is_local = matches!(
118            path.scheme(),
119            None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
120        );
121
122        let (bucket, key) = path
123            .strip_scheme_split_authority()
124            .ok_or(Cow::Borrowed(
125                "could not extract bucket/key (path did not contain '/')",
126            ))
127            .and_then(|x @ (bucket, _)| {
128                let bucket_is_empty = bucket.is_empty();
129
130                if path_is_local && !bucket_is_empty {
131                    Err(Cow::Owned(format!(
132                        "unsupported: non-empty hostname for 'file:' URI: '{bucket}'",
133                    )))
134                } else if bucket_is_empty && !path_is_local {
135                    Err(Cow::Borrowed("empty bucket name"))
136                } else {
137                    Ok(x)
138                }
139            })
140            .map_err(|failed_reason| {
141                polars_err!(
142                    ComputeError:
143                    "failed to create CloudLocation: {} (path: '{}')",
144                    failed_reason,
145                    path,
146                )
147            })?;
148
149        let key = if path_is_local {
150            key
151        } else {
152            key.strip_prefix('/').unwrap_or(key)
153        };
154
155        let (prefix, expansion) = if glob {
156            let (prefix, expansion) = extract_prefix_expansion(key)?;
157
158            assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
159
160            (prefix, expansion.map(|x| x.into()))
161        } else {
162            (key.into(), None)
163        };
164
165        Ok(CloudLocation {
166            scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
167            bucket: PlSmallStr::from_str(bucket),
168            prefix: prefix.into_owned(),
169            expansion,
170        })
171    }
172}
173
174/// Return a full url from a key relative to the given location.
175fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
176    format!("{scheme}://{bucket}/{key}")
177}
178
179/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
180/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
181pub(crate) struct Matcher {
182    prefix: String,
183    re: Option<Regex>,
184}
185
186impl Matcher {
187    /// Build a Matcher for the given prefix and expansion.
188    pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
189        // Cloud APIs accept a prefix without any expansion, extract it.
190        let re = expansion
191            .map(polars_utils::regex_cache::compile_regex)
192            .transpose()?;
193        Ok(Matcher { prefix, re })
194    }
195
196    pub(crate) fn is_matching(&self, key: &str) -> bool {
197        if !key.starts_with(self.prefix.as_str()) {
198            // Prefix does not match, should not happen.
199            return false;
200        }
201        if self.re.is_none() {
202            return true;
203        }
204        let last = &key[self.prefix.len()..];
205        self.re.as_ref().unwrap().is_match(last.as_ref())
206    }
207}
208
209/// List files with a prefix derived from the pattern.
210pub async fn glob(
211    url: PlRefPath,
212    cloud_options: Option<&CloudOptions>,
213) -> PolarsResult<Vec<String>> {
214    // Find the fixed prefix, up to the first '*'.
215
216    let (
217        CloudLocation {
218            scheme,
219            bucket,
220            prefix,
221            expansion,
222        },
223        store,
224    ) = super::build_object_store(url, cloud_options, true).await?;
225    let matcher = &Matcher::new(
226        if scheme == "file" {
227            // For local paths the returned location has the leading slash stripped.
228            prefix[1..].into()
229        } else {
230            prefix.clone()
231        },
232        expansion.as_deref(),
233    )?;
234
235    let path = Path::from(prefix.as_str());
236    let path = Some(&path);
237
238    let mut locations = store
239        .try_exec_rebuild_on_err(|store| {
240            let st = store.clone();
241
242            async {
243                let store = st;
244                store
245                    .list(path)
246                    .try_filter_map(|x| async move {
247                        let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
248                            .then_some(x.location);
249                        Ok(out)
250                    })
251                    .try_collect::<Vec<_>>()
252                    .await
253                    .map_err(to_compute_err)
254            }
255        })
256        .await?;
257
258    locations.sort_unstable();
259    Ok(locations
260        .into_iter()
261        .map(|l| full_url(scheme, &bucket, l))
262        .collect::<Vec<_>>())
263}
264
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `url` into a [`CloudLocation`], panicking if that fails.
    fn location(url: &str, glob: bool) -> CloudLocation {
        CloudLocation::new(PlRefPath::new(url), glob).unwrap()
    }

    /// Builds the [`Matcher`] that belongs to the glob pattern `url`.
    fn matcher_for(url: &str) -> Matcher {
        let CloudLocation {
            prefix, expansion, ..
        } = location(url, true);
        Matcher::new(prefix, expansion.as_deref()).unwrap()
    }

    #[test]
    fn test_cloud_location() {
        // (url, scheme, bucket, prefix, expansion)
        let cases = [
            ("s3://a/b", "s3", "a", "b", None),
            ("s3://a/b/*.c", "s3", "a", "b/", Some("^[^/]*\\.c$")),
            ("file:///a/b", "file", "", "/a/b", None),
            ("file:/a/b", "file", "", "/a/b", None),
        ];

        for (url, scheme, bucket, prefix, expansion) in cases {
            assert_eq!(
                location(url, true),
                CloudLocation {
                    scheme,
                    bucket: bucket.into(),
                    prefix: prefix.into(),
                    expansion: expansion.map(Into::into),
                }
            );
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        // (pattern, prefix, expansion)
        let cases = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];

        for (pattern, prefix, expansion) in cases {
            assert_eq!(
                extract_prefix_expansion(pattern).unwrap(),
                (prefix.into(), expansion.map(String::from))
            );
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let a = matcher_for("s3://bucket/folder/*.parquet");
        // Regular match.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Require . in the file name.
        assert!(!a.is_matching(Path::from("folder/1parquet").as_ref()));
        // Intermediary folders are not allowed.
        assert!(!a.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let a = matcher_for("s3://bucket/folder/**/*.parquet");
        // Intermediary folders are optional.
        assert!(a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Intermediary folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let a = matcher_for("s3://bucket/folder/**/data/*.parquet");
        // Required folder `data` is missing.
        assert!(!a.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Required folder is present.
        assert!(a.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // Required folder is present and additional folders are allowed.
        assert!(a.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        // Without globbing, wildcard characters stay part of the key.
        assert_eq!(
            location("s3://bucket/[*", false),
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "[*".into(),
                expansion: None,
            },
        )
    }

    #[test]
    fn test_cloud_location_percentages() {
        // Percent-encoded sequences are kept as they are.
        assert_eq!(
            location("s3://bucket/%25", true),
            CloudLocation {
                scheme: "s3",
                bucket: "bucket".into(),
                prefix: "%25".into(),
                expansion: None,
            }
        );

        // Http/s locations only carry the scheme.
        assert_eq!(
            location("https://pola.rs/%25", true),
            CloudLocation {
                scheme: "https",
                bucket: "".into(),
                prefix: "".into(),
                expansion: None,
            }
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let a = matcher_for("s3://bucket/folder/**/data.parquet");

        assert!(!a.is_matching("folder/_data.parquet"));

        assert!(a.is_matching("folder/data.parquet"));
        assert!(a.is_matching("folder/abc/data.parquet"));
        assert!(a.is_matching("folder/abc/def/data.parquet"));

        let a = matcher_for("s3://bucket/folder/data_*.parquet");

        assert!(!a.is_matching("folder/data_1.ipc"))
    }
}
440}