1use bytes::Bytes;
20use futures::{stream::BoxStream, StreamExt, TryStreamExt};
21use std::ops::Range;
22
23use crate::path::Path;
24use crate::{
25 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
26 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
27};
28
29#[derive(Debug, Clone)]
31pub struct PrefixStore<T: ObjectStore> {
32 prefix: Path,
33 inner: T,
34}
35
36impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
39 }
40}
41
42impl<T: ObjectStore> PrefixStore<T> {
43 pub fn new(store: T, prefix: impl Into<Path>) -> Self {
45 Self {
46 prefix: prefix.into(),
47 inner: store,
48 }
49 }
50
51 fn full_path(&self, location: &Path) -> Path {
53 self.prefix.parts().chain(location.parts()).collect()
54 }
55
56 fn strip_prefix(&self, path: Path) -> Path {
58 if let Some(suffix) = path.prefix_match(&self.prefix) {
60 return suffix.collect();
61 }
62 path
63 }
64
65 fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
67 ObjectMeta {
68 last_modified: meta.last_modified,
69 size: meta.size,
70 location: self.strip_prefix(meta.location),
71 e_tag: meta.e_tag,
72 version: None,
73 }
74 }
75}
76
77fn strip_prefix(prefix: &Path, path: Path) -> Path {
82 if let Some(suffix) = path.prefix_match(prefix) {
84 return suffix.collect();
85 }
86 path
87}
88
89fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
91 ObjectMeta {
92 last_modified: meta.last_modified,
93 size: meta.size,
94 location: strip_prefix(prefix, meta.location),
95 e_tag: meta.e_tag,
96 version: None,
97 }
98}
99#[async_trait::async_trait]
100impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
101 async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
102 let full_path = self.full_path(location);
103 self.inner.put(&full_path, payload).await
104 }
105
106 async fn put_opts(
107 &self,
108 location: &Path,
109 payload: PutPayload,
110 opts: PutOptions,
111 ) -> Result<PutResult> {
112 let full_path = self.full_path(location);
113 self.inner.put_opts(&full_path, payload, opts).await
114 }
115
116 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
117 let full_path = self.full_path(location);
118 self.inner.put_multipart(&full_path).await
119 }
120
121 async fn put_multipart_opts(
122 &self,
123 location: &Path,
124 opts: PutMultipartOptions,
125 ) -> Result<Box<dyn MultipartUpload>> {
126 let full_path = self.full_path(location);
127 self.inner.put_multipart_opts(&full_path, opts).await
128 }
129
130 async fn get(&self, location: &Path) -> Result<GetResult> {
131 let full_path = self.full_path(location);
132 self.inner.get(&full_path).await
133 }
134
135 async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
136 let full_path = self.full_path(location);
137 self.inner.get_range(&full_path, range).await
138 }
139
140 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
141 let full_path = self.full_path(location);
142 self.inner.get_opts(&full_path, options).await
143 }
144
145 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
146 let full_path = self.full_path(location);
147 self.inner.get_ranges(&full_path, ranges).await
148 }
149
150 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
151 let full_path = self.full_path(location);
152 let meta = self.inner.head(&full_path).await?;
153 Ok(self.strip_meta(meta))
154 }
155
156 async fn delete(&self, location: &Path) -> Result<()> {
157 let full_path = self.full_path(location);
158 self.inner.delete(&full_path).await
159 }
160
161 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
162 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
163 let s = self.inner.list(Some(&prefix));
164 let slf_prefix = self.prefix.clone();
165 s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
166 }
167
168 fn list_with_offset(
169 &self,
170 prefix: Option<&Path>,
171 offset: &Path,
172 ) -> BoxStream<'static, Result<ObjectMeta>> {
173 let offset = self.full_path(offset);
174 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
175 let s = self.inner.list_with_offset(Some(&prefix), &offset);
176 let slf_prefix = self.prefix.clone();
177 s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
178 }
179
180 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
181 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
182 self.inner
183 .list_with_delimiter(Some(&prefix))
184 .await
185 .map(|lst| ListResult {
186 common_prefixes: lst
187 .common_prefixes
188 .into_iter()
189 .map(|p| self.strip_prefix(p))
190 .collect(),
191 objects: lst
192 .objects
193 .into_iter()
194 .map(|meta| self.strip_meta(meta))
195 .collect(),
196 })
197 }
198
199 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
200 let full_from = self.full_path(from);
201 let full_to = self.full_path(to);
202 self.inner.copy(&full_from, &full_to).await
203 }
204
205 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
206 let full_from = self.full_path(from);
207 let full_to = self.full_path(to);
208 self.inner.rename(&full_from, &full_to).await
209 }
210
211 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
212 let full_from = self.full_path(from);
213 let full_to = self.full_path(to);
214 self.inner.copy_if_not_exists(&full_from, &full_to).await
215 }
216
217 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
218 let full_from = self.full_path(from);
219 let full_to = self.full_path(to);
220 self.inner.rename_if_not_exists(&full_from, &full_to).await
221 }
222}
223
224#[cfg(not(target_arch = "wasm32"))]
225#[cfg(test)]
226mod tests {
227 use std::slice;
228
229 use super::*;
230 use crate::integration::*;
231 use crate::local::LocalFileSystem;
232
233 use tempfile::TempDir;
234
235 #[tokio::test]
236 async fn prefix_test() {
237 let root = TempDir::new().unwrap();
238 let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
239 let integration = PrefixStore::new(inner, "prefix");
240
241 put_get_delete_list(&integration).await;
242 get_opts(&integration).await;
243 list_uses_directories_correctly(&integration).await;
244 list_with_delimiter(&integration).await;
245 rename_and_copy(&integration).await;
246 copy_if_not_exists(&integration).await;
247 stream_get(&integration).await;
248 }
249
250 #[tokio::test]
251 async fn prefix_test_applies_prefix() {
252 let tmpdir = TempDir::new().unwrap();
253 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
254
255 let location = Path::from("prefix/test_file.json");
256 let data = Bytes::from("arbitrary data");
257
258 local.put(&location, data.clone().into()).await.unwrap();
259
260 let prefix = PrefixStore::new(local, "prefix");
261 let location_prefix = Path::from("test_file.json");
262
263 let content_list = flatten_list_stream(&prefix, None).await.unwrap();
264 assert_eq!(content_list, slice::from_ref(&location_prefix));
265
266 let root = Path::from("/");
267 let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
268 assert_eq!(content_list, slice::from_ref(&location_prefix));
269
270 let read_data = prefix
271 .get(&location_prefix)
272 .await
273 .unwrap()
274 .bytes()
275 .await
276 .unwrap();
277 assert_eq!(&*read_data, data);
278
279 let target_prefix = Path::from("/test_written.json");
280 prefix
281 .put(&target_prefix, data.clone().into())
282 .await
283 .unwrap();
284
285 prefix.delete(&location_prefix).await.unwrap();
286
287 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
288
289 let err = local.get(&location).await.unwrap_err();
290 assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
291
292 let location = Path::from("prefix/test_written.json");
293 let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
294 assert_eq!(&*read_data, data)
295 }
296}