Skip to main content

pyo3_object_store/
prefix.rs

1//! An object store wrapper handling a constant path prefix
2//! This was originally vendored from https://github.com/apache/arrow-rs/blob/3bf29a2c7474e59722d885cd11fafd0dca13a28e/object_store/src/prefix.rs#L4 so that we can access the raw `T` underlying the MaybePrefixedStore.
3//! It was further edited to use an `Option<Path>` internally so that we can apply a
4//! `MaybePrefixedStore` to all store classes.
5
6use bytes::Bytes;
7use futures::{stream::BoxStream, StreamExt, TryStreamExt};
8use http::{Extensions, Method};
9use object_store::signer::Signer;
10use std::borrow::Cow;
11use std::future::Future;
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::OnceLock;
15use std::time::Duration;
16use url::Url;
17
18use object_store::{path::Path, CopyOptions};
19use object_store::{
20    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
21    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
22};
23
24static DEFAULT_PATH: OnceLock<Path> = OnceLock::new();
25
26/// Store wrapper that applies a constant prefix to all paths handled by the store.
27#[derive(Debug, Clone)]
28pub struct MaybePrefixedStore<T: ObjectStore> {
29    prefix: Option<Path>,
30    inner: T,
31}
32
33impl<T: ObjectStore> std::fmt::Display for MaybePrefixedStore<T> {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        if let Some(prefix) = self.prefix.as_ref() {
36            write!(f, "PrefixObjectStore({prefix})")
37        } else {
38            write!(f, "ObjectStore")
39        }
40    }
41}
42
43impl<T: ObjectStore> MaybePrefixedStore<T> {
44    /// Create a new instance of [`MaybePrefixedStore`]
45    pub fn new(store: T, prefix: Option<impl Into<Path>>) -> Self {
46        Self {
47            prefix: prefix.map(|x| x.into()),
48            inner: store,
49        }
50    }
51
52    /// Access the underlying T under the MaybePrefixedStore
53    pub fn inner(&self) -> &T {
54        &self.inner
55    }
56
57    /// Create the full path from a path relative to prefix
58    fn full_path<'a>(&'a self, location: &'a Path) -> Cow<'a, Path> {
59        if let Some(prefix) = &self.prefix {
60            Cow::Owned(prefix.parts().chain(location.parts()).collect())
61        } else {
62            Cow::Borrowed(location)
63        }
64    }
65
66    /// Strip the constant prefix from a given path
67    fn strip_prefix(&self, path: Path) -> Path {
68        if let Some(prefix) = &self.prefix {
69            // Note cannot use match because of borrow checker
70            if let Some(suffix) = path.prefix_match(prefix) {
71                return suffix.collect();
72            }
73            path
74        } else {
75            path
76        }
77    }
78
79    /// Strip the constant prefix from a given ObjectMeta
80    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
81        ObjectMeta {
82            last_modified: meta.last_modified,
83            size: meta.size,
84            location: self.strip_prefix(meta.location),
85            e_tag: meta.e_tag,
86            version: None,
87        }
88    }
89}
90
// Note: This is a bit of a hack: these helpers are free functions rather than methods so that they
// don't rely on the `self` lifetime. Expected to be cleaned up before merge.
//
94/// Create the full path from a path relative to prefix
95fn full_path<'a>(prefix: Option<&'a Path>, location: &'a Path) -> Cow<'a, Path> {
96    if let Some(prefix) = prefix {
97        Cow::Owned(prefix.parts().chain(location.parts()).collect())
98    } else {
99        Cow::Borrowed(location)
100    }
101}
102
103/// Strip the constant prefix from a given path
104fn strip_prefix(prefix: Option<&Path>, path: Path) -> Path {
105    if let Some(prefix) = &prefix {
106        // Note cannot use match because of borrow checker
107        if let Some(suffix) = path.prefix_match(prefix) {
108            return suffix.collect();
109        }
110        path
111    } else {
112        path
113    }
114}
115
116/// Strip the constant prefix from a given ObjectMeta
117fn strip_meta(prefix: Option<&Path>, meta: ObjectMeta) -> ObjectMeta {
118    ObjectMeta {
119        last_modified: meta.last_modified,
120        size: meta.size,
121        location: strip_prefix(prefix, meta.location),
122        e_tag: meta.e_tag,
123        version: None,
124    }
125}
126#[async_trait::async_trait]
127impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
128    async fn put_opts(
129        &self,
130        location: &Path,
131        payload: PutPayload,
132        opts: PutOptions,
133    ) -> Result<PutResult> {
134        let full_path = self.full_path(location);
135        self.inner.put_opts(&full_path, payload, opts).await
136    }
137
138    // Remove when updating to object_store 0.13
139    #[allow(deprecated)]
140    async fn put_multipart_opts(
141        &self,
142        location: &Path,
143        opts: PutMultipartOptions,
144    ) -> Result<Box<dyn MultipartUpload>> {
145        let full_path = self.full_path(location);
146        self.inner.put_multipart_opts(&full_path, opts).await
147    }
148
149    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
150        let full_path = self.full_path(location);
151        self.inner.get_opts(&full_path, options).await
152    }
153
154    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
155        let full_path = self.full_path(location);
156        self.inner.get_ranges(&full_path, ranges).await
157    }
158
159    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
160        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
161        let s = self.inner.list(Some(&prefix));
162        let slf_prefix = self.prefix.clone();
163        s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
164            .boxed()
165    }
166
167    fn list_with_offset(
168        &self,
169        prefix: Option<&Path>,
170        offset: &Path,
171    ) -> BoxStream<'static, Result<ObjectMeta>> {
172        let offset = self.full_path(offset);
173        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
174        let s = self.inner.list_with_offset(Some(&prefix), &offset);
175        let slf_prefix = self.prefix.clone();
176        s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
177            .boxed()
178    }
179
180    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
181        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
182        self.inner
183            .list_with_delimiter(Some(&prefix))
184            .await
185            .map(|lst| ListResult {
186                common_prefixes: lst
187                    .common_prefixes
188                    .into_iter()
189                    .map(|p| self.strip_prefix(p))
190                    .collect(),
191                objects: lst
192                    .objects
193                    .into_iter()
194                    .map(|meta| self.strip_meta(meta))
195                    .collect(),
196                extensions: Extensions::default(),
197            })
198    }
199
200    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
201        let from_full = self.full_path(from);
202        let to_full = self.full_path(to);
203        self.inner.copy_opts(&from_full, &to_full, options).await
204    }
205
206    fn delete_stream(
207        &self,
208        locations: BoxStream<'static, Result<Path>>,
209    ) -> BoxStream<'static, Result<Path>> {
210        let prefix_owned = self.prefix.clone();
211        let locations = locations
212            .map(move |location| {
213                location.map(|loc| full_path(prefix_owned.as_ref(), &loc).into_owned())
214            })
215            .boxed();
216        let prefix = self.prefix.clone();
217        self.inner
218            .delete_stream(locations)
219            .map(move |location| location.map(|loc| strip_prefix(prefix.as_ref(), loc)))
220            .boxed()
221    }
222}
223
224impl<T: ObjectStore + Signer> Signer for MaybePrefixedStore<T> {
225    fn signed_url<'life0, 'life1, 'async_trait>(
226        &'life0 self,
227        method: Method,
228        path: &'life1 Path,
229        expires_in: Duration,
230    ) -> Pin<Box<dyn Future<Output = object_store::Result<Url>> + Send + 'async_trait>>
231    where
232        'life0: 'async_trait,
233        'life1: 'async_trait,
234        Self: 'async_trait,
235    {
236        let full = full_path(self.prefix.as_ref(), path).into_owned();
237        Box::pin(async move { self.inner.signed_url(method, &full, expires_in).await })
238    }
239
240    fn signed_urls<'life0, 'life1, 'async_trait>(
241        &'life0 self,
242        method: Method,
243        paths: &'life1 [Path],
244        expires_in: Duration,
245    ) -> Pin<Box<dyn Future<Output = Result<Vec<Url>>> + Send + 'async_trait>>
246    where
247        'life0: 'async_trait,
248        'life1: 'async_trait,
249        Self: 'async_trait,
250    {
251        let full_paths = paths
252            .iter()
253            .map(|path| full_path(self.prefix.as_ref(), path).into_owned())
254            .collect::<Vec<_>>();
255        Box::pin(async move {
256            self.inner
257                .signed_urls(method, &full_paths, expires_in)
258                .await
259        })
260    }
261}