zarrs_object_store/
lib.rs

1//! [`object_store`] store support for the [`zarrs`](https://docs.rs/zarrs/latest/zarrs/index.html) crate.
2//!
3//! ```
4//! # use std::sync::Arc;
5//! use zarrs_storage::AsyncReadableWritableListableStorage;
6//! use zarrs_object_store::AsyncObjectStore;
7//!
8//! let options = object_store::ClientOptions::new().with_allow_http(true);
9//! let store = object_store::http::HttpBuilder::new()
10//!     .with_url("http://...")
11//!     .with_client_options(options)
12//!     .build()?;
13//! let store: AsyncReadableWritableListableStorage =
14//!     Arc::new(AsyncObjectStore::new(store));
15//! # Ok::<_, Box<dyn std::error::Error>>(())
16//! ```
17//!
18//! ## Version Compatibility Matrix
19//!
20#![doc = include_str!("../doc/version_compatibility_matrix.md")]
21//!
22//! `object_store` is re-exported as a dependency of this crate, so it does not need to be specified as a direct dependency.
23//! You can enable `object_store` features `fs`, `aws`, `azure`, `gcp` and `http` by enabling features for this crate of the same name.
24//!
25//! However, if `object_store` is a direct dependency, it is necessary to ensure that the version used by this crate is compatible.
26//! This crate can depend on a range of semver-incompatible versions of `object_store`, and Cargo will not automatically choose a single version of `object_store` that satisfies all dependencies.
27//! Use a precise cargo update to ensure compatibility.
28//! For example, if this crate resolves to `object_store` 0.11.1 and your code uses 0.10.2:
29//! ```shell
30//! cargo update --package object_store:0.11.1 --precise 0.10.2
31//! ```
32//!
33//! ## Licence
34//! `zarrs_object_store` is licensed under either of
35//! - the Apache License, Version 2.0 [LICENSE-APACHE](https://docs.rs/crate/zarrs_object_store/latest/source/LICENCE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0> or
36//! - the MIT license [LICENSE-MIT](https://docs.rs/crate/zarrs_object_store/latest/source/LICENCE-MIT) or <http://opensource.org/licenses/MIT>, at your option.
37
38pub use object_store;
39
40use futures::{stream, StreamExt, TryStreamExt};
41use object_store::path::Path;
42
43use zarrs_storage::{
44    async_store_set_partial_many, byte_range::ByteRangeIterator, AsyncListableStorageTraits,
45    AsyncMaybeBytesIterator, AsyncReadableStorageTraits, AsyncWritableStorageTraits, Bytes,
46    MaybeBytes, OffsetBytesIterator, StorageError, StoreKey, StoreKeys, StoreKeysPrefixes,
47    StorePrefix,
48};
49
50/// Maps a [`StoreKey`] to an [`object_store`] path.
51fn key_to_path(key: &StoreKey) -> object_store::path::Path {
52    object_store::path::Path::from(key.as_str())
53}
54
55/// Map [`object_store::Error::NotFound`] to None, pass through other errors
56fn handle_result_notfound<T>(
57    result: Result<T, object_store::Error>,
58) -> Result<Option<T>, StorageError> {
59    match result {
60        Ok(result) => Ok(Some(result)),
61        Err(err) => {
62            if matches!(err, object_store::Error::NotFound { .. }) {
63                Ok(None)
64            } else {
65                Err(StorageError::Other(err.to_string()))
66            }
67        }
68    }
69}
70
71fn handle_result<T>(result: Result<T, object_store::Error>) -> Result<T, StorageError> {
72    result.map_err(|err| StorageError::Other(err.to_string()))
73}
74
/// An asynchronous store backed by an [`object_store::ObjectStore`].
///
/// This is a thin wrapper: the wrapped store is the only state held.
/// `Debug` is available whenever the wrapped store type implements `Debug`.
#[derive(Debug)]
pub struct AsyncObjectStore<T> {
    /// The wrapped object store that all operations are forwarded to.
    object_store: T,
}
80
81impl<T: object_store::ObjectStore> AsyncObjectStore<T> {
82    /// Create a new [`AsyncObjectStore`].
83    #[must_use]
84    pub fn new(object_store: T) -> Self {
85        Self { object_store }
86    }
87}
88
89#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
90#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
91impl<T: object_store::ObjectStore> AsyncReadableStorageTraits for AsyncObjectStore<T> {
92    async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
93        let get = handle_result_notfound(self.object_store.get(&key_to_path(key)).await)?;
94        if let Some(get) = get {
95            let bytes = handle_result(get.bytes().await)?;
96            Ok(Some(bytes))
97        } else {
98            Ok(None)
99        }
100    }
101
102    async fn get_partial_many<'a>(
103        &'a self,
104        key: &StoreKey,
105        byte_ranges: ByteRangeIterator<'a>,
106    ) -> Result<AsyncMaybeBytesIterator<'a>, StorageError> {
107        let Some(size) = self.size_key(key).await? else {
108            return Ok(None);
109        };
110        let ranges = byte_ranges
111            .map(|byte_range| byte_range.to_range(size))
112            .collect::<Vec<_>>();
113        let get_ranges = handle_result_notfound(
114            self.object_store
115                .get_ranges(&key_to_path(key), &ranges)
116                .await,
117        )?;
118        if let Some(get_ranges) = get_ranges {
119            let result = std::iter::zip(ranges, get_ranges).map(|(range, bytes)| {
120                let range_len = range.end.saturating_sub(range.start);
121                if range_len == bytes.len() as u64 {
122                    Ok(bytes)
123                } else {
124                    Err(StorageError::Other(format!(
125                        "Unexpected length of bytes returned, expected {}, got {}",
126                        range_len,
127                        bytes.len()
128                    )))
129                }
130            });
131            Ok(Some(stream::iter(result).boxed()))
132        } else {
133            Ok(None)
134        }
135    }
136
137    async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
138        Ok(
139            handle_result_notfound(self.object_store.head(&key_to_path(key)).await)?
140                .map(|meta| meta.size),
141        )
142    }
143
144    fn supports_get_partial(&self) -> bool {
145        true
146    }
147}
148
149#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
150#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
151impl<T: object_store::ObjectStore> AsyncWritableStorageTraits for AsyncObjectStore<T> {
152    async fn set(&self, key: &StoreKey, value: Bytes) -> Result<(), StorageError> {
153        handle_result(self.object_store.put(&key_to_path(key), value.into()).await)?;
154        Ok(())
155    }
156
157    async fn set_partial_many<'a>(
158        &'a self,
159        key: &StoreKey,
160        offset_values: OffsetBytesIterator<'a>,
161    ) -> Result<(), StorageError> {
162        async_store_set_partial_many(self, key, offset_values).await
163    }
164
165    async fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
166        handle_result_notfound(self.object_store.delete(&key_to_path(key)).await)?;
167        Ok(())
168    }
169
170    async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<(), StorageError> {
171        let prefix: object_store::path::Path = prefix.as_str().into();
172        let locations = self
173            .object_store
174            .list(Some(&prefix))
175            .map_ok(|m| m.location)
176            .boxed();
177        handle_result(
178            self.object_store
179                .delete_stream(locations)
180                .try_collect::<Vec<Path>>()
181                .await,
182        )?;
183        Ok(())
184    }
185
186    fn supports_set_partial(&self) -> bool {
187        false
188    }
189}
190
191#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
192#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
193impl<T: object_store::ObjectStore> AsyncListableStorageTraits for AsyncObjectStore<T> {
194    async fn list(&self) -> Result<StoreKeys, StorageError> {
195        let mut list = handle_result(
196            self.object_store
197                .list(None)
198                .collect::<Vec<_>>()
199                .await
200                .into_iter()
201                .map(|object_meta| {
202                    object_meta.map(|object_meta| {
203                        let path: &str = object_meta.location.as_ref();
204                        StoreKey::try_from(path).unwrap() // FIXME
205                    })
206                })
207                .collect::<Result<Vec<_>, _>>(),
208        )?;
209        list.sort();
210        Ok(list)
211    }
212
213    async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
214        // TODO: Check if this is outputting everything under prefix, or just one level under
215        let path: object_store::path::Path = prefix.as_str().into();
216        let mut list = handle_result(
217            self.object_store
218                .list(Some(&path))
219                .collect::<Vec<_>>()
220                .await
221                .into_iter()
222                .map(|object_meta| {
223                    object_meta.map(|object_meta| {
224                        let path: &str = object_meta.location.as_ref();
225                        StoreKey::try_from(path).unwrap() // FIXME
226                    })
227                })
228                .collect::<Result<Vec<_>, _>>(),
229        )?;
230        list.sort();
231        Ok(list)
232    }
233
234    async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
235        let path: object_store::path::Path = prefix.as_str().into();
236        let list_result = handle_result(self.object_store.list_with_delimiter(Some(&path)).await)?;
237        let mut prefixes = list_result
238            .common_prefixes
239            .iter()
240            .map(|path| {
241                let path: &str = path.as_ref();
242                StorePrefix::new(path.to_string() + "/")
243            })
244            .collect::<Result<Vec<_>, _>>()?;
245        let mut keys = list_result
246            .objects
247            .iter()
248            .map(|object_meta| {
249                let path: &str = object_meta.location.as_ref();
250                StoreKey::try_from(path)
251            })
252            .collect::<Result<Vec<_>, _>>()?;
253        keys.sort();
254        prefixes.sort();
255        Ok(StoreKeysPrefixes::new(keys, prefixes))
256    }
257
258    async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
259        let prefix: object_store::path::Path = prefix.as_str().into();
260        let mut locations = self.object_store.list(Some(&prefix));
261        let mut size = 0;
262        while let Some(item) = locations.next().await {
263            let meta = handle_result(item)?;
264            size += meta.size;
265        }
266        Ok(size)
267    }
268
269    async fn size(&self) -> Result<u64, StorageError> {
270        let mut locations = self.object_store.list(None);
271        let mut size = 0;
272        while let Some(item) = locations.next().await {
273            let meta = handle_result(item)?;
274            size += meta.size;
275        }
276        Ok(size)
277    }
278}