1use std::sync::Arc;
2
3use anyhow::Context as _;
4use bytes::Bytes;
5
6use crate::{
7 Conditions, Copy, DataSource, DownloadUrlArgs, KeyPage, KeyStream, ListArgs, MetaStream,
8 ObjectMeta, ObjectMetaPage, Put, ValueStream,
9};
10use futures::{StreamExt as _, TryStreamExt as _, stream};
11
12#[async_trait::async_trait]
14pub trait ObjStore: Send + Sync + std::fmt::Debug {
15 fn kind(&self) -> &str;
19
20 fn safe_uri(&self) -> &url::Url;
23
24 async fn healthcheck(&self) -> Result<(), anyhow::Error>;
28
29 async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error>;
31
32 async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error>;
34
35 async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error>;
36
37 async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error>;
39
40 async fn get_stream_with_meta(
41 &self,
42 key: &str,
43 ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error>;
44
45 async fn generate_download_url(
49 &self,
50 args: DownloadUrlArgs,
51 ) -> Result<Option<url::Url>, anyhow::Error>;
52
53 async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error>;
55
56 async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error>;
60
61 async fn delete(&self, key: &str) -> Result<(), anyhow::Error>;
63
64 async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error>;
66
67 async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error>;
71
72 async fn list_all_keys(&self, prefix: &str) -> Result<Vec<String>, anyhow::Error> {
79 let args = ListArgs::new().with_prefix(prefix);
80 self.list_keys_stream(args)
81 .map_ok(|v| v.items)
82 .try_concat()
83 .await
84 }
85
86 fn list_keys_stream<'a>(&'a self, args: ListArgs) -> KeyStream<'a> {
87 let init = Some(args.clone());
88 let page_stream = stream::try_unfold(init, move |state| async move {
89 if let Some(args) = state {
90 let page = self.list_keys(args.clone()).await?;
91 let next = page
92 .next_cursor
93 .as_ref()
94 .map(|c| args.clone().with_cursor(c.clone()));
95 Ok(Some((page, next)))
96 } else {
97 Ok(None)
98 }
99 });
100 Box::pin(page_stream)
101 }
102
103 async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error>;
108
109 fn list_stream(&self, args: ListArgs) -> MetaStream
113 where
114 Self: Sized + Clone + 'static,
115 {
116 let store = self.clone();
117 let init = Some(args.clone());
118 let page_stream = stream::try_unfold(init, move |state| {
119 let store = store.clone();
120 async move {
121 if let Some(args) = state {
122 let page = store.list(args.clone()).await?;
123 let next = page
124 .next_cursor
125 .as_ref()
126 .map(|c| args.clone().with_cursor(c.clone()));
127 Ok(Some((page, next)))
128 } else {
129 Ok(None)
130 }
131 }
132 });
133 Box::pin(page_stream)
134 }
135
136 async fn purge_all(&self) -> Result<(), anyhow::Error> {
138 self.delete_prefix("").await
139 }
140
141 async fn get_json<T: serde::de::DeserializeOwned>(
143 &self,
144 key: &str,
145 ) -> Result<Option<T>, anyhow::Error>
146 where
147 Self: Sized,
148 {
149 match self.get(key).await {
150 Ok(Some(data)) => {
151 let jd = &mut serde_json::Deserializer::from_slice(&data);
152 let out =
153 serde_path_to_error::deserialize(jd).context("could not deserialize JSON")?;
154
155 Ok(Some(out))
156 }
157 Ok(None) => Ok(None),
158 Err(e) => Err(e),
159 }
160 }
161}
162
163#[async_trait::async_trait]
164impl<K: ObjStore> ObjStore for Arc<K> {
165 fn kind(&self) -> &str {
166 self.as_ref().kind()
167 }
168
169 fn safe_uri(&self) -> &url::Url {
170 self.as_ref().safe_uri()
171 }
172
173 async fn healthcheck(&self) -> Result<(), anyhow::Error> {
174 self.as_ref().healthcheck().await
175 }
176
177 async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error> {
178 self.as_ref().meta(key).await
179 }
180
181 async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error> {
182 self.as_ref().get(key).await
183 }
184
185 async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error> {
186 self.as_ref().get_stream(key).await
187 }
188
189 async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error> {
190 self.as_ref().get_with_meta(key).await
191 }
192
193 async fn get_stream_with_meta(
194 &self,
195 key: &str,
196 ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error> {
197 self.as_ref().get_stream_with_meta(key).await
198 }
199
200 async fn generate_download_url(
201 &self,
202 args: DownloadUrlArgs,
203 ) -> Result<Option<url::Url>, anyhow::Error> {
204 self.as_ref().generate_download_url(args).await
205 }
206
207 async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error> {
208 self.as_ref().send_put(put).await
209 }
210 async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error> {
211 self.as_ref().send_copy(copy).await
212 }
213
214 async fn delete(&self, key: &str) -> Result<(), anyhow::Error> {
215 self.as_ref().delete(key).await
216 }
217
218 async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error> {
219 self.as_ref().delete_prefix(prefix).await
220 }
221
222 async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error> {
223 self.as_ref().list(args).await
224 }
225
226 async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error> {
227 self.as_ref().list_keys(args).await
228 }
229}
230
/// Shared, reference-counted handle to a type-erased [`ObjStore`].
pub type DynObjStore = Arc<dyn ObjStore>;
232
233#[async_trait::async_trait]
234impl ObjStore for DynObjStore {
235 fn kind(&self) -> &str {
236 self.as_ref().kind()
237 }
238
239 fn safe_uri(&self) -> &url::Url {
240 self.as_ref().safe_uri()
241 }
242
243 async fn healthcheck(&self) -> Result<(), anyhow::Error> {
244 self.as_ref().healthcheck().await
245 }
246
247 async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error> {
248 self.as_ref().meta(key).await
249 }
250
251 async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error> {
252 self.as_ref().get(key).await
253 }
254
255 async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error> {
256 self.as_ref().get_stream(key).await
257 }
258
259 async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error> {
260 self.as_ref().get_with_meta(key).await
261 }
262
263 async fn get_stream_with_meta(
264 &self,
265 key: &str,
266 ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error> {
267 self.as_ref().get_stream_with_meta(key).await
268 }
269
270 async fn generate_download_url(
271 &self,
272 args: DownloadUrlArgs,
273 ) -> Result<Option<url::Url>, anyhow::Error> {
274 self.as_ref().generate_download_url(args).await
275 }
276
277 async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error> {
278 self.as_ref().send_put(put).await
279 }
280 async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error> {
281 self.as_ref().send_copy(copy).await
282 }
283
284 async fn delete(&self, key: &str) -> Result<(), anyhow::Error> {
285 self.as_ref().delete(key).await
286 }
287
288 async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error> {
289 self.as_ref().list(args).await
290 }
291
292 async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error> {
293 self.as_ref().list_keys(args).await
294 }
295
296 async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error> {
297 self.as_ref().delete_prefix(prefix).await
298 }
299
300 async fn get_json<T: serde::de::DeserializeOwned>(
301 &self,
302 key: &str,
303 ) -> Result<Option<T>, anyhow::Error> {
304 match self.get(key).await {
305 Ok(Some(data)) => {
306 let jd = &mut serde_json::Deserializer::from_slice(&data);
307 let out =
308 serde_path_to_error::deserialize(jd).context("could not deserialize JSON")?;
309
310 Ok(Some(out))
311 }
312 Ok(None) => Ok(None),
313 Err(e) => Err(e),
314 }
315 }
316}
317
/// Builder for a [`Put`] request bound to a borrowed store.
///
/// Created by `ObjStoreExt::put`.
pub struct PutBuilder<'a, S> {
    /// Store the finished request is sent to.
    store: &'a S,
    /// Key handed to `Put::new` when the request is built.
    key: String,
    /// Conditions copied onto the built request.
    conditions: Conditions,
    /// MIME type copied onto the built request, if any.
    mime_type: Option<String>,
}
325
326impl<'a, S: ObjStore> PutBuilder<'a, S>
327where
328 S: ObjStore,
329{
330 pub fn build(self, data: impl Into<DataSource>) -> Put {
331 let mut put = Put::new(self.key, data.into());
332 put.conditions = self.conditions;
333 put.mime_type = self.mime_type;
334 put
335 }
336
337 pub async fn json<T: serde::Serialize>(self, data: &T) -> Result<ObjectMeta, anyhow::Error> {
338 let data = serde_json::to_vec(data).context("could not serialize JSON data for put")?;
339 let store = self.store;
340 let put = self.build(DataSource::Data(Bytes::from(data)));
341 store.send_put(put).await
342 }
343
344 pub async fn send(self, data: impl Into<DataSource>) -> Result<ObjectMeta, anyhow::Error> {
345 let store = self.store;
346 let put = self.build(data);
347 store.send_put(put).await
348 }
349
350 pub async fn text(self, text: impl Into<String>) -> Result<ObjectMeta, anyhow::Error> {
351 let data = Bytes::from(text.into());
352 self.send(DataSource::Data(data)).await
353 }
354
355 pub async fn bytes(self, data: impl Into<Bytes>) -> Result<ObjectMeta, anyhow::Error> {
356 self.send(DataSource::Data(data.into())).await
357 }
358
359 pub async fn stream<D, E>(
360 self,
361 stream: impl futures::Stream<Item = Result<D, E>> + Send + 'static,
362 ) -> Result<ObjectMeta, anyhow::Error>
363 where
364 Bytes: From<D>,
365 anyhow::Error: From<E>,
366 E: Send + 'static,
367 {
368 let stream: ValueStream = stream
369 .map_ok(|item: D| Bytes::from(item))
370 .map_err(anyhow::Error::from)
371 .boxed();
372
373 self.send(DataSource::Stream(stream)).await
374 }
375}
376
/// Builder for a [`Copy`] request bound to a borrowed store.
///
/// Created by `ObjStoreExt::copy`.
pub struct CopyBuilder<'a, S> {
    /// Store the finished request is sent to.
    store: &'a S,
    /// First argument handed to `Copy::new` (the source key).
    src: String,
    /// Second argument handed to `Copy::new` (the destination key).
    dest: String,
    /// Conditions copied onto the built request.
    conditions: Conditions,
}
384
385impl<'a, S: ObjStore> CopyBuilder<'a, S>
386where
387 S: ObjStore,
388{
389 pub fn build(&self) -> Copy {
391 let mut copy = Copy::new(self.src.clone(), self.dest.clone());
392 copy.conditions = self.conditions.clone();
393 copy
394 }
395
396 pub async fn send(self) -> Result<ObjectMeta, anyhow::Error> {
398 let mut copy = Copy::new(self.src.clone(), self.dest.clone());
399 copy.conditions = self.conditions.clone();
400 self.store.send_copy(copy).await
401 }
402}
403
404pub trait ObjStoreExt: ObjStore
405where
406 Self: Sized,
407{
408 fn put(&self, key: &str) -> PutBuilder<'_, Self> {
409 PutBuilder {
410 store: self,
411 key: key.to_string(),
412 conditions: Conditions::default(),
413 mime_type: None,
414 }
415 }
416
417 fn copy(&self, src: &str, dest: &str) -> CopyBuilder<'_, Self> {
419 CopyBuilder {
420 store: self,
421 src: src.to_string(),
422 dest: dest.to_string(),
423 conditions: Conditions::default(),
424 }
425 }
426}
427
/// Blanket implementation: every sized [`ObjStore`] gets the `ObjStoreExt`
/// builder methods.
impl<S: ObjStore> ObjStoreExt for S {}