Skip to main content

pyo3_object_store/
prefix.rs

1//! An object store wrapper handling a constant path prefix
2//! This was originally vendored from https://github.com/apache/arrow-rs/blob/3bf29a2c7474e59722d885cd11fafd0dca13a28e/object_store/src/prefix.rs#L4 so that we can access the raw `T` underlying the MaybePrefixedStore.
3//! It was further edited to use an `Option<Path>` internally so that we can apply a
4//! `MaybePrefixedStore` to all store classes.
5
6use bytes::Bytes;
7use futures::{stream::BoxStream, StreamExt, TryStreamExt};
8use std::borrow::Cow;
9use std::ops::Range;
10use std::sync::OnceLock;
11
12use object_store::path::Path;
13// Remove when updating to object_store 0.13
14#[allow(deprecated)]
15use object_store::{
16    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
17    PutOptions, PutPayload, PutResult, Result,
18};
19
/// Lazily initialised default [`Path`], used as the listing prefix when a caller passes `None`.
20static DEFAULT_PATH: OnceLock<Path> = OnceLock::new();
21
22/// Store wrapper that applies a constant prefix to all paths handled by the store.
23#[derive(Debug, Clone)]
24pub struct MaybePrefixedStore<T: ObjectStore> {
25    prefix: Option<Path>,
26    inner: T,
27}
28
29impl<T: ObjectStore> std::fmt::Display for MaybePrefixedStore<T> {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        if let Some(prefix) = self.prefix.as_ref() {
32            write!(f, "PrefixObjectStore({prefix})")
33        } else {
34            write!(f, "ObjectStore")
35        }
36    }
37}
38
39impl<T: ObjectStore> MaybePrefixedStore<T> {
40    /// Create a new instance of [`MaybePrefixedStore`]
41    pub fn new(store: T, prefix: Option<impl Into<Path>>) -> Self {
42        Self {
43            prefix: prefix.map(|x| x.into()),
44            inner: store,
45        }
46    }
47
48    /// Access the underlying T under the MaybePrefixedStore
49    pub fn inner(&self) -> &T {
50        &self.inner
51    }
52
53    /// Create the full path from a path relative to prefix
54    fn full_path<'a>(&'a self, location: &'a Path) -> Cow<'a, Path> {
55        if let Some(prefix) = &self.prefix {
56            Cow::Owned(prefix.parts().chain(location.parts()).collect())
57        } else {
58            Cow::Borrowed(location)
59        }
60    }
61
62    /// Strip the constant prefix from a given path
63    fn strip_prefix(&self, path: Path) -> Path {
64        if let Some(prefix) = &self.prefix {
65            // Note cannot use match because of borrow checker
66            if let Some(suffix) = path.prefix_match(prefix) {
67                return suffix.collect();
68            }
69            path
70        } else {
71            path
72        }
73    }
74
75    /// Strip the constant prefix from a given ObjectMeta
76    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
77        ObjectMeta {
78            last_modified: meta.last_modified,
79            size: meta.size,
80            location: self.strip_prefix(meta.location),
81            e_tag: meta.e_tag,
82            version: None,
83        }
84    }
85}
86
// Note: These two helpers are deliberately free functions rather than methods so that the
// `'static` streams built in `list` and `list_with_offset` do not have to borrow `self`.
// Expected to be cleaned up before merge.
//
90/// Strip the constant prefix from a given path
91fn strip_prefix(prefix: &Path, path: Path) -> Path {
92    // Note cannot use match because of borrow checker
93    if let Some(suffix) = path.prefix_match(prefix) {
94        return suffix.collect();
95    }
96    path
97}
98
99/// Strip the constant prefix from a given ObjectMeta
100fn strip_meta(prefix: Option<&Path>, meta: ObjectMeta) -> ObjectMeta {
101    if let Some(prefix) = prefix {
102        ObjectMeta {
103            last_modified: meta.last_modified,
104            size: meta.size,
105            location: strip_prefix(prefix, meta.location),
106            e_tag: meta.e_tag,
107            version: None,
108        }
109    } else {
110        meta
111    }
112}
113#[async_trait::async_trait]
114impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
115    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
116        let full_path = self.full_path(location);
117        self.inner.put(&full_path, payload).await
118    }
119
120    async fn put_opts(
121        &self,
122        location: &Path,
123        payload: PutPayload,
124        opts: PutOptions,
125    ) -> Result<PutResult> {
126        let full_path = self.full_path(location);
127        self.inner.put_opts(&full_path, payload, opts).await
128    }
129
130    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
131        let full_path = self.full_path(location);
132        self.inner.put_multipart(&full_path).await
133    }
134
135    // Remove when updating to object_store 0.13
136    #[allow(deprecated)]
137    async fn put_multipart_opts(
138        &self,
139        location: &Path,
140        opts: PutMultipartOpts,
141    ) -> Result<Box<dyn MultipartUpload>> {
142        let full_path = self.full_path(location);
143        self.inner.put_multipart_opts(&full_path, opts).await
144    }
145
146    async fn get(&self, location: &Path) -> Result<GetResult> {
147        let full_path = self.full_path(location);
148        self.inner.get(&full_path).await
149    }
150
151    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
152        let full_path = self.full_path(location);
153        self.inner.get_range(&full_path, range).await
154    }
155
156    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
157        let full_path = self.full_path(location);
158        self.inner.get_opts(&full_path, options).await
159    }
160
161    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
162        let full_path = self.full_path(location);
163        self.inner.get_ranges(&full_path, ranges).await
164    }
165
166    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
167        let full_path = self.full_path(location);
168        let meta = self.inner.head(&full_path).await?;
169        Ok(self.strip_meta(meta))
170    }
171
172    async fn delete(&self, location: &Path) -> Result<()> {
173        let full_path = self.full_path(location);
174        self.inner.delete(&full_path).await
175    }
176
177    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
178        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
179        let s = self.inner.list(Some(&prefix));
180        let slf_prefix = self.prefix.clone();
181        s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
182            .boxed()
183    }
184
185    fn list_with_offset(
186        &self,
187        prefix: Option<&Path>,
188        offset: &Path,
189    ) -> BoxStream<'static, Result<ObjectMeta>> {
190        let offset = self.full_path(offset);
191        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
192        let s = self.inner.list_with_offset(Some(&prefix), &offset);
193        let slf_prefix = self.prefix.clone();
194        s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
195            .boxed()
196    }
197
198    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
199        let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
200        self.inner
201            .list_with_delimiter(Some(&prefix))
202            .await
203            .map(|lst| ListResult {
204                common_prefixes: lst
205                    .common_prefixes
206                    .into_iter()
207                    .map(|p| self.strip_prefix(p))
208                    .collect(),
209                objects: lst
210                    .objects
211                    .into_iter()
212                    .map(|meta| self.strip_meta(meta))
213                    .collect(),
214            })
215    }
216
217    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
218        let full_from = self.full_path(from);
219        let full_to = self.full_path(to);
220        self.inner.copy(&full_from, &full_to).await
221    }
222
223    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
224        let full_from = self.full_path(from);
225        let full_to = self.full_path(to);
226        self.inner.rename(&full_from, &full_to).await
227    }
228
229    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
230        let full_from = self.full_path(from);
231        let full_to = self.full_path(to);
232        self.inner.copy_if_not_exists(&full_from, &full_to).await
233    }
234
235    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
236        let full_from = self.full_path(from);
237        let full_to = self.full_path(to);
238        self.inner.rename_if_not_exists(&full_from, &full_to).await
239    }
240}