1use bytes::Bytes;
7use futures::{stream::BoxStream, StreamExt, TryStreamExt};
8use http::{Extensions, Method};
9use object_store::signer::Signer;
10use std::borrow::Cow;
11use std::future::Future;
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::OnceLock;
15use std::time::Duration;
16use url::Url;
17
18use object_store::{path::Path, CopyOptions};
19use object_store::{
20 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
21 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
22};
23
/// Lazily initialised default [`Path`], shared by the listing methods.
///
/// It is filled in with `Path::default()` on first use and substituted for the
/// listing prefix whenever a caller passes `None`.
static DEFAULT_PATH: OnceLock<Path> = OnceLock::new();
25
/// An [`ObjectStore`] wrapper that can scope every location under a fixed prefix.
///
/// When `prefix` is `Some`, outgoing locations have the prefix prepended before
/// they reach `inner`, and locations coming back from `inner` have it removed.
/// When `prefix` is `None`, locations are passed along as they are.
#[derive(Debug, Clone)]
pub struct MaybePrefixedStore<T: ObjectStore> {
    /// Prefix applied to locations, if any.
    prefix: Option<Path>,
    /// The wrapped store that performs the actual operations.
    inner: T,
}
32
33impl<T: ObjectStore> std::fmt::Display for MaybePrefixedStore<T> {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 if let Some(prefix) = self.prefix.as_ref() {
36 write!(f, "PrefixObjectStore({prefix})")
37 } else {
38 write!(f, "ObjectStore")
39 }
40 }
41}
42
43impl<T: ObjectStore> MaybePrefixedStore<T> {
44 pub fn new(store: T, prefix: Option<impl Into<Path>>) -> Self {
46 Self {
47 prefix: prefix.map(|x| x.into()),
48 inner: store,
49 }
50 }
51
52 pub fn inner(&self) -> &T {
54 &self.inner
55 }
56
57 fn full_path<'a>(&'a self, location: &'a Path) -> Cow<'a, Path> {
59 if let Some(prefix) = &self.prefix {
60 Cow::Owned(prefix.parts().chain(location.parts()).collect())
61 } else {
62 Cow::Borrowed(location)
63 }
64 }
65
66 fn strip_prefix(&self, path: Path) -> Path {
68 if let Some(prefix) = &self.prefix {
69 if let Some(suffix) = path.prefix_match(prefix) {
71 return suffix.collect();
72 }
73 path
74 } else {
75 path
76 }
77 }
78
79 fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
81 ObjectMeta {
82 last_modified: meta.last_modified,
83 size: meta.size,
84 location: self.strip_prefix(meta.location),
85 e_tag: meta.e_tag,
86 version: None,
87 }
88 }
89}
90
91fn full_path<'a>(prefix: Option<&'a Path>, location: &'a Path) -> Cow<'a, Path> {
96 if let Some(prefix) = prefix {
97 Cow::Owned(prefix.parts().chain(location.parts()).collect())
98 } else {
99 Cow::Borrowed(location)
100 }
101}
102
103fn strip_prefix(prefix: Option<&Path>, path: Path) -> Path {
105 if let Some(prefix) = &prefix {
106 if let Some(suffix) = path.prefix_match(prefix) {
108 return suffix.collect();
109 }
110 path
111 } else {
112 path
113 }
114}
115
116fn strip_meta(prefix: Option<&Path>, meta: ObjectMeta) -> ObjectMeta {
118 ObjectMeta {
119 last_modified: meta.last_modified,
120 size: meta.size,
121 location: strip_prefix(prefix, meta.location),
122 e_tag: meta.e_tag,
123 version: None,
124 }
125}
126#[async_trait::async_trait]
127impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
128 async fn put_opts(
129 &self,
130 location: &Path,
131 payload: PutPayload,
132 opts: PutOptions,
133 ) -> Result<PutResult> {
134 let full_path = self.full_path(location);
135 self.inner.put_opts(&full_path, payload, opts).await
136 }
137
138 #[allow(deprecated)]
140 async fn put_multipart_opts(
141 &self,
142 location: &Path,
143 opts: PutMultipartOptions,
144 ) -> Result<Box<dyn MultipartUpload>> {
145 let full_path = self.full_path(location);
146 self.inner.put_multipart_opts(&full_path, opts).await
147 }
148
149 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
150 let full_path = self.full_path(location);
151 self.inner.get_opts(&full_path, options).await
152 }
153
154 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
155 let full_path = self.full_path(location);
156 self.inner.get_ranges(&full_path, ranges).await
157 }
158
159 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
160 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
161 let s = self.inner.list(Some(&prefix));
162 let slf_prefix = self.prefix.clone();
163 s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
164 .boxed()
165 }
166
167 fn list_with_offset(
168 &self,
169 prefix: Option<&Path>,
170 offset: &Path,
171 ) -> BoxStream<'static, Result<ObjectMeta>> {
172 let offset = self.full_path(offset);
173 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
174 let s = self.inner.list_with_offset(Some(&prefix), &offset);
175 let slf_prefix = self.prefix.clone();
176 s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
177 .boxed()
178 }
179
180 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
181 let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
182 self.inner
183 .list_with_delimiter(Some(&prefix))
184 .await
185 .map(|lst| ListResult {
186 common_prefixes: lst
187 .common_prefixes
188 .into_iter()
189 .map(|p| self.strip_prefix(p))
190 .collect(),
191 objects: lst
192 .objects
193 .into_iter()
194 .map(|meta| self.strip_meta(meta))
195 .collect(),
196 extensions: Extensions::default(),
197 })
198 }
199
200 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
201 let from_full = self.full_path(from);
202 let to_full = self.full_path(to);
203 self.inner.copy_opts(&from_full, &to_full, options).await
204 }
205
206 fn delete_stream(
207 &self,
208 locations: BoxStream<'static, Result<Path>>,
209 ) -> BoxStream<'static, Result<Path>> {
210 let prefix_owned = self.prefix.clone();
211 let locations = locations
212 .map(move |location| {
213 location.map(|loc| full_path(prefix_owned.as_ref(), &loc).into_owned())
214 })
215 .boxed();
216 let prefix = self.prefix.clone();
217 self.inner
218 .delete_stream(locations)
219 .map(move |location| location.map(|loc| strip_prefix(prefix.as_ref(), loc)))
220 .boxed()
221 }
222}
223
224impl<T: ObjectStore + Signer> Signer for MaybePrefixedStore<T> {
225 fn signed_url<'life0, 'life1, 'async_trait>(
226 &'life0 self,
227 method: Method,
228 path: &'life1 Path,
229 expires_in: Duration,
230 ) -> Pin<Box<dyn Future<Output = object_store::Result<Url>> + Send + 'async_trait>>
231 where
232 'life0: 'async_trait,
233 'life1: 'async_trait,
234 Self: 'async_trait,
235 {
236 let full = full_path(self.prefix.as_ref(), path).into_owned();
237 Box::pin(async move { self.inner.signed_url(method, &full, expires_in).await })
238 }
239
240 fn signed_urls<'life0, 'life1, 'async_trait>(
241 &'life0 self,
242 method: Method,
243 paths: &'life1 [Path],
244 expires_in: Duration,
245 ) -> Pin<Box<dyn Future<Output = Result<Vec<Url>>> + Send + 'async_trait>>
246 where
247 'life0: 'async_trait,
248 'life1: 'async_trait,
249 Self: 'async_trait,
250 {
251 let full_paths = paths
252 .iter()
253 .map(|path| full_path(self.prefix.as_ref(), path).into_owned())
254 .collect::<Vec<_>>();
255 Box::pin(async move {
256 self.inner
257 .signed_urls(method, &full_paths, expires_in)
258 .await
259 })
260 }
261}