objstore/
store.rs

1use std::sync::Arc;
2
3use anyhow::Context as _;
4use bytes::Bytes;
5
6use crate::{
7    Conditions, Copy, DataSource, DownloadUrlArgs, KeyPage, KeyStream, ListArgs, MetaStream,
8    ObjectMeta, ObjectMetaPage, Put, ValueStream,
9};
10use futures::{StreamExt as _, TryStreamExt as _, stream};
11
12/// Abstraction for a generic key-value store.
13#[async_trait::async_trait]
14pub trait ObjStore: Send + Sync + std::fmt::Debug {
15    /// Get a descriptive name for backend implementation.
16    ///
17    /// eg: "memory", "s3", ...
18    fn kind(&self) -> &str;
19
20    /// Get a "safe" URI for the store, which does not include any sensitive information
21    /// like api keys.
22    fn safe_uri(&self) -> &url::Url;
23
24    /// Checks if the store is usable.
25    ///
26    /// May perform upstream service requests to validate connectivity and credentials.
27    async fn healthcheck(&self) -> Result<(), anyhow::Error>;
28
29    /// Get metadata for a given key.
30    async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error>;
31
32    /// Get the value for a given key.
33    async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error>;
34
35    async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error>;
36
37    /// Get both the value and metadata for a given key.
38    async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error>;
39
40    async fn get_stream_with_meta(
41        &self,
42        key: &str,
43    ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error>;
44
45    /// Generate a download URL for a given key.
46    ///
47    /// NOTE: Must return `Ok(None)` if the store does not support download URLs!
48    async fn generate_download_url(
49        &self,
50        args: DownloadUrlArgs,
51    ) -> Result<Option<url::Url>, anyhow::Error>;
52
53    /// Store a value under a given key.
54    async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error>;
55
56    /// Copy an existing object to a new key.
57    ///
58    /// May apply server-side copy optimizations and respects `Conditions`.
59    async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error>;
60
61    /// Delete a key from the store.
62    async fn delete(&self, key: &str) -> Result<(), anyhow::Error>;
63
64    /// Delete all keys with a given prefix.
65    async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error>;
66
67    /// List keys in the store.
68    ///
69    /// In contrast to [`Self::list`], this returns only the keys, not their metadata.
70    async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error>;
71
72    /// List all the keys, optionally filtered by a prefix.
73    ///
74    /// NOTE: this method will paginate through all keys, and accumulates
75    /// the results in memory.
76    ///
77    /// Use with caution.
78    async fn list_all_keys(&self, prefix: &str) -> Result<Vec<String>, anyhow::Error> {
79        let args = ListArgs::new().with_prefix(prefix);
80        self.list_keys_stream(args)
81            .map_ok(|v| v.items)
82            .try_concat()
83            .await
84    }
85
86    fn list_keys_stream<'a>(&'a self, args: ListArgs) -> KeyStream<'a> {
87        let init = Some(args.clone());
88        let page_stream = stream::try_unfold(init, move |state| async move {
89            if let Some(args) = state {
90                let page = self.list_keys(args.clone()).await?;
91                let next = page
92                    .next_cursor
93                    .as_ref()
94                    .map(|c| args.clone().with_cursor(c.clone()));
95                Ok(Some((page, next)))
96            } else {
97                Ok(None)
98            }
99        });
100        Box::pin(page_stream)
101    }
102
103    /// List metadata for a given key.
104    ///
105    /// The arguments allow for prefix filtering, pagination, and limiting
106    /// the number of results.
107    async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error>;
108
109    /// Streaming variant of [`Self::list`]: pages through [`Self::list`] and yields each metadata page (`ObjectMetaPage`).
110    ///
111    /// This default method repeatedly calls `list` to page through all results lazily.
112    fn list_stream(&self, args: ListArgs) -> MetaStream
113    where
114        Self: Sized + Clone + 'static,
115    {
116        let store = self.clone();
117        let init = Some(args.clone());
118        let page_stream = stream::try_unfold(init, move |state| {
119            let store = store.clone();
120            async move {
121                if let Some(args) = state {
122                    let page = store.list(args.clone()).await?;
123                    let next = page
124                        .next_cursor
125                        .as_ref()
126                        .map(|c| args.clone().with_cursor(c.clone()));
127                    Ok(Some((page, next)))
128                } else {
129                    Ok(None)
130                }
131            }
132        });
133        Box::pin(page_stream)
134    }
135
136    /// Purge all keys in the store.
137    async fn purge_all(&self) -> Result<(), anyhow::Error> {
138        self.delete_prefix("").await
139    }
140
141    /// Get a JSON value from the store.
142    async fn get_json<T: serde::de::DeserializeOwned>(
143        &self,
144        key: &str,
145    ) -> Result<Option<T>, anyhow::Error>
146    where
147        Self: Sized,
148    {
149        match self.get(key).await {
150            Ok(Some(data)) => {
151                let jd = &mut serde_json::Deserializer::from_slice(&data);
152                let out =
153                    serde_path_to_error::deserialize(jd).context("could not deserialize JSON")?;
154
155                Ok(Some(out))
156            }
157            Ok(None) => Ok(None),
158            Err(e) => Err(e),
159        }
160    }
161}
162
163#[async_trait::async_trait]
164impl<K: ObjStore> ObjStore for Arc<K> {
165    fn kind(&self) -> &str {
166        self.as_ref().kind()
167    }
168
169    fn safe_uri(&self) -> &url::Url {
170        self.as_ref().safe_uri()
171    }
172
173    async fn healthcheck(&self) -> Result<(), anyhow::Error> {
174        self.as_ref().healthcheck().await
175    }
176
177    async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error> {
178        self.as_ref().meta(key).await
179    }
180
181    async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error> {
182        self.as_ref().get(key).await
183    }
184
185    async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error> {
186        self.as_ref().get_stream(key).await
187    }
188
189    async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error> {
190        self.as_ref().get_with_meta(key).await
191    }
192
193    async fn get_stream_with_meta(
194        &self,
195        key: &str,
196    ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error> {
197        self.as_ref().get_stream_with_meta(key).await
198    }
199
200    async fn generate_download_url(
201        &self,
202        args: DownloadUrlArgs,
203    ) -> Result<Option<url::Url>, anyhow::Error> {
204        self.as_ref().generate_download_url(args).await
205    }
206
207    async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error> {
208        self.as_ref().send_put(put).await
209    }
210    async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error> {
211        self.as_ref().send_copy(copy).await
212    }
213
214    async fn delete(&self, key: &str) -> Result<(), anyhow::Error> {
215        self.as_ref().delete(key).await
216    }
217
218    async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error> {
219        self.as_ref().delete_prefix(prefix).await
220    }
221
222    async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error> {
223        self.as_ref().list(args).await
224    }
225
226    async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error> {
227        self.as_ref().list_keys(args).await
228    }
229}
230
231pub type DynObjStore = Arc<dyn ObjStore>;
232
233#[async_trait::async_trait]
234impl ObjStore for DynObjStore {
235    fn kind(&self) -> &str {
236        self.as_ref().kind()
237    }
238
239    fn safe_uri(&self) -> &url::Url {
240        self.as_ref().safe_uri()
241    }
242
243    async fn healthcheck(&self) -> Result<(), anyhow::Error> {
244        self.as_ref().healthcheck().await
245    }
246
247    async fn meta(&self, key: &str) -> Result<Option<ObjectMeta>, anyhow::Error> {
248        self.as_ref().meta(key).await
249    }
250
251    async fn get(&self, key: &str) -> Result<Option<Bytes>, anyhow::Error> {
252        self.as_ref().get(key).await
253    }
254
255    async fn get_stream(&self, key: &str) -> Result<Option<ValueStream>, anyhow::Error> {
256        self.as_ref().get_stream(key).await
257    }
258
259    async fn get_with_meta(&self, key: &str) -> Result<Option<(Bytes, ObjectMeta)>, anyhow::Error> {
260        self.as_ref().get_with_meta(key).await
261    }
262
263    async fn get_stream_with_meta(
264        &self,
265        key: &str,
266    ) -> Result<Option<(ObjectMeta, ValueStream)>, anyhow::Error> {
267        self.as_ref().get_stream_with_meta(key).await
268    }
269
270    async fn generate_download_url(
271        &self,
272        args: DownloadUrlArgs,
273    ) -> Result<Option<url::Url>, anyhow::Error> {
274        self.as_ref().generate_download_url(args).await
275    }
276
277    async fn send_put(&self, put: Put) -> Result<ObjectMeta, anyhow::Error> {
278        self.as_ref().send_put(put).await
279    }
280    async fn send_copy(&self, copy: Copy) -> Result<ObjectMeta, anyhow::Error> {
281        self.as_ref().send_copy(copy).await
282    }
283
284    async fn delete(&self, key: &str) -> Result<(), anyhow::Error> {
285        self.as_ref().delete(key).await
286    }
287
288    async fn list(&self, args: ListArgs) -> Result<ObjectMetaPage, anyhow::Error> {
289        self.as_ref().list(args).await
290    }
291
292    async fn list_keys(&self, args: ListArgs) -> Result<KeyPage, anyhow::Error> {
293        self.as_ref().list_keys(args).await
294    }
295
296    async fn delete_prefix(&self, prefix: &str) -> Result<(), anyhow::Error> {
297        self.as_ref().delete_prefix(prefix).await
298    }
299
300    async fn get_json<T: serde::de::DeserializeOwned>(
301        &self,
302        key: &str,
303    ) -> Result<Option<T>, anyhow::Error> {
304        match self.get(key).await {
305            Ok(Some(data)) => {
306                let jd = &mut serde_json::Deserializer::from_slice(&data);
307                let out =
308                    serde_path_to_error::deserialize(jd).context("could not deserialize JSON")?;
309
310                Ok(Some(out))
311            }
312            Ok(None) => Ok(None),
313            Err(e) => Err(e),
314        }
315    }
316}
317
318pub struct PutBuilder<'a, S> {
319    store: &'a S,
320    key: String,
321    conditions: Conditions,
322    /// Specifies the MIME type of the data.
323    mime_type: Option<String>,
324}
325
326impl<'a, S: ObjStore> PutBuilder<'a, S>
327where
328    S: ObjStore,
329{
330    pub fn build(self, data: impl Into<DataSource>) -> Put {
331        let mut put = Put::new(self.key, data.into());
332        put.conditions = self.conditions;
333        put.mime_type = self.mime_type;
334        put
335    }
336
337    pub async fn json<T: serde::Serialize>(self, data: &T) -> Result<ObjectMeta, anyhow::Error> {
338        let data = serde_json::to_vec(data).context("could not serialize JSON data for put")?;
339        let store = self.store;
340        let put = self.build(DataSource::Data(Bytes::from(data)));
341        store.send_put(put).await
342    }
343
344    pub async fn send(self, data: impl Into<DataSource>) -> Result<ObjectMeta, anyhow::Error> {
345        let store = self.store;
346        let put = self.build(data);
347        store.send_put(put).await
348    }
349
350    pub async fn text(self, text: impl Into<String>) -> Result<ObjectMeta, anyhow::Error> {
351        let data = Bytes::from(text.into());
352        self.send(DataSource::Data(data)).await
353    }
354
355    pub async fn bytes(self, data: impl Into<Bytes>) -> Result<ObjectMeta, anyhow::Error> {
356        self.send(DataSource::Data(data.into())).await
357    }
358
359    pub async fn stream<D, E>(
360        self,
361        stream: impl futures::Stream<Item = Result<D, E>> + Send + 'static,
362    ) -> Result<ObjectMeta, anyhow::Error>
363    where
364        Bytes: From<D>,
365        anyhow::Error: From<E>,
366        E: Send + 'static,
367    {
368        let stream: ValueStream = stream
369            .map_ok(|item: D| Bytes::from(item))
370            .map_err(anyhow::Error::from)
371            .boxed();
372
373        self.send(DataSource::Stream(stream)).await
374    }
375}
376
377/// Builder for a copy request from one key to another, respecting conditions.
378pub struct CopyBuilder<'a, S> {
379    store: &'a S,
380    src: String,
381    dest: String,
382    conditions: Conditions,
383}
384
385impl<'a, S: ObjStore> CopyBuilder<'a, S>
386where
387    S: ObjStore,
388{
389    /// Construct the underlying `Copy` request.
390    pub fn build(&self) -> Copy {
391        let mut copy = Copy::new(self.src.clone(), self.dest.clone());
392        copy.conditions = self.conditions.clone();
393        copy
394    }
395
396    /// Execute the copy request.
397    pub async fn send(self) -> Result<ObjectMeta, anyhow::Error> {
398        let mut copy = Copy::new(self.src.clone(), self.dest.clone());
399        copy.conditions = self.conditions.clone();
400        self.store.send_copy(copy).await
401    }
402}
403
404pub trait ObjStoreExt: ObjStore
405where
406    Self: Sized,
407{
408    fn put(&self, key: &str) -> PutBuilder<'_, Self> {
409        PutBuilder {
410            store: self,
411            key: key.to_string(),
412            conditions: Conditions::default(),
413            mime_type: None,
414        }
415    }
416
417    /// Begin a copy operation from `src` to `dest`, allows setting conditions.
418    fn copy(&self, src: &str, dest: &str) -> CopyBuilder<'_, Self> {
419        CopyBuilder {
420            store: self,
421            src: src.to_string(),
422            dest: dest.to_string(),
423            conditions: Conditions::default(),
424        }
425    }
426}
427
428impl<S: ObjStore> ObjStoreExt for S {}