use std::{
collections::HashSet,
fmt::Display,
iter,
ops::{Deref as _, DerefMut as _},
sync::{
Arc, Mutex,
atomic::{AtomicUsize, Ordering},
},
};
use async_stream::try_stream;
use bytes::Bytes;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use itertools::{Either, Itertools as _, izip, repeat_n};
use serde::{Deserialize, Serialize, de};
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::instrument;
use crate::{
error::ICError,
format::{
ByteRange, ChunkIndices, ChunkOffset, Path, PathError,
format_constants::SpecVersionBin,
manifest::{ChunkPayload, VirtualChunkRef},
snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot, NodeType},
},
refs::RefErrorKind,
repository::RepositoryErrorKind,
session::{Session, SessionError, SessionErrorKind, get_chunk, is_prefix_match},
};
use icechunk_types::{ICResultExt as _, error::ICResultCtxExt as _};
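/// An entry yielded by [`Store::list_dir_items`]: either a full key or a
/// common prefix (a "directory") directly under the listed prefix.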
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ListDirItem {
Key(String),
Prefix(String),
}
pub type StoreResult<A> = Result<A, StoreError>;
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[non_exhaustive]
pub enum KeyNotFoundError {
#[error("chunk cannot be find for key `{key}` (path={path}, coords={coords:?})")]
ChunkNotFound { key: String, path: Path, coords: ChunkIndices },
#[error("node not found at `{path}`")]
NodeNotFound { path: Path },
#[error("v2 key not found at `{key}`")]
ZarrV2KeyNotFound { key: String },
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum StoreErrorKind {
#[error(transparent)]
SessionError(#[from] SessionErrorKind),
#[error(transparent)]
RepositoryError(#[from] RepositoryErrorKind),
#[error(transparent)]
RefError(#[from] RefErrorKind),
#[error("invalid zarr key format `{key}`")]
InvalidKey { key: String },
#[error(
"invalid chunk coordinates {coords:?}: along axis {axis} with num_chunks={num_chunks}"
)]
InvalidIndex { axis: usize, coords: ChunkIndices, num_chunks: u32 },
#[error("this operation is not allowed: {0}")]
NotAllowed(String),
#[error(transparent)]
NotFound(#[from] KeyNotFoundError),
#[error("error merging stores: `{0}`")]
MergeError(String),
#[error("cannot commit when no snapshot is present")]
NoSnapshot,
#[error("could not create path from prefix")]
PathError(#[from] PathError),
#[error("all commits must be made on a branch")]
NotOnBranch,
#[error("bad metadata")]
BadMetadata(#[from] serde_json::Error),
#[error("error decoding Zarr chunk grid metadata: `{0}`")]
BadChunkGridMetadata(String),
#[error("deserialization error")]
DeserializationError(#[from] Box<rmp_serde::decode::Error>),
#[error("serialization error")]
SerializationError(#[from] Box<rmp_serde::encode::Error>),
#[error("store method `{0}` is not implemented by Icechunk")]
Unimplemented(&'static str),
#[error("bad key prefix ({prefix}): {message}")]
BadKeyPrefix { prefix: String, message: String },
#[error("error during parallel execution of get_partial_values")]
PartialValuesPanic,
#[error("cannot write to read-only store")]
ReadOnly,
#[error(
"uncommitted changes in repository, commit changes or reset repository and try again."
)]
UncommittedChanges,
#[error(
"invalid chunk location, no matching virtual chunk container: `{chunk_location}`"
)]
InvalidVirtualChunkContainer { chunk_location: String },
#[error("{0}")]
Other(String),
#[error("unknown store error")]
Unknown(Box<dyn std::error::Error + Send + Sync>),
}
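/// The error type returned by [`Store`] operations: a [`StoreErrorKind`]
/// wrapped with captured context.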
pub type StoreError = ICError<StoreErrorKind>;
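/// Outcome of [`Store::set_virtual_refs`]: `Done` when every reference was
/// written, or `FailedRefs` with the indices of chunks whose location
/// matched no configured virtual chunk container.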
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SetVirtualRefsResult {
Done,
FailedRefs(Vec<ChunkIndices>),
}
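/// A Zarr V3 compatible key/value store view over an Icechunk [`Session`].
///
/// Metadata keys (`.../zarr.json`) map to nodes and chunk keys
/// (`.../c/<coords>`) map to chunk references; see [`Key`] below.
///
/// A minimal usage sketch (assuming an already-created writable session):
///
/// ```ignore
/// let store = Store::from_session(Arc::new(RwLock::new(session))).await;
/// store.set("zarr.json", group_metadata_bytes).await?;
/// let meta = store.get("zarr.json", &ByteRange::ALL).await?;
/// ```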
#[derive(Debug, Clone)]
pub struct Store {
session: Arc<RwLock<Session>>,
get_partial_values_concurrency: u16,
}
impl Store {
pub async fn from_session(session: Arc<RwLock<Session>>) -> Self {
let conc = session.read().await.config().get_partial_values_concurrency();
Self::from_session_and_config(session, conc)
}
pub fn from_session_and_config(
session: Arc<RwLock<Session>>,
get_partial_values_concurrency: u16,
) -> Self {
Self { session, get_partial_values_concurrency }
}
#[instrument(skip_all)]
pub fn from_bytes(bytes: &[u8]) -> StoreResult<Self> {
let session: Session =
rmp_serde::from_slice(bytes).map_err(Box::new).capture()?;
let conc = session.config().get_partial_values_concurrency();
Ok(Self::from_session_and_config(Arc::new(RwLock::new(session)), conc))
}
#[instrument(skip_all)]
pub async fn as_bytes(&self) -> StoreResult<Bytes> {
let session = self.session.write().await;
let bytes = rmp_serde::to_vec(session.deref()).map_err(Box::new).capture()?;
Ok(Bytes::from(bytes))
}
pub fn session(&self) -> Arc<RwLock<Session>> {
Arc::clone(&self.session)
}
#[instrument(skip_all)]
pub async fn read_only(&self) -> bool {
self.session.read().await.read_only()
}
#[instrument(skip(self))]
pub async fn is_empty(&self, prefix: &str) -> StoreResult<bool> {
Ok(self.list_dir(prefix).await?.next().await.is_none())
}
#[instrument(skip_all)]
pub async fn clear(&self) -> StoreResult<()> {
let mut repo = self.session.write().await;
repo.clear().await.inject()
}
#[instrument(skip(self))]
pub async fn get(&self, key: &str, byte_range: &ByteRange) -> StoreResult<Bytes> {
let repo = self.session.read().await;
get_key(key, byte_range, &repo).await
}
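/// Fetches several key/range pairs concurrently, bounded by
/// `get_partial_values_concurrency`. Returns one result per input pair, in
/// input order; individual failures are reported per key rather than
/// aborting the whole batch.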
#[instrument(skip_all)]
pub async fn get_partial_values(
&self,
key_ranges: impl IntoIterator<Item = (String, ByteRange)>,
) -> StoreResult<Vec<StoreResult<Bytes>>> {
let stream = futures::stream::iter(key_ranges);
let results = Arc::new(Mutex::new(Vec::new()));
let num_keys = AtomicUsize::new(0);
stream
.for_each_concurrent(
self.get_partial_values_concurrency as usize,
|(key, range)| {
let index = num_keys.fetch_add(1, Ordering::Release);
let results = Arc::clone(&results);
async move {
let value = self.get(&key, &range).await;
if let Ok(mut results) = results.lock() {
if index >= results.len() {
results.resize_with(index + 1, || None);
}
results[index] = Some(value);
}
}
},
)
.await;
let results = Arc::into_inner(results)
.ok_or(StoreErrorKind::PartialValuesPanic)
.capture()?
.into_inner()
.map_err(|_| StoreErrorKind::PartialValuesPanic)
.capture()?;
debug_assert!(results.len() == num_keys.into_inner());
let res: Option<Vec<_>> = results.into_iter().collect();
res.ok_or(StoreErrorKind::PartialValuesPanic).capture()
}
#[instrument(skip(self))]
pub async fn exists(&self, key: &str) -> StoreResult<bool> {
let guard = self.session.read().await;
exists(key, guard.deref()).await
}
#[instrument(skip_all)]
pub fn supports_writes(&self) -> StoreResult<bool> {
Ok(true)
}
#[instrument(skip_all)]
pub fn supports_consolidated_metadata(&self) -> StoreResult<bool> {
Ok(false)
}
#[instrument(skip_all)]
pub fn supports_deletes(&self) -> StoreResult<bool> {
Ok(true)
}
#[instrument(skip(self, value))]
pub async fn set(&self, key: &str, value: Bytes) -> StoreResult<()> {
if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
self.set_with_optional_locking(key, value, None).await
}
async fn set_with_optional_locking(
&self,
key: &str,
value: Bytes,
locked_session: Option<&mut Session>,
) -> StoreResult<()> {
if let Some(session) = locked_session.as_ref() {
if session.read_only() {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
} else if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
match Key::parse(key)? {
Key::Metadata { node_path } => {
let node_meta =
serde_json::from_slice::<NodeMetadata>(value.as_ref()).capture()?;
match node_meta.node_type.as_str() {
"array" => {
let array_meta =
serde_json::from_slice(value.as_ref()).capture()?;
self.set_array_meta(node_path, value, array_meta, locked_session)
.await
}
"group" => {
self.set_group_meta(node_path, value, locked_session).await
}
_ => unreachable!(),
}
}
Key::Chunk { node_path, coords } => {
match locked_session {
Some(session) => {
let writer = session.get_chunk_writer().inject()?;
let payload = writer(value).await.inject()?;
session
.set_chunk_ref(node_path, coords, Some(payload))
.await
.inject()?;
}
None => {
let writer =
self.session.read().await.get_chunk_writer().inject()?;
let payload = writer(value).await.inject()?;
self.session
.write()
.await
.set_chunk_ref(node_path, coords, Some(payload))
.await
.inject()?;
}
}
Ok(())
}
Key::ZarrV2(_) => Err(StoreErrorKind::Unimplemented(
"Icechunk cannot set Zarr V2 metadata keys",
))
.capture(),
}
}
#[instrument(skip(self, bytes))]
pub async fn set_if_not_exists(&self, key: &str, bytes: Bytes) -> StoreResult<()> {
let mut guard = self.session.write().await;
if exists(key, guard.deref()).await? {
Ok(())
} else {
self.set_with_optional_locking(key, bytes, Some(guard.deref_mut())).await
}
}
#[instrument(skip(self))]
pub async fn set_virtual_ref(
&self,
key: &str,
reference: VirtualChunkRef,
validate_container: bool,
) -> StoreResult<()> {
if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
match Key::parse(key)? {
Key::Chunk { node_path, coords } => {
let mut session = self.session.write().await;
if validate_container
&& session.matching_container(&reference.location).is_none()
{
return Err(StoreErrorKind::InvalidVirtualChunkContainer {
chunk_location: reference.location.url().to_string(),
})
.capture();
}
session
.set_chunk_ref(
node_path,
coords,
Some(ChunkPayload::Virtual(reference)),
)
.await
.inject()?;
Ok(())
}
Key::Metadata { .. } | Key::ZarrV2(_) => Err(StoreErrorKind::NotAllowed(
format!("use .set to modify metadata for key {key}"),
))
.capture(),
}
}
#[instrument(skip(self, references))]
pub async fn set_virtual_refs<I>(
&self,
array_path: &Path,
validate_container: bool,
references: I,
) -> StoreResult<SetVirtualRefsResult>
where
I: IntoIterator<Item = (ChunkIndices, VirtualChunkRef)> + std::fmt::Debug,
{
if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
let mut session = self.session.write().await;
let node = session.get_array(array_path).await.inject()?;
let mut failed = Vec::new();
for (index, reference) in references.into_iter() {
if validate_container
&& session.matching_container(&reference.location).is_none()
{
failed.push(index);
} else {
session
.set_node_chunk_ref(
&node,
index,
Some(ChunkPayload::Virtual(reference)),
)
.await
.inject()?;
}
}
if failed.is_empty() {
Ok(SetVirtualRefsResult::Done)
} else {
Ok(SetVirtualRefsResult::FailedRefs(failed))
}
}
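/// Deletes everything under `prefix`.
///
/// If the prefix names an existing node, that node is deleted (for groups,
/// including its children). If it points inside an array (e.g. a chunk
/// subtree like `array/c/0`), only the chunk refs whose keys match the
/// prefix are deleted.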
#[instrument(skip(self))]
pub async fn delete_dir(&self, prefix: &str) -> StoreResult<()> {
if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
let prefix = prefix.trim_start_matches('/').trim_end_matches('/');
let path = format!("/{prefix}")
.try_into()
.map_err(|_| StoreErrorKind::BadKeyPrefix {
prefix: prefix.to_owned(),
message: "Cannot convert to a path".to_string(),
})
.capture()?;
let mut guard = self.session.write().await;
let node = guard.get_node(&path).await;
match node {
Ok(node) => Ok(guard.deref_mut().delete_node(node).await.inject()?),
Err(SessionError { kind: SessionErrorKind::NodeNotFound { .. }, .. }) => {
let node = guard.get_closest_ancestor_node(&path).await;
if let Ok(NodeSnapshot {
path: node_path,
node_data: NodeData::Array { .. },
..
}) = node
{
let node_path = node_path.clone();
let to_delete = guard
.array_chunk_iterator(&node_path)
.await
.try_filter_map(|chunk| async {
let coords = chunk.coord.clone();
let chunk_key = Key::Chunk {
node_path: node_path.clone(),
coords: chunk.coord,
};
let res = if is_prefix_match(&chunk_key.to_string(), prefix) {
Some(coords)
} else {
None
};
Ok(res)
})
.try_collect::<Vec<_>>()
.await
.inject()?;
Ok(guard
.deref_mut()
.delete_chunks(&node_path, to_delete.into_iter())
.await
.inject()?)
} else {
Ok(())
}
}
Err(err) => Err(err.inject())?,
}
}
#[instrument(skip(self))]
pub async fn delete(&self, key: &str) -> StoreResult<()> {
let mut session = self.session.write().await;
match Key::parse(key)? {
Key::Metadata { node_path } => {
let node = session.get_node(&node_path).await;
if let Err(SessionError {
kind: SessionErrorKind::NodeNotFound { path: _, message: _ },
..
}) = node
{
return Ok(());
};
Ok(session.delete_node(node.inject()?).await.inject()?)
}
Key::Chunk { node_path, coords } => Ok(session
.delete_chunks(&node_path, vec![coords].into_iter())
.await
.inject()?),
Key::ZarrV2(_) => Ok(()),
}
}
pub fn supports_partial_writes(&self) -> StoreResult<bool> {
Ok(false)
}
#[instrument(skip(self, _key_start_values))]
pub async fn set_partial_values(
&self,
_key_start_values: impl IntoIterator<Item = (&str, ChunkOffset, Bytes)>,
) -> StoreResult<()> {
if self.read_only().await {
return Err(StoreError::capture(StoreErrorKind::ReadOnly));
}
Err(StoreError::capture(StoreErrorKind::Unimplemented("set_partial_values")))
}
pub fn supports_listing(&self) -> StoreResult<bool> {
Ok(true)
}
pub async fn list(
&self,
) -> StoreResult<impl Stream<Item = StoreResult<String>> + Send + use<>> {
self.list_prefix("/").await
}
#[instrument(skip(self))]
pub async fn list_prefix(
&self,
prefix: &str,
) -> StoreResult<impl Stream<Item = StoreResult<String>> + Send + use<>> {
let meta = self.list_metadata_prefix(prefix, false).await?;
let chunks = self.list_chunks_prefix(prefix).await?;
Ok(futures::stream::iter(meta.chain(chunks).collect::<Vec<_>>().await))
}
pub async fn list_dir(
&self,
prefix: &str,
) -> StoreResult<impl Stream<Item = StoreResult<String>> + Send + use<>> {
let res = self.list_dir_items(prefix).await?.map_ok(|item| match item {
ListDirItem::Key(k) => k,
ListDirItem::Prefix(p) => p,
});
Ok(res)
}
#[instrument(skip(self))]
pub async fn list_dir_items(
&self,
prefix: &str,
) -> StoreResult<impl Stream<Item = StoreResult<ListDirItem>> + Send + use<>> {
let prefix = prefix.trim_end_matches("/");
let absolute_prefix =
if !prefix.starts_with("/") { &format!("/{prefix}") } else { prefix };
let path = Path::try_from(absolute_prefix).capture()?;
let session = Arc::clone(&self.session).read_owned().await;
let results = match session.get_node(&path).await {
Ok(NodeSnapshot { node_data: NodeData::Array { .. }, .. }) => {
vec![
ListDirItem::Key("zarr.json".to_string()),
ListDirItem::Prefix("c".to_string()),
]
}
Ok(NodeSnapshot { node_data: NodeData::Group, .. }) => {
self.list_metadata_prefix(prefix, true)
.await?
.try_filter_map(|x| async move {
let x = x.trim_end_matches("/zarr.json").to_string();
let res = if x == "zarr.json" {
Some(ListDirItem::Key("zarr.json".to_string()))
} else if x.matches("/").count() == 0 {
Some(ListDirItem::Prefix(x))
} else {
None
};
Ok(res)
})
.try_collect::<Vec<_>>()
.await?
}
Err(_) => {
let node = session.get_closest_ancestor_node(&path).await;
if let Ok(node) = node {
let node_path = node.path.clone();
session
.array_chunk_iterator(&node.path)
.await
.try_filter_map(|chunk| async {
let chunk_key = Key::Chunk {
node_path: node_path.clone(),
coords: chunk.coord,
}
.to_string();
let res = if is_prefix_match(&chunk_key, prefix) {
let trimmed = chunk_key
.trim_start_matches(prefix)
.trim_start_matches('/');
if trimmed.is_empty() {
None
} else if let Some((chunk_prefix, _)) = trimmed.split_once('/') {
Some(ListDirItem::Prefix(chunk_prefix.to_string()))
} else {
Some(ListDirItem::Key(trimmed.to_string()))
}
} else {
None
};
Ok(res)
})
.try_collect::<HashSet<_>>()
.await
.inject()?
.into_iter()
.collect::<Vec<_>>()
} else {
vec![]
}
}
};
Ok(futures::stream::iter(results.into_iter().map(Ok)))
}
#[instrument(skip(self))]
pub async fn getsize(&self, key: &str) -> StoreResult<u64> {
let session = self.session.read().await;
get_key_size(key, &session).await
}
#[instrument(skip(self))]
pub async fn getsize_prefix(&self, prefix: &str) -> StoreResult<u64> {
let session_guard = Arc::clone(&self.session).read_owned().await;
let session = session_guard.deref();
let meta = self.list_metadata_prefix(prefix, false).await?;
let chunks = self.list_chunks_prefix(prefix).await?;
meta.chain(chunks)
.try_fold(0, move |accum, key| async move {
get_key_size(key.as_str(), session).await.map(|n| n + accum)
})
.await
}
async fn set_array_meta(
&self,
path: Path,
user_data: Bytes,
array_meta: ArrayMetadata,
locked_session: Option<&mut Session>,
) -> Result<(), StoreError> {
match locked_session {
Some(session) => set_array_meta(path, user_data, array_meta, session).await,
None => self.set_array_meta_locking(path, user_data, array_meta).await,
}
}
async fn set_array_meta_locking(
&self,
path: Path,
user_data: Bytes,
array_meta: ArrayMetadata,
) -> Result<(), StoreError> {
let mut guard = self.session.write().await;
set_array_meta(path, user_data, array_meta, guard.deref_mut()).await
}
async fn set_group_meta(
&self,
path: Path,
user_data: Bytes,
locked_repo: Option<&mut Session>,
) -> Result<(), StoreError> {
match locked_repo {
Some(repo) => set_group_meta(path, user_data, repo).await,
None => self.set_group_meta_locking(path, user_data).await,
}
}
async fn set_group_meta_locking(
&self,
path: Path,
user_data: Bytes,
) -> Result<(), StoreError> {
let mut guard = self.session.write().await;
set_group_meta(path, user_data, guard.deref_mut()).await
}
async fn list_metadata_prefix<'a, 'b: 'a>(
&'a self,
prefix: &'b str,
strip_prefix: bool,
) -> StoreResult<impl Stream<Item = StoreResult<String>> + 'a + use<'a>> {
let prefix = prefix.trim_end_matches('/');
let path = format!("/{}", prefix.trim_start_matches('/'))
.try_into()
.map_err(|_| StoreErrorKind::BadKeyPrefix {
prefix: prefix.to_owned(),
message: "Cannot convert to a path".to_string(),
})
.capture()?;
let session = Arc::clone(&self.session).read_owned().await;
if path != Path::root() {
let _ = session
.get_node(&path)
.await
.map_err(|_| StoreErrorKind::BadKeyPrefix {
prefix: prefix.to_owned(),
message: "Only prefixes pointing to a group or array are allowed"
.to_string(),
})
.capture()?;
}
let res = try_stream! {
for node in session.list_nodes(&path).await.inject()? {
let meta_key = Key::Metadata { node_path: node.inject()?.path }.to_string();
if is_prefix_match(&meta_key, prefix) {
if strip_prefix {
yield meta_key.trim_start_matches(prefix).trim_start_matches('/').to_string();
} else {
yield meta_key;
}
}
}
};
Ok(res)
}
async fn list_chunks_prefix<'a, 'b: 'a>(
&'a self,
prefix: &'b str,
) -> StoreResult<impl Stream<Item = StoreResult<String>> + 'a + use<'a>> {
let prefix = prefix.trim_end_matches('/');
let res = try_stream! {
let session = Arc::clone(&self.session).read_owned().await;
let path = format!("/{}", prefix.trim_start_matches('/') ).try_into().map_err(|_| {
StoreErrorKind::BadKeyPrefix {
prefix: prefix.to_owned(),
message: "Cannot convert to a path".to_string(),
}
}).capture()?;
let nodes = if path == Path::root() {
Either::Left(session.list_nodes(&Path::root()).await.inject()?)
} else {
let node = session.get_node(&path).await.map_err(|_| {
StoreErrorKind::BadKeyPrefix {
prefix: prefix.to_owned(),
message: "Only prefixes pointing to a group or array are allowed".to_string(),
}
}).capture()?;
match node.node_type() {
NodeType::Group => Either::Left(session.list_nodes(&node.path).await.inject()?),
NodeType::Array => Either::Right(iter::once(Ok(node))),
}
};
for node in nodes {
let node = node.inject()?;
if node.node_type() == NodeType::Array &&
is_prefix_match(&node.path.to_string()[1..], prefix) {
for await maybe_path_chunk in session.all_node_chunks(&node).await {
match maybe_path_chunk {
Ok((path, chunk)) => {
let chunk_key = Key::Chunk { node_path: path, coords: chunk.coord }.to_string();
yield chunk_key;
}
Err(err) => Err(err).inject()?
}
}
}
}
};
Ok(res)
}
}
async fn set_array_meta(
path: Path,
user_data: Bytes,
array_meta: ArrayMetadata,
session: &mut Session,
) -> StoreResult<()> {
if !array_meta.is_regular() && session.spec_version() < SpecVersionBin::V2 {
return Err(StoreErrorKind::BadChunkGridMetadata(
"Non-regular chunk grids are not supported in icechunk format version 1. \
Please use spec_version=2 or higher."
.into(),
))
.capture();
}
let shape = array_meta.shape()?;
if let Ok(node) = session.get_array(&path).await {
if let NodeData::Array { .. } = node.node_data
&& node.user_data != user_data
{
session
.update_array(&path, shape, array_meta.dimension_names(), user_data)
.await
.inject()?;
}
Ok(())
} else {
session
.add_array(path.clone(), shape, array_meta.dimension_names(), user_data)
.await
.inject()?;
Ok(())
}
}
async fn set_group_meta(
path: Path,
user_data: Bytes,
session: &mut Session,
) -> StoreResult<()> {
if let Ok(node) = session.get_group(&path).await {
if let NodeData::Group = node.node_data
&& node.user_data != user_data
{
session.update_group(&path, user_data).await.inject()?;
}
Ok(())
} else {
session.add_group(path.clone(), user_data).await.inject()?;
Ok(())
}
}
async fn get_metadata(
_key: &str,
path: &Path,
range: &ByteRange,
session: &Session,
) -> StoreResult<Bytes> {
let node = session
.get_node(path)
.await
.map_err(|e| match e {
SessionError { kind: SessionErrorKind::NodeNotFound { .. }, .. } => {
StoreErrorKind::NotFound(KeyNotFoundError::NodeNotFound {
path: path.clone(),
})
}
e => StoreErrorKind::SessionError(e.kind),
})
.capture()?;
Ok(range.slice(&node.user_data))
}
async fn get_chunk_bytes(
key: &str,
path: Path,
coords: ChunkIndices,
byte_range: &ByteRange,
session: &Session,
) -> StoreResult<Bytes> {
let reader = session.get_chunk_reader(&path, &coords, byte_range).await.inject()?;
let chunk = get_chunk(reader).await.inject()?;
chunk
.ok_or_else(|| {
StoreErrorKind::NotFound(KeyNotFoundError::ChunkNotFound {
key: key.to_string(),
path,
coords,
})
})
.capture()
}
async fn get_metadata_size(
key: &str,
path: &Path,
session: &Session,
) -> StoreResult<u64> {
let bytes = get_metadata(key, path, &ByteRange::From(0), session).await?;
Ok(bytes.len() as u64)
}
async fn get_chunk_size(
_key: &str,
path: &Path,
coords: &ChunkIndices,
session: &Session,
) -> StoreResult<u64> {
let chunk_ref = session.get_chunk_ref(path, coords).await.inject()?;
let size = chunk_ref
.map(|payload| match payload {
ChunkPayload::Inline(bytes) => bytes.len() as u64,
ChunkPayload::Virtual(virtual_chunk_ref) => virtual_chunk_ref.length,
ChunkPayload::Ref(chunk_ref) => chunk_ref.length,
_ => 0,
})
.unwrap_or(0);
Ok(size)
}
async fn get_key(
key: &str,
byte_range: &ByteRange,
session: &Session,
) -> StoreResult<Bytes> {
let bytes = match Key::parse(key)? {
Key::Metadata { node_path } => {
get_metadata(key, &node_path, byte_range, session).await
}
Key::Chunk { node_path, coords } => {
get_chunk_bytes(key, node_path, coords, byte_range, session).await
}
Key::ZarrV2(key) => {
Err(StoreErrorKind::NotFound(KeyNotFoundError::ZarrV2KeyNotFound { key }))
.capture()
}
}?;
Ok(bytes)
}
async fn get_key_size(key: &str, session: &Session) -> StoreResult<u64> {
let bytes = match Key::parse(key)? {
Key::Metadata { node_path } => get_metadata_size(key, &node_path, session).await,
Key::Chunk { node_path, coords } => {
get_chunk_size(key, &node_path, &coords, session).await
}
Key::ZarrV2(key) => {
Err(StoreErrorKind::NotFound(KeyNotFoundError::ZarrV2KeyNotFound { key }))
.capture()
}
}?;
Ok(bytes)
}
async fn exists(key: &str, session: &Session) -> StoreResult<bool> {
match Key::parse(key)? {
Key::Metadata { node_path } => match session.get_node(&node_path).await {
Ok(_) => Ok(true),
Err(SessionError { kind: SessionErrorKind::NodeNotFound { .. }, .. }) => {
Ok(false)
}
Err(err) => Err(err.inject()),
},
Key::Chunk { node_path, coords } => {
match session.get_chunk_ref(&node_path, &coords).await {
Ok(r) => Ok(r.is_some()),
Err(SessionError {
kind: SessionErrorKind::NodeNotFound { .. }, ..
}) => Ok(false),
Err(err) => Err(err.inject()),
}
}
Key::ZarrV2(key) => {
Err(StoreErrorKind::NotFound(KeyNotFoundError::ZarrV2KeyNotFound { key }))
.capture()
}
}
}
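/// A parsed Zarr store key: node metadata (`.../zarr.json`), a chunk
/// (`.../c/<coords>`), or a legacy Zarr V2 key (`.zarray`, `.zgroup`,
/// `.zattrs`, `.zmetadata`).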
#[derive(Debug, Clone, PartialEq, Eq)]
enum Key {
Metadata { node_path: Path },
Chunk { node_path: Path, coords: ChunkIndices },
ZarrV2(String),
}
impl Key {
const ROOT_KEY: &'static str = "zarr.json";
const METADATA_SUFFIX: &'static str = "/zarr.json";
const CHUNK_COORD_PREFIX: &'static str = "c";
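/// Parses a raw store key. The mapping, exercised by `test_parse_key`
/// below:
///
/// - `"zarr.json"` -> root group metadata
/// - `"a/b/zarr.json"` -> metadata for node `/a/b`
/// - `"foo/c"` -> chunk `[]` of array `/foo`
/// - `"foo/c/1/2/3"` -> chunk `[1, 2, 3]` of array `/foo`
/// - `".zattrs"`, `"foo/.zgroup"`, ... -> `Key::ZarrV2`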
fn parse(key: &str) -> Result<Self, StoreError> {
fn parse_chunk(key: &str) -> Result<Key, StoreError> {
if key == ".zgroup"
|| key == ".zarray"
|| key == ".zattrs"
|| key == ".zmetadata"
|| key.ends_with("/.zgroup")
|| key.ends_with("/.zarray")
|| key.ends_with("/.zattrs")
|| key.ends_with("/.zmetadata")
{
return Ok(Key::ZarrV2(key.to_string()));
}
if key == "c" {
return Ok(Key::Chunk {
node_path: Path::root(),
coords: ChunkIndices(vec![]),
});
}
if let Some((path, coords)) = key.rsplit_once(Key::CHUNK_COORD_PREFIX) {
let path = path.strip_suffix('/').unwrap_or(path);
if coords.is_empty() {
Ok(Key::Chunk {
node_path: format!("/{path}")
.try_into()
.map_err(|_| StoreErrorKind::InvalidKey {
key: key.to_string(),
})
.capture()?,
coords: ChunkIndices(vec![]),
})
} else {
let absolute = format!("/{path}")
.try_into()
.map_err(|_| StoreErrorKind::InvalidKey { key: key.to_string() })
.capture()?;
coords
.strip_prefix('/')
.ok_or_else(|| StoreErrorKind::InvalidKey {
key: key.to_string(),
})
.capture()?
.split('/')
.map(|s| s.parse::<u32>())
.collect::<Result<Vec<_>, _>>()
.map(|coords| Key::Chunk {
node_path: absolute,
coords: ChunkIndices(coords),
})
.map_err(|_| StoreErrorKind::InvalidKey { key: key.to_string() })
.capture()
}
} else {
Err(StoreError::capture(StoreErrorKind::InvalidKey {
key: key.to_string(),
}))
}
}
if key == Key::ROOT_KEY {
Ok(Key::Metadata { node_path: Path::root() })
} else if let Some(path) = key.strip_suffix(Key::METADATA_SUFFIX) {
Ok(Key::Metadata {
node_path: format!("/{path}")
.try_into()
.map_err(|_| StoreErrorKind::InvalidKey { key: key.to_string() })
.capture()?,
})
} else {
parse_chunk(key)
}
}
}
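/// Renders a [`Key`] back into its store string form; the inverse of
/// [`Key::parse`] (see `test_format_key` below).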
impl Display for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Key::Metadata { node_path } => {
let s =
format!("{}{}", &node_path.to_string()[1..], Key::METADATA_SUFFIX)
.trim_start_matches('/')
.to_string();
f.write_str(s.as_str())
}
Key::Chunk { node_path, coords } => {
let coords = coords.0.iter().map(|c| c.to_string()).join("/");
let s = [node_path.to_string()[1..].to_string(), "c".to_string(), coords]
.iter()
.filter(|s| !s.is_empty())
.join("/");
f.write_str(s.as_str())
}
Key::ZarrV2(key) => f.write_str(key.as_str()),
}
}
}
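/// The subset of Zarr V3 array metadata that Icechunk interprets: shape,
/// chunk grid, and dimension names. The full user-provided JSON document is
/// stored verbatim as the node's user data.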
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ArrayMetadata {
pub shape: Vec<u64>,
#[serde(deserialize_with = "validate_array_node_type")]
node_type: String,
chunk_grid: ChunkGridSerializer,
pub dimension_names: Option<Vec<Option<String>>>,
}
impl ArrayMetadata {
fn dimension_names(&self) -> Option<Vec<DimensionName>> {
self.dimension_names
.as_ref()
.map(|ds| ds.iter().map(|d| d.as_ref().map(|s| s.as_str()).into()).collect())
}
fn is_regular(&self) -> bool {
self.chunk_grid.name == "regular"
}
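/// Computes the number of chunks along each axis.
///
/// For a `regular` grid this is `ceil(shape / chunk_shape)` per axis (zero
/// chunks when the chunk size is zero). For a `rectilinear` grid, each
/// entry of `chunk_shapes` lists the chunk sizes along one axis, where a
/// plain number contributes one chunk and a `[size, count]` pair
/// contributes `count` chunks; e.g. `[[2, 3]]` means three chunks of
/// size 2.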
fn num_chunks(&self) -> StoreResult<Vec<u32>> {
let serde_json::Value::Object(kvs) = &self.chunk_grid.configuration else {
return Err(StoreErrorKind::BadChunkGridMetadata(
"Unsupported chunk grid".into(),
))
.capture();
};
match self.chunk_grid.name.as_str() {
"regular" => {
let values = kvs.get("chunk_shape").and_then(|v| v.as_array()).ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shape` for regular chunk grid".into(),
),
).capture()?;
let chunks =
values.iter().map(|c| c.as_u64()).collect::<Option<Vec<_>>>().ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shape` for regular chunk grid".into(),
),
).capture()?;
let num_chunks = chunks
.iter()
.zip(self.shape.iter())
.map(|(c, s)| if *c == 0 { 0 } else { (*s).div_ceil(*c) as u32 })
.collect::<Vec<_>>();
Ok(num_chunks)
}
"rectilinear" => {
let values = kvs.get("chunk_shapes").and_then(|v| v.as_array()).ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shapes` for rectilinear chunk grid".into(),
),
).capture()?;
let num_chunks = values
.iter()
.map(|v| {
let v = v.as_array()?;
v.iter().try_fold(0u32, |acc, inner| {
if inner.is_number() {
Some(acc + 1)
} else if let Some(vec) = inner.as_array() {
let count = vec.get(1)?.as_u64()? as u32;
Some(acc + count)
} else {
None
}
})
})
.collect::<Option<Vec<_>>>()
.ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shapes` for rectilinear chunk grid".into(),
)).capture()?;
Ok(num_chunks)
}
other => {
Err(StoreErrorKind::BadChunkGridMetadata(format!(
"Unsupported chunk grid {other}. Only 'regular' and 'rectilinear' chunk grids are supported."
)))
.capture()
}
}
}
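/// Returns, for each coordinate in `coords`, the shape of that chunk. Edge
/// chunks of a `regular` grid may be smaller than `chunk_shape`; a
/// coordinate that is out of bounds along some axis yields an
/// `InvalidIndex` error.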
pub fn get_chunk_shapes<'a>(
&self,
coords: impl Iterator<Item = &'a ChunkIndices> + 'a,
) -> StoreResult<Box<dyn Iterator<Item = StoreResult<Vec<u32>>> + 'a>> {
let serde_json::Value::Object(kvs) = &self.chunk_grid.configuration else {
return Err(StoreErrorKind::BadChunkGridMetadata(
"Unsupported chunk grid".into(),
))
.capture();
};
match self.chunk_grid.name.as_str() {
"regular" => {
let values = kvs.get("chunk_shape").and_then(|v| v.as_array()).ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shape` for regular chunk grid".into(),
),
).capture()?;
let chunks = values
.iter()
.map(|c| c.as_u64().map(|c| c as u32))
.collect::<Option<Vec<_>>>()
.ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shape` for regular chunk grid".into(),
)).capture()?;
let num_chunks = self.num_chunks()?;
let remainder: Vec<u32> = self
.shape
.iter()
.zip(chunks.iter())
.map(|(s, c)| (s % (*c as u64)) as u32)
.collect();
let iter = coords.map(move |coord| {
izip!(
coord.0.iter().enumerate(),
chunks.iter(),
num_chunks.iter(),
remainder.iter()
)
.map(|((axis, axcoord), chunksize, num_chunks, rem)| {
if axcoord >= num_chunks {
Err(StoreErrorKind::InvalidIndex {
axis,
coords: coord.clone(),
num_chunks: *num_chunks,
})
.capture()
} else {
Ok(if *rem == 0 || axcoord < &(num_chunks - 1) {
*chunksize
} else {
*rem
})
}
})
.collect::<StoreResult<Vec<_>>>()
});
Ok(Box::new(iter))
}
"rectilinear" => {
let values = kvs.get("chunk_shapes").and_then(|v| v.as_array()).ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shapes` for rectilinear chunk grid".into(),
),
).capture()?;
let chunks = values
.iter()
.map(|v| {
let v = v.as_array()?;
v.iter().try_fold(Vec::<u32>::new(), |mut acc, inner| {
if inner.is_number() {
acc.push(inner.as_u64()? as u32);
Some(acc)
} else if let Some(vec) = inner.as_array() {
let elem = vec.first()?.as_u64()? as u32;
let count = vec.get(1)?.as_u64()? as usize;
acc.extend(repeat_n(elem, count));
Some(acc)
} else {
None
}
})
})
.collect::<Option<Vec<_>>>()
.ok_or_else(|| StoreErrorKind::BadChunkGridMetadata(
"cannot parse `chunk_shapes` for rectilinear chunk grid".into(),
)).capture()?;
let iter = coords.map(move |coord: &ChunkIndices| {
coord
.0
.iter()
.enumerate()
.zip(chunks.iter())
.map(|((axis, chunkcoord), chunksizes)| {
if chunkcoord >= &(chunksizes.len() as u32) {
Err(StoreErrorKind::InvalidIndex {
coords: coord.clone(),
axis,
num_chunks: chunksizes.len() as u32,
})
.capture()
} else {
Ok(chunksizes[*chunkcoord as usize])
}
})
.collect::<StoreResult<Vec<_>>>()
});
Ok(Box::new(iter))
}
other => {
Err(StoreErrorKind::BadChunkGridMetadata(format!(
"Unsupported chunk grid {other}. Only 'regular' and 'rectilinear' chunk grids are supported."
)))
.capture()
}
}
}
fn shape(&self) -> StoreResult<ArrayShape> {
let num_chunks = self.num_chunks()?;
if self.shape.len() != num_chunks.len() {
Err(StoreErrorKind::BadChunkGridMetadata(format!(
"Fewer dimensions on inferred number of chunks {} than shape {}",
self.shape.len(),
num_chunks.len()
)))
.capture()
} else {
ArrayShape::new(
self.shape.iter().zip(num_chunks.iter()).map(|(a, b)| (*a, *b)),
)
.ok_or_else(|| StoreErrorKind::BadChunkGridMetadata("invalid shape".into()))
.capture()
}
}
}
fn validate_array_node_type<'de, D>(d: D) -> Result<String, D::Error>
where
D: de::Deserializer<'de>,
{
let value = String::deserialize(d)?;
if value != "array" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(value.as_str()),
&"the word 'array'",
));
}
Ok(value)
}
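/// Minimal view of a `zarr.json` document, used by [`Store::set`] to decide
/// whether the incoming metadata describes an array or a group.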
#[derive(Debug, Deserialize)]
struct NodeMetadata {
#[serde(deserialize_with = "validate_node_type")]
node_type: String,
}
fn validate_node_type<'de, D>(d: D) -> Result<String, D::Error>
where
D: de::Deserializer<'de>,
{
let value = String::deserialize(d)?;
if value != "array" && value != "group" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(value.as_str()),
&"'array' or 'group'",
));
}
Ok(value)
}
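/// Chunk grid metadata kept as the raw grid `name` plus its untyped JSON
/// `configuration`; parsing is deferred to [`ArrayMetadata`]'s accessors.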
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
struct ChunkGridSerializer {
name: String,
configuration: serde_json::Value,
}
#[cfg(test)]
impl From<Vec<u64>> for ChunkGridSerializer {
fn from(value: Vec<u64>) -> Self {
let arr = serde_json::Value::Array(
value
.iter()
.map(|v| serde_json::Value::Number(serde_json::value::Number::from(*v)))
.collect(),
);
let kvs = serde_json::value::Map::from_iter(iter::once((
"chunk_shape".to_string(),
arr,
)));
Self {
name: "regular".to_string(),
configuration: serde_json::Value::Object(kvs),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::{
ObjectStorage, Repository, repository::VersionInfo,
storage::new_in_memory_storage,
};
use super::*;
use icechunk_format::roundtrip_serialization_tests;
use icechunk_macros::tokio_test;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
use proptest::{collection::vec, option};
use tempfile::TempDir;
prop_compose! {
fn array_metadata()
(shape in vec(any::<u64>(), 1..4),
node_type in Just("array".to_string()),
chunk_grid in vec(any::<u64>(), 1..4),
dimension_names in option::of(
vec(
option::of(any::<String>()),
2..4))) -> ArrayMetadata {
ArrayMetadata {
shape,
node_type,
chunk_grid: chunk_grid.into(),
dimension_names,
}
}
}
roundtrip_serialization_tests!(
serialize_and_deserialize_array_metadata - array_metadata
);
async fn add_group(store: &Store, path: &str) -> StoreResult<()> {
let bytes = Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#);
store.set(&format!("{path}/zarr.json"), bytes).await?;
Ok(())
}
async fn add_array_and_chunk(store: &Store, path: &str) -> StoreResult<()> {
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set(&format!("{path}/zarr.json"), zarr_meta.clone()).await?;
let data = Bytes::copy_from_slice(b"hello");
store.set(&format!("{path}/c/0/1/0"), data).await?;
let data = Bytes::copy_from_slice(b"hello");
store.set(&format!("{path}/c/1/1/0"), data).await?;
Ok(())
}
async fn create_memory_store_repository() -> Repository {
create_memory_store_repository_with_spec(None).await
}
async fn create_memory_store_repository_with_spec(
spec_version: Option<SpecVersionBin>,
) -> Repository {
let storage =
new_in_memory_storage().await.expect("failed to create in-memory store");
Repository::create(None, storage, HashMap::new(), spec_version, true)
.await
.unwrap()
}
async fn all_keys(store: &Store) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let version1 = keys(store, "/").await?;
let mut version2 = store.list().await?.try_collect::<Vec<_>>().await?;
version2.sort();
assert_eq!(version1, version2);
Ok(version1)
}
async fn keys(store: &Store, prefix: &str) -> Result<Vec<String>, StoreError> {
let mut res = store.list_prefix(prefix).await?.try_collect::<Vec<_>>().await?;
res.sort();
Ok(res)
}
#[icechunk_macros::test]
fn test_parse_key() {
assert!(matches!(
Key::parse("zarr.json"),
Ok(Key::Metadata { node_path}) if node_path.to_string() == "/"
));
assert!(matches!(
Key::parse("a/zarr.json"),
Ok(Key::Metadata { node_path }) if node_path.to_string() == "/a"
));
assert!(matches!(
Key::parse("a/b/c/zarr.json"),
Ok(Key::Metadata { node_path }) if node_path.to_string() == "/a/b/c"
));
assert!(matches!(
Key::parse("foo/c"),
Ok(Key::Chunk { node_path, coords }) if node_path.to_string() == "/foo" && coords == ChunkIndices(vec![])
));
assert!(matches!(
Key::parse("foo/bar/c"),
Ok(Key::Chunk { node_path, coords}) if node_path.to_string() == "/foo/bar" && coords == ChunkIndices(vec![])
));
assert!(matches!(
Key::parse("foo/c/1/2/3"),
Ok(Key::Chunk {
node_path,
coords,
}) if node_path.to_string() == "/foo" && coords == ChunkIndices(vec![1,2,3])
));
assert!(matches!(
Key::parse("foo/bar/baz/c/1/2/3"),
Ok(Key::Chunk {
node_path,
coords,
}) if node_path.to_string() == "/foo/bar/baz" && coords == ChunkIndices(vec![1,2,3])
));
assert!(matches!(
Key::parse("c"),
Ok(Key::Chunk { node_path, coords}) if node_path.to_string() == "/" && coords == ChunkIndices(vec![])
));
assert!(matches!(
Key::parse("c/0/0"),
Ok(Key::Chunk { node_path, coords}) if node_path.to_string() == "/" && coords == ChunkIndices(vec![0,0])
));
assert!(matches!(
Key::parse(".zarray"),
Ok(Key::ZarrV2(s) ) if s == ".zarray"
));
assert!(matches!(
Key::parse(".zgroup"),
Ok(Key::ZarrV2(s) ) if s == ".zgroup"
));
assert!(matches!(
Key::parse(".zattrs"),
Ok(Key::ZarrV2(s) ) if s == ".zattrs"
));
assert!(matches!(
Key::parse(".zmetadata"),
Ok(Key::ZarrV2(s) ) if s == ".zmetadata"
));
assert!(matches!(
Key::parse("foo/.zgroup"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zgroup"
));
assert!(matches!(
Key::parse("foo/bar/.zarray"),
Ok(Key::ZarrV2(s) ) if s == "foo/bar/.zarray"
));
assert!(matches!(
Key::parse("foo/.zmetadata"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zmetadata"
));
assert!(matches!(
Key::parse("foo/.zattrs"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zattrs"
));
}
#[icechunk_macros::test]
fn test_format_key() {
assert_eq!(
Key::Metadata { node_path: Path::root() }.to_string(),
"zarr.json".to_string()
);
assert_eq!(
Key::Metadata { node_path: "/a".try_into().unwrap() }.to_string(),
"a/zarr.json".to_string()
);
assert_eq!(
Key::Metadata { node_path: "/a/b/c".try_into().unwrap() }.to_string(),
"a/b/c/zarr.json".to_string()
);
assert_eq!(
Key::Chunk { node_path: Path::root(), coords: ChunkIndices(vec![]) }
.to_string(),
"c".to_string()
);
assert_eq!(
Key::Chunk { node_path: Path::root(), coords: ChunkIndices(vec![0]) }
.to_string(),
"c/0".to_string()
);
assert_eq!(
Key::Chunk { node_path: Path::root(), coords: ChunkIndices(vec![1, 2]) }
.to_string(),
"c/1/2".to_string()
);
assert_eq!(
Key::Chunk {
node_path: "/a".try_into().unwrap(),
coords: ChunkIndices(vec![])
}
.to_string(),
"a/c".to_string()
);
assert_eq!(
Key::Chunk {
node_path: "/a".try_into().unwrap(),
coords: ChunkIndices(vec![1])
}
.to_string(),
"a/c/1".to_string()
);
assert_eq!(
Key::Chunk {
node_path: "/a".try_into().unwrap(),
coords: ChunkIndices(vec![1, 2])
}
.to_string(),
"a/c/1/2".to_string()
);
}
#[icechunk_macros::test]
fn test_metadata_serialization() {
assert!(
serde_json::from_str::<NodeMetadata>(
r#"{"zarr_format":3, "node_type":"group"}"#
)
.is_ok()
);
assert!(
serde_json::from_str::<NodeMetadata>(
r#"{"zarr_format":3, "node_type":"array"}"#
)
.is_ok()
);
assert!(
serde_json::from_str::<NodeMetadata>(
r#"{"zarr_format":3, "node_type":"zarr"}"#
)
.is_err()
);
assert!(serde_json::from_str::<ArrayMetadata>(
r#"{"zarr_format":3,"node_type":"array","shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#
)
.is_ok());
assert!(serde_json::from_str::<ArrayMetadata>(
r#"{"zarr_format":3,"node_type":"group","shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#
)
.is_err());
assert_eq!(
serde_json::from_str::<ArrayMetadata>(
r#"{"zarr_format":3,"node_type":"array","shape":[2,2,2],"data_type":"float16","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":"NaN","codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#
).unwrap().dimension_names(),
Some(vec!["x".into(), "y".into(), "t".into()])
);
assert_eq!(
serde_json::from_str::<ArrayMetadata>(
r#"{"zarr_format":3,"node_type":"array","shape":[2,3,4],"data_type":"float16","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,2,3]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":"NaN","codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#
).unwrap().shape().unwrap(),
ArrayShape::new(vec![(2,2), (3,2), (4,2) ]).unwrap()
);
}
#[tokio_test]
async fn test_metadata_set_and_get() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert!(matches!(
store.get("zarr.json", &ByteRange::ALL).await,
Err(StoreError{kind: StoreErrorKind::NotFound(KeyNotFoundError::NodeNotFound {path}),..}) if path.to_string() == "/"
));
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group"}"#),
)
.await?;
assert_eq!(
store.get("zarr.json", &ByteRange::ALL).await.unwrap(),
Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group"}"#)
);
store.set("a/b/zarr.json", Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group","attributes":{"spam":"ham","eggs":42}}"#)).await?;
assert_eq!(
store.get("a/b/zarr.json", &ByteRange::ALL).await.unwrap(),
Bytes::copy_from_slice(
br#"{"zarr_format":3,"node_type":"group","attributes":{"spam":"ham","eggs":42}}"#
)
);
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("a/b/array/zarr.json", zarr_meta.clone()).await?;
assert_eq!(
store.get("a/b/array/zarr.json", &ByteRange::ALL).await.unwrap(),
zarr_meta.clone()
);
Ok(())
}
#[tokio_test]
async fn test_scalar_array_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let session = repo.writable_session("main").await?;
let session = Arc::new(RwLock::new(session));
let store = Store::from_session(Arc::clone(&session)).await;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","shape":[],"data_type":"float64","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0.0,"codecs":[{"name":"bytes","configuration":{"endian":"little"}}]}"#);
store.set("scalar/zarr.json", zarr_meta.clone()).await?;
assert_eq!(store.get("scalar/zarr.json", &ByteRange::ALL).await?, zarr_meta);
let node = session.read().await.get_array(&Path::new("/scalar").unwrap()).await?;
if let NodeSnapshot { node_data: NodeData::Array { shape, .. }, .. } = node {
assert!(shape.is_empty());
} else {
unreachable!();
}
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","shape":[],"data_type":"float64","chunk_grid":{"name":"rectilinear","configuration":{"kind":"inline","chunk_shapes":[]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0.0,"codecs":[{"name":"bytes","configuration":{"endian":"little"}}]}"#);
store.set("scalar_rect/zarr.json", zarr_meta.clone()).await?;
assert_eq!(store.get("scalar_rect/zarr.json", &ByteRange::ALL).await?, zarr_meta);
let node =
session.read().await.get_array(&Path::new("/scalar_rect").unwrap()).await?;
if let NodeSnapshot { node_data: NodeData::Array { shape, .. }, .. } = node {
assert!(shape.is_empty());
} else {
unreachable!();
}
Ok(())
}
#[tokio_test]
async fn test_rectilinear_chunk_grid_metadata()
-> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let session = repo.writable_session("main").await?;
let session = Arc::new(RwLock::new(session));
let store = Store::from_session(Arc::clone(&session)).await;
let chunk_grid = r#"{
"name":"rectilinear",
"configuration": {
"kind": "inline",
"chunk_shapes": [
[2, 2, 2],
[[2, 3]],
[[1, 6]],
[1, [2, 1], 3],
[[1, 3], 3],
[6],
[0],
[1, 2, 3]
]
}}"#;
let zarr_meta = Bytes::from(format!(
r#"{{"zarr_format":3,"node_type":"array","attributes":{{"foo":42}},"shape":[6,6,6,6,6,6,0,0],"data_type":"int32","chunk_grid":{chunk_grid},"chunk_key_encoding":{{"name":"default","configuration":{{"separator":"/"}}}},"fill_value":0,"codecs":[{{"name":"mycodec","configuration":{{"foo":42}}}}],"storage_transformers":[{{"name":"mytransformer","configuration":{{"bar":43}}}}],"dimension_names":["x","y","t"]}}"#
));
store.set("a/b/array/zarr.json", zarr_meta.clone()).await?;
assert_eq!(
store.get("a/b/array/zarr.json", &ByteRange::ALL).await.unwrap(),
zarr_meta.clone()
);
let node =
session.read().await.get_array(&Path::new("/a/b/array").unwrap()).await?;
if let NodeSnapshot { node_data: NodeData::Array { shape, .. }, .. } = node {
assert_eq!(
shape.num_chunks().collect::<Vec<_>>(),
vec![3, 3, 6, 3, 4, 1, 1, 3]
);
} else {
unreachable!();
}
Ok(())
}
#[tokio_test]
async fn test_rectilinear_rejected_on_spec_v1()
-> Result<(), Box<dyn std::error::Error>> {
let repo =
create_memory_store_repository_with_spec(Some(SpecVersionBin::V1)).await;
let session = repo.writable_session("main").await?;
let session = Arc::new(RwLock::new(session));
let store = Store::from_session(Arc::clone(&session)).await;
let zarr_meta = Bytes::from(
r#"{"zarr_format":3,"node_type":"array","shape":[4],"data_type":"bool","chunk_grid":{"name":"rectilinear","configuration":{"kind":"inline","chunk_shapes":[[1,2,1]]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":false,"codecs":[{"name":"bytes","configuration":{"endian":"little"}}]}"#,
);
let result = store.set("a/zarr.json", zarr_meta).await;
assert!(result.is_err(), "store.set should reject rectilinear on V1");
let err = result.unwrap_err().to_string();
assert!(
err.contains("Non-regular chunk grids are not supported"),
"unexpected error: {err}"
);
Ok(())
}
#[tokio_test]
async fn test_get_chunk_shapes_regular() -> Result<(), Box<dyn std::error::Error>> {
let chunk_grid =
r#"{"name":"regular", "configuration": {"chunk_shape": [1, 2, 3, 4, 5, 6]}}"#;
let zarr_meta = Bytes::from(format!(
r#"{{"zarr_format":3,"node_type":"array","attributes":{{"foo":42}},"shape":[6,6,6,6,6,6],"data_type":"int32","chunk_grid":{chunk_grid},"chunk_key_encoding":{{"name":"default","configuration":{{"separator":"/"}}}},"fill_value":0,"codecs":[{{"name":"mycodec","configuration":{{"foo":42}}}}],"storage_transformers":[{{"name":"mytransformer","configuration":{{"bar":43}}}}],"dimension_names":["x","y","t"]}}"#
));
let meta: ArrayMetadata = serde_json::from_slice(&zarr_meta)?;
let actual = meta
.get_chunk_shapes(
[
ChunkIndices(vec![0, 0, 0, 0, 0, 0]),
ChunkIndices(vec![1, 1, 1, 0, 0, 0]),
ChunkIndices(vec![0, 1, 1, 1, 1, 0]),
]
.iter(),
)?
.collect::<StoreResult<Vec<_>>>()?;
let expected =
vec![vec![1, 2, 3, 4, 5, 6], vec![1, 2, 3, 4, 5, 6], vec![1, 2, 3, 2, 1, 6]];
assert_eq!(expected, actual);
let actual = meta
.get_chunk_shapes(
[
ChunkIndices(vec![6, 0, 0, 0, 0, 0]),
ChunkIndices(vec![0, 3, 0, 0, 0, 0]),
ChunkIndices(vec![0, 0, 4, 0, 0, 0]),
ChunkIndices(vec![0, 0, 0, 3, 0, 0]),
ChunkIndices(vec![0, 0, 0, 0, 5, 0]),
ChunkIndices(vec![0, 1, 2, 2, 3, 1]),
]
.iter(),
)?
.collect::<Vec<_>>();
assert!(actual.iter().all(|x| x.is_err()));
Ok(())
}
#[tokio_test]
async fn test_get_chunk_shapes_rectilinear() -> Result<(), Box<dyn std::error::Error>>
{
let chunk_grid = r#"{
"name":"rectilinear",
"configuration": {
"kind": "inline",
"chunk_shapes": [
[2, 2, 2],
[[2, 3]],
[[1, 6]],
[1, [2, 1], 3],
[[1, 3], 3],
[6]
]
}}"#;
let zarr_meta = Bytes::from(format!(
r#"{{"zarr_format":3,"node_type":"array","attributes":{{"foo":42}},"shape":[6,6,6,6,6,6],"data_type":"int32","chunk_grid":{chunk_grid},"chunk_key_encoding":{{"name":"default","configuration":{{"separator":"/"}}}},"fill_value":0,"codecs":[{{"name":"mycodec","configuration":{{"foo":42}}}}],"storage_transformers":[{{"name":"mytransformer","configuration":{{"bar":43}}}}],"dimension_names":["x","y","t"]}}"#
));
let meta: ArrayMetadata = serde_json::from_slice(&zarr_meta)?;
let actual = meta
.get_chunk_shapes(
[
ChunkIndices(vec![0, 0, 0, 0, 0, 0]),
ChunkIndices(vec![0, 1, 2, 2, 3, 0]),
]
.iter(),
)?
.collect::<StoreResult<Vec<_>>>()?;
let expected = vec![vec![2, 2, 1, 1, 1, 6], vec![2, 2, 1, 3, 3, 6]];
assert_eq!(expected, actual);
let actual = meta
.get_chunk_shapes(
[
ChunkIndices(vec![3, 0, 0, 0, 0, 0]),
ChunkIndices(vec![0, 3, 0, 0, 0, 0]),
ChunkIndices(vec![0, 0, 6, 0, 0, 0]),
ChunkIndices(vec![0, 0, 0, 3, 0, 0]),
ChunkIndices(vec![0, 0, 0, 0, 5, 0]),
ChunkIndices(vec![0, 1, 2, 2, 3, 1]),
]
.iter(),
)?
.collect::<Vec<_>>();
assert!(actual.iter().all(|x| x.is_err()));
Ok(())
}
#[tokio_test]
async fn test_metadata_delete() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
let group_data =
br#"{"zarr_format":3, "node_type":"group", "attributes": {"spam":"ham", "eggs":42}}"#;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta.clone()).await.unwrap();
store.delete("array/zarr.json").await.unwrap();
assert!(matches!(
store.get("array/zarr.json", &ByteRange::ALL).await,
Err(StoreError{kind: StoreErrorKind::NotFound(KeyNotFoundError::NodeNotFound { path }),..})
if path.to_string() == "/array",
));
store.delete("array/zarr.json").await.unwrap();
store.set("array/zarr.json", zarr_meta.clone()).await.unwrap();
store.delete("array/zarr.json").await.unwrap();
assert!(matches!(
store.get("array/zarr.json", &ByteRange::ALL).await,
Err(StoreError{kind: StoreErrorKind::NotFound(KeyNotFoundError::NodeNotFound { path } ), ..})
if path.to_string() == "/array",
));
store.set("array/zarr.json", Bytes::copy_from_slice(group_data)).await.unwrap();
Ok(())
}
#[tokio_test]
async fn test_chunk_set_and_get() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta.clone()).await?;
assert_eq!(
store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
zarr_meta
);
assert_eq!(
store.get("array/zarr.json", &ByteRange::to_offset(5)).await.unwrap(),
zarr_meta[..5]
);
assert_eq!(
store.get("array/zarr.json", &ByteRange::from_offset(5)).await.unwrap(),
zarr_meta[5..]
);
assert_eq!(
store.get("array/zarr.json", &ByteRange::bounded(1, 24)).await.unwrap(),
zarr_meta[1..24]
);
let small_data = Bytes::copy_from_slice(b"hello");
store.set("array/c/0/1/0", small_data.clone()).await?;
assert_eq!(
store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(),
small_data
);
assert_eq!(
store.get("array/c/0/1/0", &ByteRange::to_offset(2)).await.unwrap(),
small_data[0..2]
);
assert_eq!(
store.get("array/c/0/1/0", &ByteRange::from_offset(3)).await.unwrap(),
small_data[3..]
);
assert_eq!(
store.get("array/c/0/1/0", &ByteRange::bounded(1, 4)).await.unwrap(),
small_data[1..4]
);
let big_data = Bytes::copy_from_slice(b"hello".repeat(512).as_slice());
store.set("array/c/0/1/1", big_data.clone()).await?;
assert_eq!(store.get("array/c/0/1/1", &ByteRange::ALL).await.unwrap(), big_data);
assert_eq!(
store.get("array/c/0/1/1", &ByteRange::from_offset(512 - 3)).await.unwrap(),
big_data[(512 - 3)..]
);
assert_eq!(
store.get("array/c/0/1/1", &ByteRange::to_offset(5)).await.unwrap(),
big_data[..5]
);
assert_eq!(
store.get("array/c/0/1/1", &ByteRange::bounded(20, 90)).await.unwrap(),
big_data[20..90]
);
let _oid = {
ds.write().await.commit("commit").max_concurrent_nodes(8).execute().await?
};
let ds =
repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())).await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert_eq!(
store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(),
small_data
);
assert_eq!(store.get("array/c/0/1/1", &ByteRange::ALL).await.unwrap(), big_data);
Ok(())
}
#[tokio::test]
async fn test_chunk_delete() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta.clone()).await.unwrap();
let data = Bytes::copy_from_slice(b"hello");
store.set("array/c/0/1/0", data.clone()).await.unwrap();
store.delete("array/c/0/1/0").await.unwrap();
store.delete("array/c/0/1/0").await.unwrap();
store.delete("array/c/1/1/1").await.unwrap();
assert!(matches!(
store.get("array/c/0/1/0", &ByteRange::ALL).await,
Err(StoreError{kind:StoreErrorKind::NotFound(KeyNotFoundError::ChunkNotFound { key, path, coords }),..})
if key == "array/c/0/1/0" && path.to_string() == "/array" && coords == ChunkIndices([0, 1, 0].to_vec())
));
assert!(matches!(
store.delete("array/foo").await,
Err(StoreError{kind: StoreErrorKind::InvalidKey { key }, ..}) if key == "array/foo",
));
assert!(matches!(
store.delete("array/c/10/1/1").await,
Err(StoreError{kind: StoreErrorKind::SessionError(SessionErrorKind::InvalidIndex { coords, path }),..})
if path.to_string() == "/array" && coords == ChunkIndices([10, 1, 1].to_vec())
));
Ok(())
}
#[tokio::test]
async fn test_delete_dir() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
add_group(&store, "").await.unwrap();
add_group(&store, "group").await.unwrap();
add_array_and_chunk(&store, "group/array").await.unwrap();
store.delete_dir("group/array").await.unwrap();
assert!(matches!(
store.get("group/array/zarr.json", &ByteRange::ALL).await,
Err(StoreError { kind: StoreErrorKind::NotFound(..), .. })
));
add_array_and_chunk(&store, "group/array").await.unwrap();
store.delete_dir("group").await.unwrap();
assert!(matches!(
store.get("group/zarr.json", &ByteRange::ALL).await,
Err(StoreError { kind: StoreErrorKind::NotFound(..), .. })
));
assert!(matches!(
store.get("group/array/zarr.json", &ByteRange::ALL).await,
Err(StoreError { kind: StoreErrorKind::NotFound(..), .. })
));
add_group(&store, "group").await.unwrap();
add_array_and_chunk(&store, "group/array").await.unwrap();
store.delete_dir("/group/array/c").await.unwrap();
let list = store
.list_prefix("group/array")
.await
.unwrap()
.try_collect::<Vec<_>>()
.await?;
assert_eq!(list, vec!["group/array/zarr.json"]);
add_array_and_chunk(&store, "group/array").await.unwrap();
store.delete_dir("group/array/c/0").await.unwrap();
let mut list = store
.list_prefix("group/array")
.await
.unwrap()
.try_collect::<Vec<_>>()
.await?;
list.sort();
assert_eq!(list, vec!["group/array/c/1/1/0", "group/array/zarr.json"]);
Ok(())
}
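// is_empty, exists, and key listings stay consistent as groups and arrays are added.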
#[tokio::test]
async fn test_metadata_list() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert!(store.is_empty("").await.unwrap());
assert!(!store.exists("zarr.json").await.unwrap());
assert_eq!(all_keys(&store).await.unwrap(), Vec::<String>::new());
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
assert!(!store.is_empty("").await.unwrap());
assert!(store.exists("zarr.json").await.unwrap());
assert_eq!(all_keys(&store).await.unwrap(), vec!["zarr.json".to_string()]);
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
store
.set(
"group/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
assert_eq!(
all_keys(&store).await.unwrap(),
vec!["group/zarr.json".to_string(), "zarr.json".to_string()]
);
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
assert_eq!(
keys(&store, "group/").await.unwrap(),
vec!["group/zarr.json".to_string()]
);
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("group/array/zarr.json", zarr_meta).await?;
assert!(!store.is_empty("").await.unwrap());
assert!(store.exists("zarr.json").await.unwrap());
assert!(store.exists("group/array/zarr.json").await.unwrap());
assert!(store.exists("group/zarr.json").await.unwrap());
assert_eq!(
all_keys(&store).await.unwrap(),
vec![
"group/array/zarr.json".to_string(),
"group/zarr.json".to_string(),
"zarr.json".to_string()
]
);
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
assert_eq!(
keys(&store, "group/").await.unwrap(),
vec!["group/array/zarr.json".to_string(), "group/zarr.json".to_string()]
);
assert_eq!(
keys(&store, "group/array/").await.unwrap(),
vec!["group/array/zarr.json".to_string()]
);
Ok(())
}
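// Array metadata round-trips through absolute and relative keys, including numeric node names.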
#[tokio::test]
async fn test_set_array_metadata() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("/array/zarr.json", zarr_meta.clone()).await?;
assert_eq!(
store.get("/array/zarr.json", &ByteRange::ALL).await?,
zarr_meta.clone()
);
store.set("0/zarr.json", zarr_meta.clone()).await?;
assert_eq!(store.get("0/zarr.json", &ByteRange::ALL).await?, zarr_meta.clone());
store.set("/0/zarr.json", zarr_meta.clone()).await?;
assert_eq!(store.get("/0/zarr.json", &ByteRange::ALL).await?, zarr_meta);
Ok(())
}
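// Chunk keys show up in listings before and after commit/clear; listing under a
// prefix that is not a group or array is rejected with BadKeyPrefix.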
#[tokio::test]
async fn test_chunk_list() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let session = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&session)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta.clone()).await?;
let data = Bytes::copy_from_slice(b"hello");
store.set("array/c/0/1/0", data.clone()).await?;
store.set("array/c/1/1/1", data.clone()).await?;
assert_eq!(
all_keys(&store).await.unwrap(),
vec![
"array/c/0/1/0".to_string(),
"array/c/1/1/1".to_string(),
"array/zarr.json".to_string(),
"zarr.json".to_string()
]
);
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
session.write().await.commit("foo").max_concurrent_nodes(8).execute().await?;
let session = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(session))).await;
store.clear().await?;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
store.set("array/zarr.json", zarr_meta).await?;
let data = Bytes::copy_from_slice(b"hello");
store.set("array/c/0/1/0", data.clone()).await?;
store.set("array/c/1/1/1", data.clone()).await?;
assert_eq!(
all_keys(&store).await.unwrap(),
vec![
"array/c/0/1/0".to_string(),
"array/c/1/1/1".to_string(),
"array/zarr.json".to_string(),
"zarr.json".to_string()
]
);
assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
for bad_prefix in ["arr", "/arr", "/arr/", "zarr", "array/c", "array/c/0"] {
assert!(matches!(
keys(&store, bad_prefix).await,
Err(StoreError {
kind: StoreErrorKind::BadKeyPrefix { message, .. },
..
}) if message.contains("group or array")
));
}
Ok(())
}
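// list_dir and list_dir_items report only the immediate children (keys and prefixes) at each level.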
#[tokio::test]
async fn test_list_dir() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta).await?;
let data = Bytes::copy_from_slice(b"hello");
store.set("array/c/0/1/0", data.clone()).await?;
store.set("array/c/1/1/1", data.clone()).await?;
store.set("array/c/0/0/1", data.clone()).await?;
assert_eq!(
all_keys(&store).await.unwrap(),
vec![
"array/c/0/0/1".to_string(),
"array/c/0/1/0".to_string(),
"array/c/1/1/1".to_string(),
"array/zarr.json".to_string(),
"zarr.json".to_string()
]
);
for prefix in ["/", ""] {
let mut dir = store.list_dir(prefix).await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec!["array".to_string(), "zarr.json".to_string()]);
let mut dir =
store.list_dir_items(prefix).await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(
dir,
vec![
ListDirItem::Key("zarr.json".to_string()),
ListDirItem::Prefix("array".to_string())
]
);
}
let mut dir = store.list_dir("array").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec!["c".to_string(), "zarr.json".to_string()]);
let mut dir =
store.list_dir_items("array").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(
dir,
vec![
ListDirItem::Key("zarr.json".to_string()),
ListDirItem::Prefix("c".to_string())
]
);
let mut dir = store.list_dir("array/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec!["c".to_string(), "zarr.json".to_string()]);
let mut dir =
store.list_dir_items("array/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(
dir,
vec![
ListDirItem::Key("zarr.json".to_string()),
ListDirItem::Prefix("c".to_string())
]
);
let mut dir = store.list_dir("array/c/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec!["0".to_string(), "1".to_string()]);
let mut dir =
store.list_dir_items("array/c/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(
dir,
vec![
ListDirItem::Prefix("0".to_string()),
ListDirItem::Prefix("1".to_string()),
]
);
let mut dir = store.list_dir("array/c/1/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec!["1".to_string()]);
let mut dir =
store.list_dir_items("array/c/1/").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec![ListDirItem::Prefix("1".to_string())]);
let mut dir =
store.list_dir_items("array/c/1/1").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert_eq!(dir, vec![ListDirItem::Key("1".to_string())]);
let mut dir =
store.list_dir_items("array/c/1/1/1").await?.try_collect::<Vec<_>>().await?;
dir.sort();
assert!(dir.is_empty());
Ok(())
}
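// A group listing must not leak keys from a sibling node whose name shares the
// group's string prefix ("group" vs "group-suffix").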
#[tokio::test]
async fn test_list_dir_with_prefix() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
store
.set(
"group/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("group-suffix/zarr.json", zarr_meta).await.unwrap();
let data = Bytes::copy_from_slice(b"hello");
store.set_if_not_exists("group-suffix/c/0/1/0", data.clone()).await.unwrap();
assert_eq!(
store.list_dir("group/").await?.try_collect::<Vec<_>>().await?,
vec!["zarr.json"]
);
Ok(())
}
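// get_partial_values returns results in request order, including when keys are requested in reverse.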
#[tokio::test]
async fn test_get_partial_values() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await?;
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[20],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x"]}"#);
store.set("array/zarr.json", zarr_meta).await?;
let key_vals: Vec<_> = (0i32..20)
.map(|idx| {
(
format!("array/c/{idx}"),
Bytes::copy_from_slice(&idx.to_be_bytes()),
)
})
.collect();
for (key, value) in key_vals.iter() {
store.set(key.as_str(), value.clone()).await?;
}
let key_ranges = key_vals.iter().map(|(k, _)| (k.clone(), ByteRange::ALL));
assert_eq!(
key_vals.iter().map(|(_, v)| v.clone()).collect::<Vec<_>>(),
store
.get_partial_values(key_ranges)
.await?
.into_iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
let key_ranges = key_vals.iter().rev().map(|(k, _)| (k.clone(), ByteRange::ALL));
assert_eq!(
key_vals.iter().rev().map(|(_, v)| v.clone()).collect::<Vec<_>>(),
store
.get_partial_values(key_ranges)
.await?
.into_iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
Ok(())
}
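// Full commit/checkout flow: snapshots, tags, discard_changes, and branches all
// serve the expected chunk data.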
#[tokio::test]
async fn test_commit_and_checkout() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("array/zarr.json", zarr_meta.clone()).await.unwrap();
let data = Bytes::copy_from_slice(b"hello");
store.set_if_not_exists("array/c/0/1/0", data.clone()).await.unwrap();
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), data);
let snapshot_id = {
ds.write()
.await
.commit("initial commit")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap()
};
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
let new_data = Bytes::copy_from_slice(b"world");
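// set_if_not_exists must not overwrite the chunk committed above.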
store.set_if_not_exists("array/c/0/1/0", new_data.clone()).await.unwrap();
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), data);
store.set("array/c/0/1/0", new_data.clone()).await.unwrap();
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), new_data);
let new_snapshot_id = {
ds.write()
.await
.commit("update")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap()
};
let ds = repo.readonly_session(&VersionInfo::SnapshotId(snapshot_id)).await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), data);
let ds = repo
.readonly_session(&VersionInfo::SnapshotId(new_snapshot_id.clone()))
.await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), new_data);
repo.create_tag("tag_0", &new_snapshot_id).await.unwrap();
let _ds = repo
.readonly_session(&VersionInfo::TagRef("tag_0".to_string()))
.await
.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
let _newest_data = Bytes::copy_from_slice(b"earth");
store.set("array/c/0/1/0", data.clone()).await.unwrap();
assert!(ds.read().await.has_uncommitted_changes());
ds.write().await.discard_changes()?;
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), new_data);
repo.create_branch("dev", ds.read().await.snapshot_id()).await.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("dev").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store.set("array/c/0/1/0", new_data.clone()).await?;
let dev_snapshot = {
ds.write()
.await
.commit("update dev branch")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap()
};
let ds = repo.readonly_session(&VersionInfo::SnapshotId(dev_snapshot)).await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert_eq!(store.get("array/c/0/1/0", &ByteRange::ALL).await.unwrap(), new_data);
Ok(())
}
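// clear empties the store before and after a commit, and an empty commit checks out empty.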
#[tokio::test]
async fn test_clear() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let empty: Vec<String> = Vec::new();
store.clear().await?;
assert_eq!(
store.list_prefix("").await?.try_collect::<Vec<String>>().await?,
empty
);
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
store
.set(
"group/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
let new_data = Bytes::copy_from_slice(b"world");
store.set("array/zarr.json", zarr_meta.clone()).await.unwrap();
store.set("group/array/zarr.json", zarr_meta.clone()).await.unwrap();
store.set("array/c/1/0/0", new_data.clone()).await.unwrap();
store.set("group/array/c/1/0/0", new_data.clone()).await.unwrap();
ds.write()
.await
.commit("initial commit")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"group/group2/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
store.set("group/group2/array/zarr.json", zarr_meta.clone()).await.unwrap();
store.set("group/group2/array/c/1/0/0", new_data.clone()).await.unwrap();
store.clear().await?;
assert_eq!(
store.list_prefix("").await?.try_collect::<Vec<String>>().await?,
empty
);
let empty_snap = ds
.write()
.await
.commit("no content commit")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
assert_eq!(
store.list_prefix("").await?.try_collect::<Vec<String>>().await?,
empty
);
let ds = repo.readonly_session(&VersionInfo::SnapshotId(empty_snap)).await?;
let store = Store::from_session(Arc::new(RwLock::new(ds))).await;
assert_eq!(
store.list_prefix("").await?.try_collect::<Vec<String>>().await?,
empty
);
Ok(())
}
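// Metadata deleted and rewritten, within a session and across commits, reads back the latest value.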
#[tokio::test]
async fn test_overwrite() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
let meta1 = Bytes::copy_from_slice(
br#"{"zarr_format":3,"node_type":"group","attributes":{"foo":42}}"#,
);
let meta2 = Bytes::copy_from_slice(
br#"{"zarr_format":3,"node_type":"group","attributes":{"foo":84}}"#,
);
let zarr_meta1 = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
let zarr_meta2 = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":84},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
store.set("zarr.json", meta1.clone()).await.unwrap();
store.set("array/zarr.json", zarr_meta1.clone()).await.unwrap();
store.delete("zarr.json").await.unwrap();
store.delete("array/zarr.json").await.unwrap();
store.set("zarr.json", meta2.clone()).await.unwrap();
store.set("array/zarr.json", zarr_meta2.clone()).await.unwrap();
assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
assert_eq!(
&store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
&zarr_meta2
);
store.set("zarr.json", meta1).await.unwrap();
store.set("array/zarr.json", zarr_meta1.clone()).await.unwrap();
ds.write()
.await
.commit("initial commit")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store.delete("zarr.json").await.unwrap();
store.delete("array/zarr.json").await.unwrap();
store.set("zarr.json", meta2.clone()).await.unwrap();
store.set("array/zarr.json", zarr_meta2.clone()).await.unwrap();
assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
ds.write()
.await
.commit("commit 2")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
assert_eq!(&store.get("zarr.json", &ByteRange::ALL).await.unwrap(), &meta2);
assert_eq!(
&store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
&zarr_meta2
);
Ok(())
}
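// Resetting the branch drops later commits, so a fresh read-only session no longer sees them.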
#[tokio::test]
async fn test_branch_reset() -> Result<(), Box<dyn std::error::Error>> {
let repo = create_memory_store_repository().await;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
ds.write()
.await
.commit("root group")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"a/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let prev_snap =
ds.write().await.commit("group a").max_concurrent_nodes(8).execute().await?;
let ds = Arc::new(RwLock::new(repo.writable_session("main").await?));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"b/zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
ds.write()
.await
.commit("group b")
.max_concurrent_nodes(8)
.execute()
.await
.unwrap();
assert!(store.exists("a/zarr.json").await?);
assert!(store.exists("b/zarr.json").await?);
repo.reset_branch("main", &prev_snap, None).await?;
let ds = Arc::new(RwLock::new(
repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())).await?,
));
let store = Store::from_session(Arc::clone(&ds)).await;
assert!(!store.exists("b/zarr.json").await?);
assert!(store.exists("a/zarr.json").await?);
Ok(())
}
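// A store over a read-only session reports read_only and rejects writes with StoreErrorKind::ReadOnly.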
#[tokio::test]
async fn test_access_mode() {
let repo = create_memory_store_repository().await;
let ds = repo.writable_session("main").await.unwrap();
let writable_store = Store::from_session(Arc::new(RwLock::new(ds))).await;
writable_store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await
.unwrap();
let readable_store = Store::from_session(Arc::new(RwLock::new(
repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
.await
.unwrap(),
)))
.await;
assert!(readable_store.read_only().await);
let result = readable_store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
)
.await;
let correct_error =
matches!(result, Err(StoreError { kind: StoreErrorKind::ReadOnly, .. }));
assert!(correct_error);
}
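// A store round-trips through as_bytes/from_bytes and still serves data from the same repository.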
#[cfg(feature = "object-store-fs")]
#[tokio::test]
async fn test_serialize() {
let repo_dir = TempDir::new().expect("could not create temp dir");
let storage = Arc::new(
ObjectStorage::new_local_filesystem(repo_dir.path())
.await
.expect("could not create storage"),
);
let repo =
Repository::create(None, storage, HashMap::new(), None, true).await.unwrap();
let ds = Arc::new(RwLock::new(repo.writable_session("main").await.unwrap()));
let store = Store::from_session(Arc::clone(&ds)).await;
store
.set(
"zarr.json",
Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group"}"#),
)
.await
.unwrap();
ds.write().await.commit("first").max_concurrent_nodes(8).execute().await.unwrap();
let store_bytes = store.as_bytes().await.unwrap();
let store2: Store = Store::from_bytes(&store_bytes).unwrap();
let zarr_json = store2.get("zarr.json", &ByteRange::ALL).await.unwrap();
assert_eq!(
zarr_json,
Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group"}"#)
);
}
}