object_store/
prefix.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store wrapper handling a constant path prefix
19use bytes::Bytes;
20use futures::{stream::BoxStream, StreamExt, TryStreamExt};
21use std::ops::Range;
22
23use crate::path::Path;
24use crate::{
25    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
26    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
27};
28
29/// Store wrapper that applies a constant prefix to all paths handled by the store.
30#[derive(Debug, Clone)]
31pub struct PrefixStore<T: ObjectStore> {
32    prefix: Path,
33    inner: T,
34}
35
36impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
39    }
40}
41
42impl<T: ObjectStore> PrefixStore<T> {
43    /// Create a new instance of [`PrefixStore`]
44    pub fn new(store: T, prefix: impl Into<Path>) -> Self {
45        Self {
46            prefix: prefix.into(),
47            inner: store,
48        }
49    }
50
51    /// Create the full path from a path relative to prefix
52    fn full_path(&self, location: &Path) -> Path {
53        self.prefix.parts().chain(location.parts()).collect()
54    }
55
56    /// Strip the constant prefix from a given path
57    fn strip_prefix(&self, path: Path) -> Path {
58        // Note cannot use match because of borrow checker
59        if let Some(suffix) = path.prefix_match(&self.prefix) {
60            return suffix.collect();
61        }
62        path
63    }
64
65    /// Strip the constant prefix from a given ObjectMeta
66    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
67        ObjectMeta {
68            last_modified: meta.last_modified,
69            size: meta.size,
70            location: self.strip_prefix(meta.location),
71            e_tag: meta.e_tag,
72            version: None,
73        }
74    }
75}
76
77// Note: This is a relative hack to move these two functions to pure functions so they don't rely
78// on the `self` lifetime. Expected to be cleaned up before merge.
79//
80/// Strip the constant prefix from a given path
81fn strip_prefix(prefix: &Path, path: Path) -> Path {
82    // Note cannot use match because of borrow checker
83    if let Some(suffix) = path.prefix_match(prefix) {
84        return suffix.collect();
85    }
86    path
87}
88
89/// Strip the constant prefix from a given ObjectMeta
90fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
91    ObjectMeta {
92        last_modified: meta.last_modified,
93        size: meta.size,
94        location: strip_prefix(prefix, meta.location),
95        e_tag: meta.e_tag,
96        version: None,
97    }
98}
99#[async_trait::async_trait]
100impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
101    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
102        let full_path = self.full_path(location);
103        self.inner.put(&full_path, payload).await
104    }
105
106    async fn put_opts(
107        &self,
108        location: &Path,
109        payload: PutPayload,
110        opts: PutOptions,
111    ) -> Result<PutResult> {
112        let full_path = self.full_path(location);
113        self.inner.put_opts(&full_path, payload, opts).await
114    }
115
116    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
117        let full_path = self.full_path(location);
118        self.inner.put_multipart(&full_path).await
119    }
120
121    async fn put_multipart_opts(
122        &self,
123        location: &Path,
124        opts: PutMultipartOptions,
125    ) -> Result<Box<dyn MultipartUpload>> {
126        let full_path = self.full_path(location);
127        self.inner.put_multipart_opts(&full_path, opts).await
128    }
129
130    async fn get(&self, location: &Path) -> Result<GetResult> {
131        let full_path = self.full_path(location);
132        self.inner.get(&full_path).await
133    }
134
135    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
136        let full_path = self.full_path(location);
137        self.inner.get_range(&full_path, range).await
138    }
139
140    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
141        let full_path = self.full_path(location);
142        self.inner.get_opts(&full_path, options).await
143    }
144
145    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
146        let full_path = self.full_path(location);
147        self.inner.get_ranges(&full_path, ranges).await
148    }
149
150    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
151        let full_path = self.full_path(location);
152        let meta = self.inner.head(&full_path).await?;
153        Ok(self.strip_meta(meta))
154    }
155
156    async fn delete(&self, location: &Path) -> Result<()> {
157        let full_path = self.full_path(location);
158        self.inner.delete(&full_path).await
159    }
160
161    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
162        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
163        let s = self.inner.list(Some(&prefix));
164        let slf_prefix = self.prefix.clone();
165        s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
166    }
167
168    fn list_with_offset(
169        &self,
170        prefix: Option<&Path>,
171        offset: &Path,
172    ) -> BoxStream<'static, Result<ObjectMeta>> {
173        let offset = self.full_path(offset);
174        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
175        let s = self.inner.list_with_offset(Some(&prefix), &offset);
176        let slf_prefix = self.prefix.clone();
177        s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
178    }
179
180    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
181        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
182        self.inner
183            .list_with_delimiter(Some(&prefix))
184            .await
185            .map(|lst| ListResult {
186                common_prefixes: lst
187                    .common_prefixes
188                    .into_iter()
189                    .map(|p| self.strip_prefix(p))
190                    .collect(),
191                objects: lst
192                    .objects
193                    .into_iter()
194                    .map(|meta| self.strip_meta(meta))
195                    .collect(),
196            })
197    }
198
199    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
200        let full_from = self.full_path(from);
201        let full_to = self.full_path(to);
202        self.inner.copy(&full_from, &full_to).await
203    }
204
205    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
206        let full_from = self.full_path(from);
207        let full_to = self.full_path(to);
208        self.inner.rename(&full_from, &full_to).await
209    }
210
211    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
212        let full_from = self.full_path(from);
213        let full_to = self.full_path(to);
214        self.inner.copy_if_not_exists(&full_from, &full_to).await
215    }
216
217    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
218        let full_from = self.full_path(from);
219        let full_to = self.full_path(to);
220        self.inner.rename_if_not_exists(&full_from, &full_to).await
221    }
222}
223
224#[cfg(not(target_arch = "wasm32"))]
225#[cfg(test)]
226mod tests {
227    use std::slice;
228
229    use super::*;
230    use crate::integration::*;
231    use crate::local::LocalFileSystem;
232
233    use tempfile::TempDir;
234
235    #[tokio::test]
236    async fn prefix_test() {
237        let root = TempDir::new().unwrap();
238        let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
239        let integration = PrefixStore::new(inner, "prefix");
240
241        put_get_delete_list(&integration).await;
242        get_opts(&integration).await;
243        list_uses_directories_correctly(&integration).await;
244        list_with_delimiter(&integration).await;
245        rename_and_copy(&integration).await;
246        copy_if_not_exists(&integration).await;
247        stream_get(&integration).await;
248    }
249
250    #[tokio::test]
251    async fn prefix_test_applies_prefix() {
252        let tmpdir = TempDir::new().unwrap();
253        let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
254
255        let location = Path::from("prefix/test_file.json");
256        let data = Bytes::from("arbitrary data");
257
258        local.put(&location, data.clone().into()).await.unwrap();
259
260        let prefix = PrefixStore::new(local, "prefix");
261        let location_prefix = Path::from("test_file.json");
262
263        let content_list = flatten_list_stream(&prefix, None).await.unwrap();
264        assert_eq!(content_list, slice::from_ref(&location_prefix));
265
266        let root = Path::from("/");
267        let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
268        assert_eq!(content_list, slice::from_ref(&location_prefix));
269
270        let read_data = prefix
271            .get(&location_prefix)
272            .await
273            .unwrap()
274            .bytes()
275            .await
276            .unwrap();
277        assert_eq!(&*read_data, data);
278
279        let target_prefix = Path::from("/test_written.json");
280        prefix
281            .put(&target_prefix, data.clone().into())
282            .await
283            .unwrap();
284
285        prefix.delete(&location_prefix).await.unwrap();
286
287        let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
288
289        let err = local.get(&location).await.unwrap_err();
290        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
291
292        let location = Path::from("prefix/test_written.json");
293        let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
294        assert_eq!(&*read_data, data)
295    }
296}