1use bytes::Bytes;
7use futures::{stream::BoxStream, StreamExt, TryStreamExt};
8use std::borrow::Cow;
9use std::ops::Range;
10use std::sync::OnceLock;
11
12use object_store::path::Path;
13#[allow(deprecated)]
15use object_store::{
16 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
17 PutOptions, PutPayload, PutResult, Result,
18};
19
// Process-wide `Path::default()` value, created on first use via `get_or_init`.
// The listing methods of `MaybePrefixedStore` borrow it as the fallback location
// when the caller does not supply a prefix of their own.
static DEFAULT_PATH: OnceLock<Path> = OnceLock::new();
21
/// An [`ObjectStore`] wrapper that optionally confines every operation to a
/// path prefix inside the wrapped store.
///
/// With a prefix configured, locations passed in are resolved underneath the
/// prefix before reaching the inner store, and locations coming back from
/// `head` and the listing methods have the prefix removed again. Without a
/// prefix, locations are forwarded as-is.
#[derive(Debug, Clone)]
pub struct MaybePrefixedStore<T: ObjectStore> {
    /// Prefix prepended to every location; `None` means paths are passed through.
    prefix: Option<Path>,
    /// The wrapped store that actually performs the operations.
    inner: T,
}
28
29impl<T: ObjectStore> std::fmt::Display for MaybePrefixedStore<T> {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 if let Some(prefix) = self.prefix.as_ref() {
32 write!(f, "PrefixObjectStore({prefix})")
33 } else {
34 write!(f, "ObjectStore")
35 }
36 }
37}
38
39impl<T: ObjectStore> MaybePrefixedStore<T> {
40 pub fn new(store: T, prefix: Option<impl Into<Path>>) -> Self {
42 Self {
43 prefix: prefix.map(|x| x.into()),
44 inner: store,
45 }
46 }
47
48 pub fn inner(&self) -> &T {
50 &self.inner
51 }
52
53 fn full_path<'a>(&'a self, location: &'a Path) -> Cow<'a, Path> {
55 if let Some(prefix) = &self.prefix {
56 Cow::Owned(prefix.parts().chain(location.parts()).collect())
57 } else {
58 Cow::Borrowed(location)
59 }
60 }
61
62 fn strip_prefix(&self, path: Path) -> Path {
64 if let Some(prefix) = &self.prefix {
65 if let Some(suffix) = path.prefix_match(prefix) {
67 return suffix.collect();
68 }
69 path
70 } else {
71 path
72 }
73 }
74
75 fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
77 ObjectMeta {
78 last_modified: meta.last_modified,
79 size: meta.size,
80 location: self.strip_prefix(meta.location),
81 e_tag: meta.e_tag,
82 version: None,
83 }
84 }
85}
86
87fn strip_prefix(prefix: &Path, path: Path) -> Path {
92 if let Some(suffix) = path.prefix_match(prefix) {
94 return suffix.collect();
95 }
96 path
97}
98
99fn strip_meta(prefix: Option<&Path>, meta: ObjectMeta) -> ObjectMeta {
101 if let Some(prefix) = prefix {
102 ObjectMeta {
103 last_modified: meta.last_modified,
104 size: meta.size,
105 location: strip_prefix(prefix, meta.location),
106 e_tag: meta.e_tag,
107 version: None,
108 }
109 } else {
110 meta
111 }
112}
113#[async_trait::async_trait]
114impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
115 async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
116 let full_path = self.full_path(location);
117 self.inner.put(&full_path, payload).await
118 }
119
120 async fn put_opts(
121 &self,
122 location: &Path,
123 payload: PutPayload,
124 opts: PutOptions,
125 ) -> Result<PutResult> {
126 let full_path = self.full_path(location);
127 self.inner.put_opts(&full_path, payload, opts).await
128 }
129
130 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
131 let full_path = self.full_path(location);
132 self.inner.put_multipart(&full_path).await
133 }
134
135 #[allow(deprecated)]
137 async fn put_multipart_opts(
138 &self,
139 location: &Path,
140 opts: PutMultipartOpts,
141 ) -> Result<Box<dyn MultipartUpload>> {
142 let full_path = self.full_path(location);
143 self.inner.put_multipart_opts(&full_path, opts).await
144 }
145
146 async fn get(&self, location: &Path) -> Result<GetResult> {
147 let full_path = self.full_path(location);
148 self.inner.get(&full_path).await
149 }
150
151 async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
152 let full_path = self.full_path(location);
153 self.inner.get_range(&full_path, range).await
154 }
155
156 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
157 let full_path = self.full_path(location);
158 self.inner.get_opts(&full_path, options).await
159 }
160
161 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
162 let full_path = self.full_path(location);
163 self.inner.get_ranges(&full_path, ranges).await
164 }
165
166 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
167 let full_path = self.full_path(location);
168 let meta = self.inner.head(&full_path).await?;
169 Ok(self.strip_meta(meta))
170 }
171
172 async fn delete(&self, location: &Path) -> Result<()> {
173 let full_path = self.full_path(location);
174 self.inner.delete(&full_path).await
175 }
176
177 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
178 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
179 let s = self.inner.list(Some(&prefix));
180 let slf_prefix = self.prefix.clone();
181 s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
182 .boxed()
183 }
184
185 fn list_with_offset(
186 &self,
187 prefix: Option<&Path>,
188 offset: &Path,
189 ) -> BoxStream<'static, Result<ObjectMeta>> {
190 let offset = self.full_path(offset);
191 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
192 let s = self.inner.list_with_offset(Some(&prefix), &offset);
193 let slf_prefix = self.prefix.clone();
194 s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
195 .boxed()
196 }
197
198 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
199 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
200 self.inner
201 .list_with_delimiter(Some(&prefix))
202 .await
203 .map(|lst| ListResult {
204 common_prefixes: lst
205 .common_prefixes
206 .into_iter()
207 .map(|p| self.strip_prefix(p))
208 .collect(),
209 objects: lst
210 .objects
211 .into_iter()
212 .map(|meta| self.strip_meta(meta))
213 .collect(),
214 })
215 }
216
217 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
218 let full_from = self.full_path(from);
219 let full_to = self.full_path(to);
220 self.inner.copy(&full_from, &full_to).await
221 }
222
223 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
224 let full_from = self.full_path(from);
225 let full_to = self.full_path(to);
226 self.inner.rename(&full_from, &full_to).await
227 }
228
229 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
230 let full_from = self.full_path(from);
231 let full_to = self.full_path(to);
232 self.inner.copy_if_not_exists(&full_from, &full_to).await
233 }
234
235 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
236 let full_from = self.full_path(from);
237 let full_to = self.full_path(to);
238 self.inner.rename_if_not_exists(&full_from, &full_to).await
239 }
240}