#![doc = include_str!("../doc/version_compatibility_matrix.md")]
use futures::{StreamExt, TryStreamExt, stream};
pub use object_store;
use object_store::ObjectStoreExt;
use object_store::path::Path;
use zarrs_storage::byte_range::ByteRangeIterator;
use zarrs_storage::{
AsyncListableStorageTraits, AsyncMaybeBytesIterator, AsyncReadableStorageTraits,
AsyncWritableStorageTraits, Bytes, MaybeBytes, OffsetBytesIterator, StorageError, StoreKey,
StoreKeys, StoreKeysPrefixes, StorePrefix, async_store_set_partial_many,
};
fn key_to_path(key: &StoreKey) -> Path {
Path::from(key.as_str())
}
fn handle_result_notfound<T>(
result: Result<T, object_store::Error>,
) -> Result<Option<T>, StorageError> {
match result {
Ok(result) => Ok(Some(result)),
Err(err) => {
if matches!(err, object_store::Error::NotFound { .. }) {
Ok(None)
} else {
Err(StorageError::Other(err.to_string()))
}
}
}
}
fn handle_result<T>(result: Result<T, object_store::Error>) -> Result<T, StorageError> {
result.map_err(|err| StorageError::Other(err.to_string()))
}
pub struct AsyncObjectStore<T> {
object_store: T,
}
impl<T: object_store::ObjectStore> AsyncObjectStore<T> {
#[must_use]
pub fn new(object_store: T) -> Self {
Self { object_store }
}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: object_store::ObjectStore> AsyncReadableStorageTraits for AsyncObjectStore<T> {
async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
let get = handle_result_notfound(self.object_store.get(&key_to_path(key)).await)?;
if let Some(get) = get {
let bytes = handle_result(get.bytes().await)?;
Ok(Some(bytes))
} else {
Ok(None)
}
}
async fn get_partial_many<'a>(
&'a self,
key: &StoreKey,
byte_ranges: ByteRangeIterator<'a>,
) -> Result<AsyncMaybeBytesIterator<'a>, StorageError> {
let Some(size) = self.size_key(key).await? else {
return Ok(None);
};
let ranges = byte_ranges
.map(|byte_range| byte_range.to_range(size))
.collect::<Vec<_>>();
let get_ranges = handle_result_notfound(
self.object_store
.get_ranges(&key_to_path(key), &ranges)
.await,
)?;
if let Some(get_ranges) = get_ranges {
let result = std::iter::zip(ranges, get_ranges).map(|(range, bytes)| {
let range_len = range.end.saturating_sub(range.start);
if range_len == bytes.len() as u64 {
Ok(bytes)
} else {
Err(StorageError::Other(format!(
"Unexpected length of bytes returned, expected {}, got {}",
range_len,
bytes.len()
)))
}
});
Ok(Some(stream::iter(result).boxed()))
} else {
Ok(None)
}
}
async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
Ok(
handle_result_notfound(self.object_store.head(&key_to_path(key)).await)?
.map(|meta| meta.size),
)
}
fn supports_get_partial(&self) -> bool {
true
}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: object_store::ObjectStore> AsyncWritableStorageTraits for AsyncObjectStore<T> {
async fn set(&self, key: &StoreKey, value: Bytes) -> Result<(), StorageError> {
handle_result(self.object_store.put(&key_to_path(key), value.into()).await)?;
Ok(())
}
async fn set_partial_many<'a>(
&'a self,
key: &StoreKey,
offset_values: OffsetBytesIterator<'a>,
) -> Result<(), StorageError> {
async_store_set_partial_many(self, key, offset_values).await
}
async fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
handle_result_notfound(self.object_store.delete(&key_to_path(key)).await)?;
Ok(())
}
async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<(), StorageError> {
let prefix: Path = prefix.as_str().into();
let locations = self
.object_store
.list(Some(&prefix))
.map_ok(|m| m.location)
.boxed();
handle_result(
self.object_store
.delete_stream(locations)
.try_collect::<Vec<Path>>()
.await,
)?;
Ok(())
}
fn supports_set_partial(&self) -> bool {
false
}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: object_store::ObjectStore> AsyncListableStorageTraits for AsyncObjectStore<T> {
async fn list(&self) -> Result<StoreKeys, StorageError> {
let mut list = handle_result(
self.object_store
.list(None)
.collect::<Vec<_>>()
.await
.into_iter()
.map(|object_meta| {
object_meta.map(|object_meta| {
let path: &str = object_meta.location.as_ref();
StoreKey::try_from(path).unwrap() })
})
.collect::<Result<Vec<_>, _>>(),
)?;
list.sort();
Ok(list)
}
async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
let path: Path = prefix.as_str().into();
let mut list = handle_result(
self.object_store
.list(Some(&path))
.collect::<Vec<_>>()
.await
.into_iter()
.map(|object_meta| {
object_meta.map(|object_meta| {
let path: &str = object_meta.location.as_ref();
StoreKey::try_from(path).unwrap() })
})
.collect::<Result<Vec<_>, _>>(),
)?;
list.sort();
Ok(list)
}
async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
let path: Path = prefix.as_str().into();
let list_result = handle_result(self.object_store.list_with_delimiter(Some(&path)).await)?;
let mut prefixes = list_result
.common_prefixes
.iter()
.map(|path| {
let path: &str = path.as_ref();
StorePrefix::new(path.to_string() + "/")
})
.collect::<Result<Vec<_>, _>>()?;
let mut keys = list_result
.objects
.iter()
.map(|object_meta| {
let path: &str = object_meta.location.as_ref();
StoreKey::try_from(path)
})
.collect::<Result<Vec<_>, _>>()?;
keys.sort();
prefixes.sort();
Ok(StoreKeysPrefixes::new(keys, prefixes))
}
async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
let prefix: Path = prefix.as_str().into();
let mut locations = self.object_store.list(Some(&prefix));
let mut size = 0;
while let Some(item) = locations.next().await {
let meta = handle_result(item)?;
size += meta.size;
}
Ok(size)
}
async fn size(&self) -> Result<u64, StorageError> {
let mut locations = self.object_store.list(None);
let mut size = 0;
while let Some(item) = locations.next().await {
let meta = handle_result(item)?;
size += meta.size;
}
Ok(size)
}
}