mod codec_specific_options;
mod options;
pub use codec_specific_options::CodecSpecificOptions;
mod codec_partial_default;
pub use codec_partial_default::CodecPartialDefault;
use codec_partial_default::{
ArrayToArrayCodecPartialDefault, ArrayToBytesCodecPartialDefault,
BytesToBytesCodecPartialDefault,
};
mod array_bytes_fixed_disjoint_view;
pub use array_bytes_fixed_disjoint_view::{
ArrayBytesFixedDisjointView, ArrayBytesFixedDisjointViewCreateError,
};
mod recommended_concurrency;
pub use recommended_concurrency::RecommendedConcurrency;
mod bytes_representation;
pub use bytes_representation::BytesRepresentation;
mod array_bytes;
pub use array_bytes::{
ArrayBytes, ArrayBytesError, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesRaw,
ArrayBytesRawOffsetsCreateError, ArrayBytesRawOffsetsOutOfBoundsError,
ArrayBytesVariableLength, ExpectedFixedLengthBytesError, ExpectedOptionalBytesError,
ExpectedVariableLengthBytesError, copy_fill_value_into, decode_into_array_bytes_target,
update_array_bytes,
};
mod byte_interval_partial_decoder;
#[cfg(feature = "async")]
pub use byte_interval_partial_decoder::AsyncByteIntervalPartialDecoder;
pub use byte_interval_partial_decoder::ByteIntervalPartialDecoder;
use derive_more::derive::Display;
pub use options::{CodecMetadataOptions, CodecOptions};
use thiserror::Error;
use zarrs_metadata::{ArrayShape, ChunkShape, Configuration};
use zarrs_storage::byte_range::{
ByteRange, ByteRangeIterator, InvalidByteRangeError, extract_byte_ranges,
};
#[cfg(feature = "async")]
use zarrs_storage::{AsyncReadableStorage, AsyncReadableWritableStorage};
use zarrs_storage::{
OffsetBytesIterator, ReadableStorage, ReadableWritableStorage, StorageError, StoreKey,
};
use std::any::Any;
use std::borrow::Cow;
use std::num::NonZeroU64;
use std::sync::{Arc, LazyLock, Mutex};
use zarrs_chunk_grid::{
ArraySubset, ArraySubsetError, IncompatibleDimensionalityError, Indexer, IndexerError,
};
use zarrs_data_type::{DataType, DataTypeFillValueError, FillValue};
use zarrs_metadata::v2::MetadataV2;
use zarrs_metadata::v3::MetadataV3;
use zarrs_plugin::{
ExtensionAliases, ExtensionName, MaybeSend, MaybeSync, Plugin, PluginCreateError,
PluginUnsupportedError, RuntimePlugin, RuntimeRegistry, ZarrVersion, ZarrVersion2,
ZarrVersion3,
};
/// A destination into which decoded array bytes are written directly.
pub enum ArrayBytesDecodeIntoTarget<'a> {
    /// A single fixed-length view that receives the decoded elements.
    Fixed(&'a mut ArrayBytesFixedDisjointView<'a>),
    /// A target made of a boxed inner target plus an additional fixed-length view.
    ///
    /// The element count of this variant is taken from the inner target (see
    /// `num_elements`). NOTE(review): the second view is presumably the validity mask of
    /// optional data — confirm against `decode_into_array_bytes_target`.
    Optional(
        Box<ArrayBytesDecodeIntoTarget<'a>>,
        &'a mut ArrayBytesFixedDisjointView<'a>,
    ),
}
impl ArrayBytesDecodeIntoTarget<'_> {
    /// Returns the number of elements this target holds.
    ///
    /// For an `Optional` target the count comes from the boxed inner target (recursively);
    /// the additional view is not consulted.
    #[must_use]
    pub fn num_elements(&self) -> u64 {
        match self {
            Self::Optional(inner_target, _) => inner_target.num_elements(),
            Self::Fixed(fixed_view) => fixed_view.num_elements(),
        }
    }
}
/// Wraps a fixed-length view as an [`ArrayBytesDecodeIntoTarget::Fixed`] target.
impl<'a> From<&'a mut ArrayBytesFixedDisjointView<'a>> for ArrayBytesDecodeIntoTarget<'a> {
    fn from(view: &'a mut ArrayBytesFixedDisjointView<'a>) -> Self {
        Self::Fixed(view)
    }
}
/// Partial decoding capabilities reported by a codec through
/// [`CodecTraits::partial_decoder_capability`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartialDecoderCapability {
    /// Presumably: whether the codec can read only part of its encoded input — confirm
    /// against codec implementations.
    pub partial_read: bool,
    /// Presumably: whether the codec can decode a subset of its output without decoding
    /// everything — confirm against codec implementations.
    pub partial_decode: bool,
}
/// Partial encoding capabilities reported by a codec through
/// [`CodecTraits::partial_encoder_capability`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartialEncoderCapability {
    /// Presumably: whether the codec can update part of its encoded output in place —
    /// confirm against codec implementations.
    pub partial_encode: bool,
}
/// A Zarr V3 codec plugin registered at compile time and collected with `inventory`.
///
/// [`Codec::from_metadata`] iterates these after the runtime registry.
#[derive(derive_more::Deref)]
pub struct CodecPluginV3(Plugin<Codec, MetadataV3>);
inventory::collect!(CodecPluginV3);
impl CodecPluginV3 {
    /// Creates a plugin for codec type `T`.
    ///
    /// Name matching uses `T::matches_name`, and the codec is constructed with `T::create`.
    pub const fn new<T: ExtensionAliases<ZarrVersion3> + CodecTraitsV3>() -> Self {
        Self(Plugin::new(T::matches_name, T::create))
    }
}
/// A Zarr V2 codec plugin registered at compile time and collected with `inventory`.
///
/// [`Codec::from_metadata`] iterates these after the runtime registry.
#[derive(derive_more::Deref)]
pub struct CodecPluginV2(Plugin<Codec, MetadataV2>);
inventory::collect!(CodecPluginV2);
impl CodecPluginV2 {
    /// Creates a plugin for codec type `T`.
    ///
    /// Name matching uses `T::matches_name`, and the codec is constructed with `T::create`.
    pub const fn new<T: ExtensionAliases<ZarrVersion2> + CodecTraitsV2>() -> Self {
        Self(Plugin::new(T::matches_name, T::create))
    }
}
/// A Zarr V3 codec plugin that can be registered at runtime.
pub type CodecRuntimePluginV3 = RuntimePlugin<Codec, MetadataV3>;
/// A Zarr V2 codec plugin that can be registered at runtime.
pub type CodecRuntimePluginV2 = RuntimePlugin<Codec, MetadataV2>;
/// The global registry of runtime Zarr V3 codec plugins.
///
/// [`Codec::from_metadata`] searches it before the compile-time `inventory` plugins.
pub static CODEC_RUNTIME_REGISTRY_V3: LazyLock<RuntimeRegistry<CodecRuntimePluginV3>> =
    LazyLock::new(RuntimeRegistry::new);
/// The global registry of runtime Zarr V2 codec plugins.
///
/// [`Codec::from_metadata`] searches it before the compile-time `inventory` plugins.
pub static CODEC_RUNTIME_REGISTRY_V2: LazyLock<RuntimeRegistry<CodecRuntimePluginV2>> =
    LazyLock::new(RuntimeRegistry::new);
/// The handle returned when registering a runtime Zarr V3 codec plugin.
pub type CodecRuntimeRegistryHandleV3 = Arc<CodecRuntimePluginV3>;
/// The handle returned when registering a runtime Zarr V2 codec plugin.
pub type CodecRuntimeRegistryHandleV2 = Arc<CodecRuntimePluginV2>;
/// Registers a Zarr V3 codec plugin in [`CODEC_RUNTIME_REGISTRY_V3`].
///
/// Pass the returned handle to [`unregister_codec_v3`] to remove the plugin.
pub fn register_codec_v3(plugin: CodecRuntimePluginV3) -> CodecRuntimeRegistryHandleV3 {
    CODEC_RUNTIME_REGISTRY_V3.register(plugin)
}
/// Registers a Zarr V2 codec plugin in [`CODEC_RUNTIME_REGISTRY_V2`].
///
/// Pass the returned handle to [`unregister_codec_v2`] to remove the plugin.
pub fn register_codec_v2(plugin: CodecRuntimePluginV2) -> CodecRuntimeRegistryHandleV2 {
    CODEC_RUNTIME_REGISTRY_V2.register(plugin)
}
/// Unregisters a runtime Zarr V3 codec plugin previously returned by [`register_codec_v3`].
///
/// Returns the registry's result — presumably `true` if the plugin was found and removed.
pub fn unregister_codec_v3(handle: &CodecRuntimeRegistryHandleV3) -> bool {
    CODEC_RUNTIME_REGISTRY_V3.unregister(handle)
}
/// Unregisters a runtime Zarr V2 codec plugin previously returned by [`register_codec_v2`].
///
/// Returns the registry's result — presumably `true` if the plugin was found and removed.
pub fn unregister_codec_v2(handle: &CodecRuntimeRegistryHandleV2) -> bool {
    CODEC_RUNTIME_REGISTRY_V2.unregister(handle)
}
/// A codec, classified by the kind of data it consumes and produces.
#[derive(Debug)]
pub enum Codec {
    /// A codec that maps an array to another array.
    ArrayToArray(Arc<dyn ArrayToArrayCodecTraits>),
    /// A codec that encodes an array to bytes.
    ArrayToBytes(Arc<dyn ArrayToBytesCodecTraits>),
    /// A codec that maps bytes to other bytes.
    BytesToBytes(Arc<dyn BytesToBytesCodecTraits>),
}
/// Borrowed codec metadata from either Zarr V3 or Zarr V2.
///
/// `From` is derived so that `&MetadataV3` and `&MetadataV2` can be passed directly to
/// [`Codec::from_metadata`].
#[derive(Debug, Clone, Copy, derive_more::From)]
pub enum CodecMetadata<'a> {
    /// Zarr V3 codec metadata.
    V3(&'a MetadataV3),
    /// Zarr V2 codec metadata.
    V2(&'a MetadataV2),
}
impl Codec {
    /// Creates a codec from Zarr V3 or Zarr V2 codec metadata.
    ///
    /// Plugins registered at runtime are searched first, followed by plugins registered
    /// at compile time with `inventory`. The first plugin whose name matches is used.
    ///
    /// # Errors
    /// Returns a [`PluginCreateError`] in two cases:
    /// - no plugin matches the codec name (V3) or id (V2);
    /// - the matching plugin fails to create the codec.
    pub fn from_metadata<'a>(
        metadata: impl Into<CodecMetadata<'a>>,
    ) -> Result<Self, PluginCreateError> {
        let metadata: CodecMetadata<'a> = metadata.into();
        match metadata {
            CodecMetadata::V3(metadata_v3) => Self::from_metadata_v3(metadata_v3),
            CodecMetadata::V2(metadata_v2) => Self::from_metadata_v2(metadata_v2),
        }
    }

    /// Resolves a Zarr V3 codec by its `name`.
    fn from_metadata_v3(metadata: &MetadataV3) -> Result<Self, PluginCreateError> {
        let name = metadata.name();

        // Runtime registrations take precedence over compile-time plugins.
        let runtime_match = CODEC_RUNTIME_REGISTRY_V3.with_plugins(|plugins| {
            for plugin in plugins {
                if !plugin.match_name(name) {
                    continue;
                }
                return Some(plugin.create(metadata));
            }
            None
        });
        if let Some(created) = runtime_match {
            return created;
        }

        for plugin in inventory::iter::<CodecPluginV3> {
            if !plugin.match_name(name) {
                continue;
            }
            return plugin.create(metadata);
        }

        Err(PluginUnsupportedError::new(name.to_string(), "codec".to_string()).into())
    }

    /// Resolves a Zarr V2 codec by its `id`.
    fn from_metadata_v2(metadata: &MetadataV2) -> Result<Self, PluginCreateError> {
        let name = metadata.id();

        // Runtime registrations take precedence over compile-time plugins.
        let runtime_match = CODEC_RUNTIME_REGISTRY_V2.with_plugins(|plugins| {
            for plugin in plugins {
                if !plugin.match_name(name) {
                    continue;
                }
                return Some(plugin.create(metadata));
            }
            None
        });
        if let Some(created) = runtime_match {
            return created;
        }

        for plugin in inventory::iter::<CodecPluginV2> {
            if !plugin.match_name(name) {
                continue;
            }
            return plugin.create(metadata);
        }

        Err(PluginUnsupportedError::new(name.to_string(), "codec".to_string()).into())
    }

    /// Returns the configuration of the wrapped codec for `version`.
    ///
    /// This delegates to [`CodecTraits::configuration`].
    #[must_use]
    pub fn configuration(
        &self,
        version: ZarrVersion,
        options: &CodecMetadataOptions,
    ) -> Option<Configuration> {
        match self {
            Self::BytesToBytes(inner) => inner.configuration(version, options),
            Self::ArrayToBytes(inner) => inner.configuration(version, options),
            Self::ArrayToArray(inner) => inner.configuration(version, options),
        }
    }

    /// Shorthand for [`Codec::configuration`] with [`ZarrVersion::V3`].
    #[must_use]
    pub fn configuration_v3(&self, options: &CodecMetadataOptions) -> Option<Configuration> {
        Self::configuration(self, ZarrVersion::V3, options)
    }

    /// Shorthand for [`Codec::configuration`] with [`ZarrVersion::V2`].
    #[must_use]
    pub fn configuration_v2(&self, options: &CodecMetadataOptions) -> Option<Configuration> {
        Self::configuration(self, ZarrVersion::V2, options)
    }
}
impl ExtensionName for Codec {
fn name(&self, version: ZarrVersion) -> Option<std::borrow::Cow<'static, str>> {
match self {
Self::ArrayToArray(codec) => codec.name(version),
Self::ArrayToBytes(codec) => codec.name(version),
Self::BytesToBytes(codec) => codec.name(version),
}
}
}
/// Constructs a [`Codec`] from Zarr V2 metadata; used by [`CodecPluginV2::new`].
pub trait CodecTraitsV2 {
    /// Creates the codec from Zarr V2 `metadata`.
    ///
    /// # Errors
    /// Returns a [`PluginCreateError`] if the codec cannot be created from `metadata`.
    fn create(metadata: &MetadataV2) -> Result<Codec, PluginCreateError>
    where
        Self: Sized;
}
/// Constructs a [`Codec`] from Zarr V3 metadata; used by [`CodecPluginV3::new`].
pub trait CodecTraitsV3 {
    /// Creates the codec from Zarr V3 `metadata`.
    ///
    /// # Errors
    /// Returns a [`PluginCreateError`] if the codec cannot be created from `metadata`.
    fn create(metadata: &MetadataV3) -> Result<Codec, PluginCreateError>
    where
        Self: Sized;
}
/// Traits common to every codec.
pub trait CodecTraits: ExtensionName + MaybeSend + MaybeSync {
    /// Returns `self` as [`Any`], e.g. for downcasting to the concrete codec type.
    fn as_any(&self) -> &dyn Any;
    /// Returns the codec configuration for `version`, or `None`.
    ///
    /// NOTE(review): presumably `None` means the codec has no representation for that
    /// Zarr version — confirm with implementations.
    fn configuration(
        &self,
        version: ZarrVersion,
        options: &CodecMetadataOptions,
    ) -> Option<Configuration>;
    /// Shorthand for [`CodecTraits::configuration`] with [`ZarrVersion::V3`].
    fn configuration_v3(&self, options: &CodecMetadataOptions) -> Option<Configuration> {
        self.configuration(ZarrVersion::V3, options)
    }
    /// Shorthand for [`CodecTraits::configuration`] with [`ZarrVersion::V2`].
    fn configuration_v2(&self, options: &CodecMetadataOptions) -> Option<Configuration> {
        self.configuration(ZarrVersion::V2, options)
    }
    /// Reports the partial decoding capabilities of this codec.
    fn partial_decoder_capability(&self) -> PartialDecoderCapability;
    /// Reports the partial encoding capabilities of this codec.
    fn partial_encoder_capability(&self) -> PartialEncoderCapability;
}
/// Traits shared by codecs whose decoded input is an array.
pub trait ArrayCodecTraits: CodecTraits {
    /// Returns the recommended concurrency for a chunk of `shape` and `data_type`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the recommendation cannot be computed.
    fn recommended_concurrency(
        &self,
        shape: &[NonZeroU64],
        data_type: &DataType,
    ) -> Result<RecommendedConcurrency, CodecError>;
    /// Returns the granularity of partial decoding for a chunk of `shape`.
    ///
    /// The default returns `shape` unchanged; presumably this means "no finer than the
    /// whole chunk" — overriding codecs may report a smaller granularity.
    fn partial_decode_granularity(&self, shape: &[NonZeroU64]) -> ChunkShape {
        shape.to_vec()
    }
}
/// Reads byte ranges from an encoded byte source (storage, memory, or another decoder).
pub trait BytesPartialDecoderTraits: Any + MaybeSend + MaybeSync {
    /// Returns whether the underlying bytes exist.
    ///
    /// # Errors
    /// Returns a [`StorageError`] if the existence check fails.
    fn exists(&self) -> Result<bool, StorageError>;
    /// Returns the number of bytes held in memory by this decoder.
    ///
    /// In this file, storage-backed implementations return 0 and in-memory ones return
    /// their buffer length.
    fn size_held(&self) -> usize;
    /// Decodes a single byte range.
    ///
    /// The default forwards to [`BytesPartialDecoderTraits::partial_decode_many`] with a
    /// one-element iterator. Returns `Ok(None)` when `partial_decode_many` does (e.g. when
    /// the bytes are absent).
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn partial_decode(
        &self,
        decoded_region: ByteRange,
        options: &CodecOptions,
    ) -> Result<Option<ArrayBytesRaw<'_>>, CodecError> {
        Ok(self
            .partial_decode_many(Box::new([decoded_region].into_iter()), options)?
            // Exactly one range was requested, so exactly one result is expected.
            .map(|mut v| v.pop().expect("single byte range")))
    }
    /// Decodes several byte ranges, returning one buffer per range in request order.
    ///
    /// Returns `Ok(None)` if the underlying bytes are absent.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails or a range is invalid.
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError>;
    /// Decodes all bytes; equivalent to a partial decode of `ByteRange::FromStart(0, None)`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn decode(&self, options: &CodecOptions) -> Result<Option<ArrayBytesRaw<'_>>, CodecError> {
        self.partial_decode(ByteRange::FromStart(0, None), options)
    }
    /// Returns whether this decoder supports efficient partial decoding.
    fn supports_partial_decode(&self) -> bool;
}
/// Asynchronous counterpart of [`BytesPartialDecoderTraits`].
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncBytesPartialDecoderTraits: Any + MaybeSend + MaybeSync {
    /// Returns whether the underlying bytes exist.
    ///
    /// # Errors
    /// Returns a [`StorageError`] if the existence check fails.
    async fn exists(&self) -> Result<bool, StorageError>;
    /// Returns the number of bytes held in memory by this decoder.
    fn size_held(&self) -> usize;
    /// Decodes a single byte range.
    ///
    /// The default forwards to `partial_decode_many` with a one-element iterator.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    async fn partial_decode<'a>(
        &'a self,
        decoded_region: ByteRange,
        options: &CodecOptions,
    ) -> Result<Option<ArrayBytesRaw<'a>>, CodecError> {
        Ok(self
            .partial_decode_many(Box::new([decoded_region].into_iter()), options)
            .await?
            // Exactly one range was requested, so exactly one result is expected.
            .map(|mut v| v.pop().expect("single byte range")))
    }
    /// Decodes several byte ranges, returning one buffer per range in request order.
    ///
    /// Returns `Ok(None)` if the underlying bytes are absent.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails or a range is invalid.
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError>;
    /// Decodes all bytes; equivalent to a partial decode of `ByteRange::FromStart(0, None)`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    async fn decode<'a>(
        &'a self,
        options: &CodecOptions,
    ) -> Result<Option<ArrayBytesRaw<'a>>, CodecError> {
        self.partial_decode(ByteRange::FromStart(0, None), options)
            .await
    }
    /// Returns whether this decoder supports efficient partial decoding.
    fn supports_partial_decode(&self) -> bool;
}
/// Decodes regions of an array selected by an [`Indexer`].
pub trait ArrayPartialDecoderTraits: Any + MaybeSend + MaybeSync {
    /// Returns the data type of the decoded array.
    fn data_type(&self) -> &DataType;
    /// Returns whether the underlying encoded data exists.
    ///
    /// # Errors
    /// Returns a [`StorageError`] if the existence check fails.
    fn exists(&self) -> Result<bool, StorageError>;
    /// Returns the number of bytes held in memory by this decoder.
    fn size_held(&self) -> usize;
    /// Decodes the elements selected by `indexer`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn partial_decode(
        &self,
        indexer: &dyn Indexer,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'_>, CodecError>;
    /// Decodes the elements selected by `indexer` directly into `output_target`.
    ///
    /// The default checks that `indexer` and `output_target` agree on the element count,
    /// then decodes with [`ArrayPartialDecoderTraits::partial_decode`] and copies into the
    /// target.
    ///
    /// # Errors
    /// Returns a [`CodecError`] in two cases:
    /// - the element counts differ;
    /// - decoding or copying fails.
    fn partial_decode_into(
        &self,
        indexer: &dyn Indexer,
        output_target: ArrayBytesDecodeIntoTarget<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        if indexer.len() != output_target.num_elements() {
            return Err(InvalidNumberOfElementsError::new(
                indexer.len(),
                output_target.num_elements(),
            )
            .into());
        }
        let decoded_value = self.partial_decode(indexer, options)?;
        decode_into_array_bytes_target(&decoded_value, output_target)
    }
    /// Returns whether this decoder supports efficient partial decoding.
    fn supports_partial_decode(&self) -> bool;
}
/// Encodes regions of an array in place; also usable as an [`ArrayPartialDecoderTraits`].
pub trait ArrayPartialEncoderTraits:
    ArrayPartialDecoderTraits + Any + MaybeSend + MaybeSync
{
    /// Converts this encoder into a type-erased partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn ArrayPartialDecoderTraits>;
    /// Erases the underlying encoded data.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if erasing fails.
    fn erase(&self) -> Result<(), CodecError>;
    /// Encodes `bytes` into the region selected by `indexer`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails.
    fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError>;
    /// Returns whether this encoder supports efficient partial encoding.
    fn supports_partial_encode(&self) -> bool;
}
/// Asynchronous counterpart of [`ArrayPartialEncoderTraits`].
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncArrayPartialEncoderTraits:
    AsyncArrayPartialDecoderTraits + Any + MaybeSend + MaybeSync
{
    /// Converts this encoder into a type-erased asynchronous partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncArrayPartialDecoderTraits>;
    /// Erases the underlying encoded data.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if erasing fails.
    async fn erase(&self) -> Result<(), CodecError>;
    /// Encodes `bytes` into the region selected by `indexer`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails.
    async fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError>;
    /// Returns whether this encoder supports efficient partial encoding.
    fn supports_partial_encode(&self) -> bool;
}
/// Writes bytes at offsets into an encoded byte sink; also usable as a
/// [`BytesPartialDecoderTraits`].
pub trait BytesPartialEncoderTraits:
    BytesPartialDecoderTraits + Any + MaybeSend + MaybeSync
{
    /// Converts this encoder into a type-erased partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn BytesPartialDecoderTraits>;
    /// Erases the underlying bytes.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if erasing fails.
    fn erase(&self) -> Result<(), CodecError>;
    /// Writes `bytes` at `offset`.
    ///
    /// The default forwards to [`BytesPartialEncoderTraits::partial_encode_many`] with a
    /// single `(offset, bytes)` pair.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if writing fails.
    fn partial_encode(
        &self,
        offset: u64,
        bytes: ArrayBytesRaw<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        self.partial_encode_many(Box::new([(offset, bytes)].into_iter()), options)
    }
    /// Writes each `(offset, bytes)` pair yielded by `offset_values`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if writing fails.
    fn partial_encode_many(
        &self,
        offset_values: OffsetBytesIterator<ArrayBytesRaw<'_>>,
        options: &CodecOptions,
    ) -> Result<(), CodecError>;
    /// Returns whether this encoder supports efficient partial encoding.
    fn supports_partial_encode(&self) -> bool;
}
/// Asynchronous counterpart of [`BytesPartialEncoderTraits`].
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncBytesPartialEncoderTraits:
    AsyncBytesPartialDecoderTraits + Any + MaybeSend + MaybeSync
{
    /// Converts this encoder into a type-erased asynchronous partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncBytesPartialDecoderTraits>;
    /// Erases the underlying bytes.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if erasing fails.
    async fn erase(&self) -> Result<(), CodecError>;
    /// Writes `bytes` at `offset`; the default forwards to `partial_encode_many`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if writing fails.
    async fn partial_encode(
        &self,
        offset: u64,
        bytes: ArrayBytesRaw<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        self.partial_encode_many(Box::new([(offset, bytes)].into_iter()), options)
            .await
    }
    /// Writes each `(offset, bytes)` pair yielded by `offset_values`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if writing fails.
    async fn partial_encode_many<'a>(
        &'a self,
        offset_values: OffsetBytesIterator<'a, ArrayBytesRaw<'_>>,
        options: &CodecOptions,
    ) -> Result<(), CodecError>;
    /// Returns whether this encoder supports efficient partial encoding.
    fn supports_partial_encode(&self) -> bool;
}
/// Asynchronous counterpart of [`ArrayPartialDecoderTraits`].
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncArrayPartialDecoderTraits: Any + MaybeSend + MaybeSync {
    /// Returns the data type of the decoded array.
    fn data_type(&self) -> &DataType;
    /// Returns whether the underlying encoded data exists.
    ///
    /// # Errors
    /// Returns a [`StorageError`] if the existence check fails.
    async fn exists(&self) -> Result<bool, StorageError>;
    /// Returns the number of bytes held in memory by this decoder.
    fn size_held(&self) -> usize;
    /// Decodes the elements selected by `indexer`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    async fn partial_decode<'a>(
        &'a self,
        indexer: &dyn Indexer,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError>;
    /// Decodes the elements selected by `indexer` directly into `output_target`.
    ///
    /// The default checks that `indexer` and `output_target` agree on the element count,
    /// then decodes with `partial_decode` and copies into the target.
    ///
    /// # Errors
    /// Returns a [`CodecError`] in two cases:
    /// - the element counts differ;
    /// - decoding or copying fails.
    async fn partial_decode_into(
        &self,
        indexer: &dyn Indexer,
        output_target: ArrayBytesDecodeIntoTarget<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let num_elements = output_target.num_elements();
        if indexer.len() != num_elements {
            return Err(InvalidNumberOfElementsError::new(indexer.len(), num_elements).into());
        }
        let decoded_value = self.partial_decode(indexer, options).await?;
        decode_into_array_bytes_target(&decoded_value, output_target)
    }
    /// Returns whether this decoder supports efficient partial decoding.
    fn supports_partial_decode(&self) -> bool;
}
/// A [`BytesPartialDecoderTraits`] that reads the value stored under `key` in a
/// [`ReadableStorage`].
pub struct StoragePartialDecoder {
    /// The storage to read from.
    storage: ReadableStorage,
    /// The key whose value is decoded.
    key: StoreKey,
}
impl StoragePartialDecoder {
    /// Creates a partial decoder for the value stored under `key` in `storage`.
    pub fn new(storage: ReadableStorage, key: StoreKey) -> Self {
        Self { storage, key }
    }
}
impl BytesPartialDecoderTraits for StoragePartialDecoder {
    /// The value exists if the storage reports a size for `key`.
    fn exists(&self) -> Result<bool, StorageError> {
        let size = self.storage.size_key(&self.key)?;
        Ok(size.is_some())
    }

    /// Nothing is cached in memory, so this is always zero.
    fn size_held(&self) -> usize {
        0
    }

    /// Reads the requested ranges from storage and copies each into an owned buffer.
    ///
    /// Returns `Ok(None)` when storage returns no data for `key`.
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
        let Some(parts) = self.storage.get_partial_many(&self.key, decoded_regions)? else {
            return Ok(None);
        };
        let decoded = parts
            .map(|part| Ok::<_, StorageError>(Cow::Owned(part?.into())))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Some(decoded))
    }

    /// Partial decoding is efficient exactly when the storage supports partial reads.
    fn supports_partial_decode(&self) -> bool {
        self.storage.supports_get_partial()
    }
}
/// An [`AsyncBytesPartialDecoderTraits`] that reads the value stored under `key` in an
/// [`AsyncReadableStorage`].
#[cfg(feature = "async")]
pub struct AsyncStoragePartialDecoder {
    /// The storage to read from.
    storage: AsyncReadableStorage,
    /// The key whose value is decoded.
    key: StoreKey,
}
#[cfg(feature = "async")]
impl AsyncStoragePartialDecoder {
    /// Creates an asynchronous partial decoder for the value under `key` in `storage`.
    pub fn new(storage: AsyncReadableStorage, key: StoreKey) -> Self {
        Self { storage, key }
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
    /// The value exists if the storage reports a size for `key`.
    async fn exists(&self) -> Result<bool, StorageError> {
        let size = self.storage.size_key(&self.key).await?;
        Ok(size.is_some())
    }

    /// Nothing is cached in memory, so this is always zero.
    fn size_held(&self) -> usize {
        0
    }

    /// Reads the requested ranges from storage and collects each into an owned buffer.
    ///
    /// Returns `Ok(None)` when storage returns no data for `key`.
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError> {
        use futures::{StreamExt, TryStreamExt};

        let Some(parts) = self
            .storage
            .get_partial_many(&self.key, decoded_regions)
            .await?
        else {
            return Ok(None);
        };
        let decoded: Vec<ArrayBytesRaw<'a>> = parts
            .map(|part| Ok::<_, StorageError>(Cow::Owned(part?.into())))
            .try_collect()
            .await?;
        Ok(Some(decoded))
    }

    /// Partial decoding is efficient exactly when the storage supports partial reads.
    fn supports_partial_decode(&self) -> bool {
        self.storage.supports_get_partial()
    }
}
impl BytesPartialDecoderTraits for Mutex<Option<Vec<u8>>> {
fn exists(&self) -> Result<bool, StorageError> {
Ok(self.lock().unwrap().is_some())
}
fn size_held(&self) -> usize {
self.lock().unwrap().as_ref().map_or(0, Vec::len)
}
fn partial_decode_many(
&self,
decoded_regions: ByteRangeIterator,
_options: &CodecOptions,
) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
if let Some(input) = self.lock().unwrap().as_ref() {
let size = input.len() as u64;
let mut outputs = vec![];
for byte_range in decoded_regions {
if byte_range.end(size) <= size {
outputs.push(Cow::Owned(input[byte_range.to_range_usize(size)].into()));
} else {
return Err(InvalidByteRangeError::new(byte_range, size).into());
}
}
Ok(Some(outputs))
} else {
Ok(None)
}
}
fn supports_partial_decode(&self) -> bool {
true
}
}
/// An in-memory, possibly-absent byte buffer used as a partial encoder.
impl BytesPartialEncoderTraits for Mutex<Option<Vec<u8>>> {
    /// Returns this buffer as a type-erased partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn BytesPartialDecoderTraits> {
        self
    }

    /// Clears the buffer, so that subsequent `exists` calls return `false`.
    fn erase(&self) -> Result<(), CodecError> {
        *self.lock().unwrap() = None;
        Ok(())
    }

    /// Writes each `(offset, bytes)` pair into the buffer.
    ///
    /// The buffer is modified in place. An absent buffer starts empty, and the buffer is
    /// zero-extended whenever a write ends past its current length.
    ///
    /// # Panics
    /// Panics if an offset does not fit in `usize`.
    fn partial_encode_many(
        &self,
        offset_values: OffsetBytesIterator<ArrayBytesRaw<'_>>,
        _options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let mut guard = self.lock().unwrap();
        // Mutate in place rather than cloning the whole existing buffer.
        let output = guard.get_or_insert_with(Vec::new);
        for (offset, value) in offset_values {
            let offset =
                usize::try_from(offset).expect("partial encode offset must fit in usize");
            let end = offset + value.len();
            if output.len() < end {
                output.resize(end, 0);
            }
            output[offset..end].copy_from_slice(&value);
        }
        Ok(())
    }

    /// In-memory data always supports partial encoding.
    fn supports_partial_encode(&self) -> bool {
        true
    }
}
/// A partial encoder/decoder over the value stored under `key` in a writable storage.
///
/// Trait implementations in this file exist for `ReadableWritableStorage` and (with the
/// `async` feature) `AsyncReadableWritableStorage`.
pub struct StoragePartialEncoder<TStorage> {
    /// The storage to read from and write to.
    storage: TStorage,
    /// The key whose value is encoded/decoded.
    key: StoreKey,
}
impl<TStorage> StoragePartialEncoder<TStorage> {
    /// Creates a partial encoder for the value stored under `key` in `storage`.
    pub fn new(storage: TStorage, key: StoreKey) -> Self {
        Self { storage, key }
    }
}
impl BytesPartialDecoderTraits for StoragePartialEncoder<ReadableWritableStorage> {
    /// The value exists if the storage reports a size for `key`.
    fn exists(&self) -> Result<bool, StorageError> {
        let size = self.storage.size_key(&self.key)?;
        Ok(size.is_some())
    }

    /// Nothing is cached in memory, so this is always zero.
    fn size_held(&self) -> usize {
        0
    }

    /// Reads the requested ranges from storage into owned buffers.
    ///
    /// Returns `Ok(None)` when storage returns no data for `key`.
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
        let Some(parts) = self.storage.get_partial_many(&self.key, decoded_regions)? else {
            return Ok(None);
        };
        let decoded = parts
            .map(|part| Ok::<_, StorageError>(Cow::Owned(part?.into())))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Some(decoded))
    }

    /// Partial decoding is efficient exactly when the storage supports partial reads.
    fn supports_partial_decode(&self) -> bool {
        self.storage.supports_get_partial()
    }
}
impl BytesPartialEncoderTraits for StoragePartialEncoder<ReadableWritableStorage> {
    /// Returns this encoder as a type-erased partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn BytesPartialDecoderTraits> {
        self
    }

    /// Erases the value under `key` from storage.
    fn erase(&self) -> Result<(), CodecError> {
        self.storage.erase(&self.key).map_err(Into::into)
    }

    /// Converts each value to owned bytes and writes all of them with one
    /// `set_partial_many` call.
    fn partial_encode_many(
        &self,
        offset_values: OffsetBytesIterator<ArrayBytesRaw<'_>>,
        _options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let owned_values =
            offset_values.map(|(offset, bytes)| (offset, bytes.into_owned().into()));
        self.storage
            .set_partial_many(&self.key, Box::new(owned_values))
            .map_err(Into::into)
    }

    /// Partial encoding is efficient exactly when the storage supports partial writes.
    fn supports_partial_encode(&self) -> bool {
        self.storage.supports_set_partial()
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AsyncBytesPartialDecoderTraits for StoragePartialEncoder<AsyncReadableWritableStorage> {
    /// The value exists if the storage reports a size for `key`.
    async fn exists(&self) -> Result<bool, StorageError> {
        let size = self.storage.size_key(&self.key).await?;
        Ok(size.is_some())
    }

    /// Nothing is cached in memory, so this is always zero.
    fn size_held(&self) -> usize {
        0
    }

    /// Reads the requested ranges from storage into owned buffers.
    ///
    /// Returns `Ok(None)` when storage returns no data for `key`.
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError> {
        use futures::{StreamExt, TryStreamExt};

        let Some(parts) = self
            .storage
            .get_partial_many(&self.key, decoded_regions)
            .await?
        else {
            return Ok(None);
        };
        let decoded: Vec<ArrayBytesRaw<'a>> = parts
            .map(|part| Ok::<_, StorageError>(Cow::Owned(part?.into())))
            .try_collect()
            .await?;
        Ok(Some(decoded))
    }

    /// Partial decoding is efficient exactly when the storage supports partial reads.
    fn supports_partial_decode(&self) -> bool {
        self.storage.supports_get_partial()
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AsyncBytesPartialEncoderTraits for StoragePartialEncoder<AsyncReadableWritableStorage> {
    /// Returns this encoder as a type-erased asynchronous partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncBytesPartialDecoderTraits> {
        self
    }

    /// Erases the value under `key` from storage.
    async fn erase(&self) -> Result<(), CodecError> {
        self.storage.erase(&self.key).await.map_err(Into::into)
    }

    /// Converts each value to owned bytes and writes all of them with one
    /// `set_partial_many` call.
    async fn partial_encode_many<'a>(
        &'a self,
        offset_values: OffsetBytesIterator<'a, ArrayBytesRaw<'_>>,
        _options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let owned_values =
            offset_values.map(|(offset, bytes)| (offset, bytes.into_owned().into()));
        self.storage
            .set_partial_many(&self.key, Box::new(owned_values))
            .await
            .map_err(Into::into)
    }

    /// Partial encoding is efficient exactly when the storage supports partial writes.
    fn supports_partial_encode(&self) -> bool {
        self.storage.supports_set_partial()
    }
}
/// Traits for codecs that map an array to another array.
///
/// The encoded array may have a different shape, data type, and fill value.
#[cfg_attr(
    all(feature = "async", not(target_arch = "wasm32")),
    async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
    /// Converts this codec into a type-erased `Arc<dyn ArrayToArrayCodecTraits>`.
    fn into_dyn(self: Arc<Self>) -> Arc<dyn ArrayToArrayCodecTraits>;

    /// Returns this codec adjusted by codec-specific options.
    ///
    /// The default ignores `opts` and returns the codec unchanged.
    #[expect(unused_variables)]
    fn with_codec_specific_options(
        self: Arc<Self>,
        opts: &CodecSpecificOptions,
    ) -> Arc<dyn ArrayToArrayCodecTraits> {
        self.into_dyn()
    }

    /// Returns the encoded data type for `decoded_data_type`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if `decoded_data_type` is not supported.
    fn encoded_data_type(&self, decoded_data_type: &DataType) -> Result<DataType, CodecError>;

    /// Returns the encoded fill value.
    ///
    /// The default encodes a one-element array filled with `decoded_fill_value`, using
    /// default [`CodecOptions`], and uses the resulting fixed-length bytes.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails or does not yield fixed-length bytes.
    fn encoded_fill_value(
        &self,
        decoded_data_type: &DataType,
        decoded_fill_value: &FillValue,
    ) -> Result<FillValue, CodecError> {
        // A single-element chunk shape; `NonZeroU64::MIN` is 1 and needs no `unsafe`.
        let element_shape = ChunkShape::from(vec![NonZeroU64::MIN]);
        let fill_value = self
            .encode(
                ArrayBytes::new_fill_value(decoded_data_type, 1, decoded_fill_value)?,
                &element_shape,
                decoded_data_type,
                decoded_fill_value,
                &CodecOptions::default(),
            )?
            .into_fixed()?
            .into_owned();
        Ok(FillValue::new(fill_value))
    }

    /// Returns the encoded shape for `decoded_shape`; the default is the identity.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if `decoded_shape` is not supported.
    fn encoded_shape(&self, decoded_shape: &[NonZeroU64]) -> Result<ChunkShape, CodecError> {
        Ok(decoded_shape.to_vec())
    }

    /// Returns the decoded shape for `encoded_shape`, if it can be determined.
    ///
    /// The default is the identity.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if `encoded_shape` is not supported.
    fn decoded_shape(
        &self,
        encoded_shape: &[NonZeroU64],
    ) -> Result<Option<ChunkShape>, CodecError> {
        Ok(Some(encoded_shape.to_vec()))
    }

    /// Returns the encoded `(shape, data type, fill value)` for a decoded representation.
    ///
    /// The default combines `encoded_shape`, `encoded_data_type` and `encoded_fill_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if any of those fail.
    fn encoded_representation(
        &self,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
    ) -> Result<(ChunkShape, DataType, FillValue), CodecError> {
        Ok((
            self.encoded_shape(shape)?,
            self.encoded_data_type(data_type)?,
            self.encoded_fill_value(data_type, fill_value)?,
        ))
    }

    /// Encodes `bytes`, which is a decoded array of `shape`, `data_type` and `fill_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails.
    fn encode<'a>(
        &self,
        bytes: ArrayBytes<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError>;

    /// Decodes `bytes` into an array of `shape`, `data_type` and `fill_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn decode<'a>(
        &self,
        bytes: ArrayBytes<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError>;

    /// Returns a partial decoder that decodes through this codec from `input_handle`.
    ///
    /// The default wraps `input_handle` in `ArrayToArrayCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[allow(unused_variables)]
    fn partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn ArrayPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToArrayCodecPartialDefault::new(
            input_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }

    /// Returns a partial encoder that encodes through this codec into
    /// `input_output_handle`.
    ///
    /// The default wraps the handle in `ArrayToArrayCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[allow(unused_variables)]
    fn partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn ArrayPartialEncoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToArrayCodecPartialDefault::new(
            input_output_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }

    /// Asynchronous counterpart of `partial_decoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn AsyncArrayPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToArrayCodecPartialDefault::new(
            input_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }

    /// Asynchronous counterpart of `partial_encoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn AsyncArrayPartialEncoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncArrayPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToArrayCodecPartialDefault::new(
            input_output_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }
}
/// Traits for codecs that encode an array into bytes.
#[cfg_attr(
    all(feature = "async", not(target_arch = "wasm32")),
    async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
    /// Converts this codec into a type-erased `Arc<dyn ArrayToBytesCodecTraits>`.
    fn into_dyn(self: Arc<Self>) -> Arc<dyn ArrayToBytesCodecTraits>;
    /// Returns this codec adjusted by codec-specific options.
    ///
    /// The default ignores `opts` and returns the codec unchanged.
    #[expect(unused_variables)]
    fn with_codec_specific_options(
        self: Arc<Self>,
        opts: &CodecSpecificOptions,
    ) -> Arc<dyn ArrayToBytesCodecTraits> {
        self.into_dyn()
    }
    /// Returns the representation of the encoded bytes for an array of `shape`,
    /// `data_type` and `fill_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the inputs are not supported.
    fn encoded_representation(
        &self,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
    ) -> Result<BytesRepresentation, CodecError>;
    /// Encodes the array `bytes` into raw bytes.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails.
    fn encode<'a>(
        &self,
        bytes: ArrayBytes<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytesRaw<'a>, CodecError>;
    /// Decodes raw `bytes` into an array of `shape`, `data_type` and `fill_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn decode<'a>(
        &self,
        bytes: ArrayBytesRaw<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError>;
    /// Optionally returns a compacted form of already-encoded `bytes`.
    ///
    /// The default returns `Ok(None)`. NOTE(review): `None` presumably means "no
    /// compaction performed" — confirm with overriding codecs.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if compaction fails.
    #[expect(unused_variables)]
    fn compact<'a>(
        &self,
        bytes: ArrayBytesRaw<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Option<ArrayBytesRaw<'a>>, CodecError> {
        Ok(None)
    }
    /// Decodes raw `bytes` directly into `output_target`.
    ///
    /// The default checks that the product of `shape` equals the target's element count,
    /// then decodes with `decode` and copies into the target.
    ///
    /// # Errors
    /// Returns a [`CodecError`] in two cases:
    /// - the element counts differ;
    /// - decoding or copying fails.
    fn decode_into(
        &self,
        bytes: ArrayBytesRaw<'_>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        output_target: ArrayBytesDecodeIntoTarget<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let num_elements = output_target.num_elements();
        let shape_num_elements: u64 = shape.iter().map(|d| d.get()).product();
        if shape_num_elements != num_elements {
            return Err(InvalidNumberOfElementsError::new(num_elements, shape_num_elements).into());
        }
        let decoded_value = self.decode(bytes, shape, data_type, fill_value, options)?;
        decode_into_array_bytes_target(&decoded_value, output_target)
    }
    /// Returns an array partial decoder that reads encoded bytes from `input_handle`.
    ///
    /// The default wraps `input_handle` in `ArrayToBytesCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[allow(unused_variables)]
    fn partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn BytesPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToBytesCodecPartialDefault::new(
            input_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }
    /// Returns an array partial encoder over the byte-level `input_output_handle`.
    ///
    /// The default wraps the handle in `ArrayToBytesCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[allow(unused_variables)]
    fn partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn BytesPartialEncoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToBytesCodecPartialDefault::new(
            input_output_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }
    /// Asynchronous counterpart of `partial_decoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn AsyncBytesPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToBytesCodecPartialDefault::new(
            input_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }
    /// Asynchronous counterpart of `partial_encoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn AsyncBytesPartialEncoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncArrayPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(ArrayToBytesCodecPartialDefault::new(
            input_output_handle,
            shape.to_vec(),
            data_type.clone(),
            fill_value.clone(),
            self.into_dyn(),
        )))
    }
}
/// Traits for codecs that map bytes to other bytes.
#[cfg_attr(
    all(feature = "async", not(target_arch = "wasm32")),
    async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
    /// Converts this codec into a type-erased `Arc<dyn BytesToBytesCodecTraits>`.
    fn into_dyn(self: Arc<Self>) -> Arc<dyn BytesToBytesCodecTraits>;
    /// Returns this codec adjusted by codec-specific options.
    ///
    /// The default ignores `opts` and returns the codec unchanged.
    #[expect(unused_variables)]
    fn with_codec_specific_options(
        self: Arc<Self>,
        opts: &CodecSpecificOptions,
    ) -> Arc<dyn BytesToBytesCodecTraits> {
        self.into_dyn()
    }
    /// Returns the recommended concurrency for decoded bytes described by
    /// `decoded_representation`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the recommendation cannot be computed.
    fn recommended_concurrency(
        &self,
        decoded_representation: &BytesRepresentation,
    ) -> Result<RecommendedConcurrency, CodecError>;
    /// Returns the representation of the encoded bytes for `decoded_representation`.
    fn encoded_representation(
        &self,
        decoded_representation: &BytesRepresentation,
    ) -> BytesRepresentation;
    /// Encodes `decoded_value`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if encoding fails.
    fn encode<'a>(
        &self,
        decoded_value: ArrayBytesRaw<'a>,
        options: &CodecOptions,
    ) -> Result<ArrayBytesRaw<'a>, CodecError>;
    /// Decodes `encoded_value`, whose decoded form is described by
    /// `decoded_representation`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if decoding fails.
    fn decode<'a>(
        &self,
        encoded_value: ArrayBytesRaw<'a>,
        decoded_representation: &BytesRepresentation,
        options: &CodecOptions,
    ) -> Result<ArrayBytesRaw<'a>, CodecError>;
    /// Returns a bytes partial decoder that decodes through this codec from
    /// `input_handle`.
    ///
    /// The default wraps `input_handle` in `BytesToBytesCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[allow(unused_variables)]
    fn partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn BytesPartialDecoderTraits>,
        decoded_representation: &BytesRepresentation,
        options: &CodecOptions,
    ) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(BytesToBytesCodecPartialDefault::new_bytes(
            input_handle,
            *decoded_representation,
            self.into_dyn(),
        )))
    }
    /// Returns a bytes partial encoder that encodes through this codec into
    /// `input_output_handle`.
    ///
    /// The default wraps the handle in `BytesToBytesCodecPartialDefault`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[allow(unused_variables)]
    fn partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn BytesPartialEncoderTraits>,
        decoded_representation: &BytesRepresentation,
        options: &CodecOptions,
    ) -> Result<Arc<dyn BytesPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(BytesToBytesCodecPartialDefault::new_bytes(
            input_output_handle,
            *decoded_representation,
            self.into_dyn(),
        )))
    }
    /// Asynchronous counterpart of `partial_decoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial decoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn AsyncBytesPartialDecoderTraits>,
        decoded_representation: &BytesRepresentation,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncBytesPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(BytesToBytesCodecPartialDefault::new_bytes(
            input_handle,
            *decoded_representation,
            self.into_dyn(),
        )))
    }
    /// Asynchronous counterpart of `partial_encoder`.
    ///
    /// # Errors
    /// Returns a [`CodecError`] if the partial encoder cannot be created.
    #[cfg(feature = "async")]
    #[allow(unused_variables)]
    async fn async_partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn AsyncBytesPartialEncoderTraits>,
        decoded_representation: &BytesRepresentation,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncBytesPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(BytesToBytesCodecPartialDefault::new_bytes(
            input_output_handle,
            *decoded_representation,
            self.into_dyn(),
        )))
    }
}
impl BytesPartialDecoderTraits for Cow<'static, [u8]> {
    /// In-memory bytes always exist.
    fn exists(&self) -> Result<bool, StorageError> {
        Ok(true)
    }

    /// Returns the number of bytes held.
    fn size_held(&self) -> usize {
        self.as_ref().len()
    }

    /// Copies each requested range out of the held bytes.
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
        let extracted = extract_byte_ranges(self, decoded_regions)?;
        let owned: Vec<ArrayBytesRaw<'_>> = extracted.into_iter().map(Cow::Owned).collect();
        Ok(Some(owned))
    }

    /// In-memory bytes always support partial decoding.
    fn supports_partial_decode(&self) -> bool {
        true
    }
}
impl BytesPartialDecoderTraits for Vec<u8> {
    /// In-memory bytes always exist.
    fn exists(&self) -> Result<bool, StorageError> {
        Ok(true)
    }

    /// Returns the number of bytes held.
    fn size_held(&self) -> usize {
        self.len()
    }

    /// Copies each requested range out of the held bytes.
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
        let extracted = extract_byte_ranges(self, decoded_regions)?;
        let owned: Vec<ArrayBytesRaw<'_>> = extracted.into_iter().map(Cow::Owned).collect();
        Ok(Some(owned))
    }

    /// In-memory bytes always support partial decoding.
    fn supports_partial_decode(&self) -> bool {
        true
    }
}
/// In-memory bytes held in a [`Cow`] serve directly as an async partial decoder source.
///
/// No actual asynchronous work is done; each requested region is copied out of the held
/// bytes into a new owned buffer.
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AsyncBytesPartialDecoderTraits for Cow<'static, [u8]> {
    /// Always `Ok(true)`: the bytes are already in memory.
    async fn exists(&self) -> Result<bool, StorageError> {
        Ok(true)
    }

    /// The number of bytes in the held buffer.
    fn size_held(&self) -> usize {
        let bytes: &[u8] = self;
        bytes.len()
    }

    /// Extract every region in `decoded_regions` from the held bytes.
    ///
    /// # Errors
    /// Propagates any error from `extract_byte_ranges` (presumably an invalid byte range
    /// for the held length), converted into a [`CodecError`].
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError> {
        let extracted = extract_byte_ranges(self, decoded_regions)?;
        let regions: Vec<ArrayBytesRaw<'a>> = extracted.into_iter().map(Cow::Owned).collect();
        Ok(Some(regions))
    }

    /// Always `true`: any byte range can be served from the held bytes.
    fn supports_partial_decode(&self) -> bool {
        true
    }
}
/// An owned in-memory byte vector serves directly as an async partial decoder source.
///
/// No actual asynchronous work is done; each requested region is copied out of the vector
/// into a new owned buffer.
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AsyncBytesPartialDecoderTraits for Vec<u8> {
    /// Always `Ok(true)`: the bytes are already in memory.
    async fn exists(&self) -> Result<bool, StorageError> {
        Ok(true)
    }

    /// The number of bytes in the vector.
    fn size_held(&self) -> usize {
        self.as_slice().len()
    }

    /// Extract every region in `decoded_regions` from the vector.
    ///
    /// # Errors
    /// Propagates any error from `extract_byte_ranges` (presumably an invalid byte range
    /// for the vector length), converted into a [`CodecError`].
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        _options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError> {
        let extracted = extract_byte_ranges(self, decoded_regions)?;
        let regions: Vec<ArrayBytesRaw<'a>> = extracted.into_iter().map(Cow::Owned).collect();
        Ok(Some(regions))
    }

    /// Always `true`: any byte range can be served from the vector.
    fn supports_partial_decode(&self) -> bool {
        true
    }
}
/// A byte buffer had a different length than expected.
///
/// Displays as `Invalid bytes len {len}, expected {expected_len}`.
#[derive(Clone, Debug, Display, Error)]
#[display("Invalid bytes len {len}, expected {expected_len}")]
pub struct InvalidBytesLengthError {
    /// The length that was encountered.
    len: usize,
    /// The length that was required.
    expected_len: usize,
}

impl InvalidBytesLengthError {
    /// Create a new [`InvalidBytesLengthError`] from the observed `len` and the required
    /// `expected_len`.
    #[must_use]
    pub fn new(len: usize, expected_len: usize) -> Self {
        InvalidBytesLengthError {
            expected_len,
            len,
        }
    }
}
/// An array shape is inconsistent with a required number of elements.
///
/// Displays as `Invalid shape {shape:?} for number of elements {expected_num_elements}`.
#[derive(Clone, Debug, Display, Error)]
#[display("Invalid shape {shape:?} for number of elements {expected_num_elements}")]
pub struct InvalidArrayShapeError {
    /// The offending shape.
    shape: ArrayShape,
    /// The number of elements the shape was expected to describe.
    expected_num_elements: usize,
}

impl InvalidArrayShapeError {
    /// Create a new [`InvalidArrayShapeError`] for `shape`, which was expected to describe
    /// `expected_num_elements` elements.
    #[must_use]
    pub fn new(shape: ArrayShape, expected_num_elements: usize) -> Self {
        InvalidArrayShapeError {
            expected_num_elements,
            shape,
        }
    }
}
/// An element count differs from the expected count.
///
/// Displays as `Invalid number of elements {num}, expected {expected}`.
#[derive(Clone, Debug, Display, Error)]
#[display("Invalid number of elements {num}, expected {expected}")]
pub struct InvalidNumberOfElementsError {
    /// The number of elements encountered.
    num: u64,
    /// The number of elements required.
    expected: u64,
}

impl InvalidNumberOfElementsError {
    /// Create a new [`InvalidNumberOfElementsError`] from the observed `num` and the
    /// required `expected` count.
    #[must_use]
    pub fn new(num: u64, expected: u64) -> Self {
        InvalidNumberOfElementsError { expected, num }
    }
}
/// An array subset lies outside the subset it must be contained in.
///
/// Displays as `Subset {subset} is out of bounds of {must_be_within}`.
#[derive(Clone, Debug, Display, Error)]
#[display("Subset {subset} is out of bounds of {must_be_within}")]
pub struct SubsetOutOfBoundsError {
    /// The offending subset.
    subset: ArraySubset,
    /// The bounding subset that `subset` was required to fit within.
    must_be_within: ArraySubset,
}

impl SubsetOutOfBoundsError {
    /// Create a new [`SubsetOutOfBoundsError`] for `subset`, which was required to lie within
    /// `must_be_within`.
    #[must_use]
    pub fn new(subset: ArraySubset, must_be_within: ArraySubset) -> Self {
        SubsetOutOfBoundsError {
            must_be_within,
            subset,
        }
    }
}
/// Errors reported by codec operations, including partial encoding and decoding.
///
/// The enum is `#[non_exhaustive]`, so exhaustive matches outside this crate need a
/// wildcard arm. It derives `Clone`, which is why non-`Clone` payloads are wrapped in
/// [`Arc`] (see [`CodecError::IOError`]).
#[non_exhaustive]
#[derive(Clone, Debug, Error)]
pub enum CodecError {
/// Dimensionality mismatch (forwarded from [`IncompatibleDimensionalityError`]).
#[error(transparent)]
IncompatibleDimensionalityError(#[from] IncompatibleDimensionalityError),
/// An I/O error. It is held in an [`Arc`] because [`std::io::Error`] is not `Clone`;
/// a plain `std::io::Error` also converts via a separate `From` impl.
#[error(transparent)]
IOError(#[from] Arc<std::io::Error>),
/// An invalid byte range (forwarded from [`InvalidByteRangeError`]).
#[error(transparent)]
InvalidByteRangeError(#[from] InvalidByteRangeError),
/// An indexer error (forwarded from [`IndexerError`]).
#[error(transparent)]
IncompatibleIndexer(#[from] IndexerError),
/// A decoded chunk did not have the expected length in bytes.
// NOTE(review): `_0` relies on the binding name thiserror generates for tuple field 0;
// the documented spelling is `.0` — confirm it stays supported across thiserror upgrades.
#[error("the size of a decoded chunk is {}, expected {}", _0.len, _0.expected_len)]
UnexpectedChunkDecodedSize(#[from] InvalidBytesLengthError),
/// A checksum was found to be invalid.
#[error("the checksum is invalid")]
InvalidChecksum,
/// A storage error (forwarded from [`StorageError`]).
#[error(transparent)]
StorageError(#[from] StorageError),
/// A data type is not supported by a codec; the `String` is the codec name.
///
/// For optional data types the message also suggests using the optional codec.
#[error("{}", format_unsupported_data_type(.0, .1))]
UnsupportedDataType(DataType, String),
/// A data type codec error (forwarded from `zarrs_data_type::DataTypeCodecError`).
#[error(transparent)]
UnsupportedDataTypeCodec(#[from] zarrs_data_type::DataTypeCodecError),
/// Offsets are invalid or incompatible with the data type.
#[error(
"Offsets are invalid or are not compatible with the data type (e.g. fixed-sized data types)"
)]
InvalidOffsets,
/// Any other error, described by a message.
///
/// Produced by the `From<&str>` and `From<String>` conversions, among others.
// NOTE(review): `{_0}` relies on thiserror's generated binding name; `{0}` is the
// documented form — confirm intent.
#[error("{_0}")]
Other(String),
/// Invalid variable-sized array offsets.
#[error("Invalid variable sized array offsets")]
InvalidVariableSizedArrayOffsets,
/// Fixed-length bytes were expected.
#[error(transparent)]
ExpectedFixedLengthBytes(#[from] ExpectedFixedLengthBytesError),
/// Variable-length bytes were expected.
#[error(transparent)]
ExpectedVariableLengthBytes(#[from] ExpectedVariableLengthBytesError),
/// Optional bytes were expected.
#[error(transparent)]
ExpectedOptionalBytes(#[from] ExpectedOptionalBytesError),
/// An array shape is inconsistent with a number of elements.
#[error(transparent)]
InvalidArrayShape(#[from] InvalidArrayShapeError),
/// An unexpected number of elements.
#[error(transparent)]
InvalidNumberOfElements(#[from] InvalidNumberOfElementsError),
/// An array subset is out of bounds.
#[error(transparent)]
SubsetOutOfBounds(#[from] SubsetOutOfBoundsError),
/// Creating raw bytes offsets failed.
#[error(transparent)]
RawBytesOffsetsCreate(#[from] ArrayBytesRawOffsetsCreateError),
/// Raw bytes offsets are out of bounds.
#[error(transparent)]
RawBytesOffsetsOutOfBounds(#[from] ArrayBytesRawOffsetsOutOfBoundsError),
/// A fill value error (forwarded from [`DataTypeFillValueError`]).
#[error(transparent)]
DataTypeFillValueError(#[from] DataTypeFillValueError),
/// An array subset error (forwarded from [`ArraySubsetError`]).
#[error(transparent)]
ArraySubsetError(#[from] ArraySubsetError),
}
/// Build the display message for [`CodecError::UnsupportedDataType`].
///
/// The data type is named by its Zarr V3 name, or an empty string if it has none. When the
/// data type is optional, the message is extended with a hint to use the optional codec.
///
/// Takes `&str` rather than `&String` (clippy `ptr_arg`); the `&String` field reference
/// supplied by the `#[error]` attribute coerces automatically.
fn format_unsupported_data_type(data_type: &DataType, codec_name: &str) -> String {
    let data_type_name = data_type
        .name(zarrs_plugin::ZarrVersion::V3)
        .unwrap_or_default();
    let mut message = format!("Unsupported data type {data_type_name} for codec {codec_name}");
    if data_type.is_optional() {
        // Append rather than re-format the whole message; the output is unchanged.
        message.push_str(". Use the optional codec to handle optional data types.");
    }
    message
}
impl From<std::io::Error> for CodecError {
fn from(err: std::io::Error) -> Self {
Self::IOError(Arc::new(err))
}
}
impl From<&str> for CodecError {
fn from(err: &str) -> Self {
Self::Other(err.to_string())
}
}
impl From<String> for CodecError {
fn from(err: String) -> Self {
Self::Other(err)
}
}
impl From<zarrs_data_type::codec_traits::bytes::BytesCodecEndiannessMissingError> for CodecError {
fn from(err: zarrs_data_type::codec_traits::bytes::BytesCodecEndiannessMissingError) -> Self {
Self::Other(err.to_string())
}
}