exon_common/
object_store_files_from_table_path.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Result;
16
17use futures::{stream::BoxStream, StreamExt, TryStreamExt};
18use object_store::{path::Path, ObjectMeta, ObjectStore};
19
20/// Returns a stream of object store files for a given table path.
21pub async fn object_store_files_from_table_path<'a>(
22    store: &'a dyn ObjectStore,
23    url: &'a url::Url,
24    table_prefix: &'a Path,
25    file_extension: &'a str,
26    glob: Option<glob::Pattern>,
27) -> BoxStream<'a, Result<ObjectMeta>> {
28    let object_list = if url.as_str().ends_with('/') {
29        let list = store.list(Some(table_prefix));
30        list
31    } else {
32        futures::stream::once(store.head(table_prefix)).boxed()
33    };
34
35    object_list
36        .map_err(Into::into)
37        .try_filter(move |meta| {
38            let path = &meta.location;
39            let extension_match = path.as_ref().ends_with(file_extension);
40            let glob_match = match glob {
41                Some(ref glob) => glob.matches(path.as_ref()),
42                None => true,
43            };
44            futures::future::ready(extension_match && glob_match)
45        })
46        .boxed()
47}