use std::borrow::Cow;
use std::num::NonZeroU64;
use std::sync::Arc;
use zarrs_chunk_grid::Indexer;
use zarrs_data_type::FillValue;
use super::{
ArrayBytes, ArrayBytesOffsets, ArrayBytesRaw, ArrayPartialDecoderTraits,
ArrayPartialEncoderTraits, ArraySubset, ArrayToArrayCodecTraits, ArrayToBytesCodecTraits,
BytesPartialDecoderTraits, BytesPartialEncoderTraits, BytesRepresentation,
BytesToBytesCodecTraits, ChunkShape, CodecError, CodecOptions, DataType,
};
use crate::array_bytes::update_array_bytes;
#[cfg(feature = "async")]
use crate::{
AsyncArrayPartialDecoderTraits, AsyncArrayPartialEncoderTraits, AsyncBytesPartialDecoderTraits,
AsyncBytesPartialEncoderTraits,
};
use zarrs_metadata::DataTypeSize;
use zarrs_storage::byte_range::{ByteRangeIterator, extract_byte_ranges};
use zarrs_storage::{OffsetBytesIterator, StorageError};
/// Decoded-side description of an array chunk: its shape, element data type and fill value.
///
/// Codecs need all three to decode or encode a complete chunk.
pub(super) struct ArrayDecodedRepresentation {
    /// Extent of every dimension of the decoded chunk (each is non-zero).
    shape: ChunkShape,
    /// Data type of the decoded chunk elements.
    data_type: DataType,
    /// Fill value used for elements of the decoded chunk.
    fill_value: FillValue,
}

impl ArrayDecodedRepresentation {
    /// Bundle a chunk `shape`, `data_type` and `fill_value` together.
    pub(super) fn new(shape: ChunkShape, data_type: DataType, fill_value: FillValue) -> Self {
        Self {
            shape,
            data_type,
            fill_value,
        }
    }

    /// The chunk shape as non-zero extents.
    pub(super) fn shape(&self) -> &[NonZeroU64] {
        &self.shape
    }

    /// The chunk shape reinterpreted as plain `u64` extents (a `bytemuck` slice cast, no copy).
    pub(super) fn shape_u64(&self) -> &[u64] {
        bytemuck::must_cast_slice(self.shape())
    }

    /// Data type of the decoded chunk.
    pub(super) fn data_type(&self) -> &DataType {
        &self.data_type
    }

    /// Fill value of the decoded chunk.
    pub(super) fn fill_value(&self) -> &FillValue {
        &self.fill_value
    }

    /// Total number of elements in the chunk (product of all extents).
    pub(super) fn num_elements(&self) -> u64 {
        self.shape_u64().iter().product()
    }
}
/// Default partial decoder/encoder built from a codec and an inner partial handle.
///
/// - `T` is the inner partial decoder/encoder operating on the codec's *encoded* side.
/// - `R` is the decoded representation (an array description or a [`BytesRepresentation`]).
/// - `C` is the codec trait object used to decode/encode values.
///
/// Array-to-bytes and bytes-to-bytes partial operations decode the complete value through the
/// codec. Partial encoding is always a read-modify-write of the complete value.
pub struct CodecPartialDefault<T: ?Sized, R, C: ?Sized> {
    /// Inner partial decoder/encoder for the encoded representation.
    input_output_handle: Arc<T>,
    /// Description of the decoded representation.
    decoded_representation: R,
    /// The codec translating between the encoded and decoded representations.
    codec: Arc<C>,
}

/// [`CodecPartialDefault`] specialised for an array-to-array codec.
pub(super) type ArrayToArrayCodecPartialDefault<T> =
    CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToArrayCodecTraits>;

/// [`CodecPartialDefault`] specialised for an array-to-bytes codec.
pub(super) type ArrayToBytesCodecPartialDefault<T> =
    CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToBytesCodecTraits>;

/// [`CodecPartialDefault`] specialised for a bytes-to-bytes codec.
pub(super) type BytesToBytesCodecPartialDefault<T> =
    CodecPartialDefault<T, BytesRepresentation, dyn BytesToBytesCodecTraits>;

impl<T: ?Sized, C: ?Sized> CodecPartialDefault<T, ArrayDecodedRepresentation, C> {
    /// Create a partial codec whose decoded side is an array chunk described by `shape`,
    /// `data_type` and `fill_value`.
    #[must_use]
    pub fn new(
        input_output_handle: Arc<T>,
        shape: ChunkShape,
        data_type: DataType,
        fill_value: FillValue,
        codec: Arc<C>,
    ) -> Self {
        let decoded_representation = ArrayDecodedRepresentation::new(shape, data_type, fill_value);
        Self {
            input_output_handle,
            decoded_representation,
            codec,
        }
    }
}

impl<T: ?Sized, C: ?Sized> CodecPartialDefault<T, BytesRepresentation, C> {
    /// Create a partial codec whose decoded side is a byte value described by
    /// `decoded_representation`.
    #[must_use]
    pub fn new_bytes(
        input_output_handle: Arc<T>,
        decoded_representation: BytesRepresentation,
        codec: Arc<C>,
    ) -> Self {
        Self {
            input_output_handle,
            decoded_representation,
            codec,
        }
    }
}
impl<T: ?Sized> ArrayPartialDecoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToArrayCodecTraits>
where
    T: ArrayPartialDecoderTraits,
{
    /// Data type of the decoded chunk.
    fn data_type(&self) -> &super::DataType {
        self.decoded_representation.data_type()
    }

    /// Delegates to the inner partial decoder.
    fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists()
    }

    /// Delegates to the inner partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Decode the region selected by `indexer`.
    ///
    /// The indexer is forwarded unchanged to the inner partial decoder. The returned region is
    /// then decoded by the array-to-array codec as though it were a chunk whose shape is the
    /// indexer's output shape. If any output extent is zero, an empty [`ArrayBytes`] is returned
    /// instead. The inner decoder is still queried first, so its errors propagate.
    fn partial_decode(
        &self,
        indexer: &dyn Indexer,
        options: &super::CodecOptions,
    ) -> Result<ArrayBytes<'_>, super::CodecError> {
        let region_shape = indexer
            .output_shape()
            .iter()
            .map(|&extent| NonZeroU64::try_from(extent))
            .collect::<Result<Vec<NonZeroU64>, _>>();
        let encoded_region = self.input_output_handle.partial_decode(indexer, options)?;

        let repr = &self.decoded_representation;
        match region_shape {
            Ok(extents) => {
                let shape = ChunkShape::from(extents);
                self.codec
                    .decode(
                        encoded_region,
                        &shape,
                        repr.data_type(),
                        repr.fill_value(),
                        options,
                    )
                    .map(ArrayBytes::into_owned)
            }
            // A zero-sized extent means the requested region holds no elements.
            Err(_) => Ok(match repr.data_type().size() {
                DataTypeSize::Fixed(_) => ArrayBytes::new_flen(vec![]),
                DataTypeSize::Variable => {
                    let offsets = ArrayBytesOffsets::new(vec![0]).unwrap();
                    ArrayBytes::new_vlen(vec![], offsets).unwrap()
                }
            }),
        }
    }

    /// Delegates to the inner partial decoder.
    fn supports_partial_decode(&self) -> bool {
        self.input_output_handle.supports_partial_decode()
    }
}
impl<T: ?Sized> ArrayPartialEncoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToArrayCodecTraits>
where
    T: ArrayPartialEncoderTraits,
{
    /// View this encoder as a shared partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn ArrayPartialDecoderTraits> {
        self
    }

    /// Delegates to the inner partial encoder.
    fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase()
    }

    /// Write `bytes` into the region of the chunk selected by `indexer`.
    ///
    /// This is a read-modify-write of the whole chunk:
    /// 1. Read the complete chunk from the inner handle and decode it.
    /// 2. Splice in the new values.
    /// 3. Erase the inner handle.
    /// 4. Re-encode and write the whole chunk back, unless it is entirely the fill value and
    ///    `store_empty_chunks` is disabled.
    fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let repr = &self.decoded_representation;
        let chunk_shape = repr.shape_u64();
        let whole_chunk = ArraySubset::new_with_shape(chunk_shape.to_vec());

        // Read and decode the entire current chunk.
        let current_encoded = self
            .input_output_handle
            .partial_decode(&whole_chunk, options)?;
        let current = self.codec.decode(
            current_encoded,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        current.validate(repr.num_elements(), repr.data_type())?;
        bytes.validate(indexer.len(), repr.data_type())?;

        // Splice the incoming values into the decoded chunk.
        let updated = update_array_bytes(
            current,
            chunk_shape,
            indexer,
            bytes,
            repr.data_type().size(),
        )?;

        // Remove the old chunk; it is rewritten below only when it holds non-fill data.
        self.input_output_handle.erase()?;
        if !options.store_empty_chunks() && updated.is_fill_value(repr.fill_value()) {
            return Ok(());
        }

        let updated_encoded = self.codec.encode(
            updated,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        self.input_output_handle
            .partial_encode(&whole_chunk, &updated_encoded, options)
    }

    /// Always `false`: every partial encode rewrites the whole chunk.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}
impl<T: ?Sized> ArrayPartialDecoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToBytesCodecTraits>
where
    T: BytesPartialDecoderTraits,
{
    /// Data type of the decoded chunk.
    fn data_type(&self) -> &super::DataType {
        self.decoded_representation.data_type()
    }

    /// Delegates to the inner bytes partial decoder.
    fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists()
    }

    /// Delegates to the inner bytes partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Decode the whole chunk and extract the region selected by `indexer`.
    ///
    /// If the inner decoder reports no stored value, `indexer.len()` fill-value elements are
    /// returned instead.
    fn partial_decode(
        &self,
        indexer: &dyn Indexer,
        options: &super::CodecOptions,
    ) -> Result<ArrayBytes<'_>, super::CodecError> {
        let repr = &self.decoded_representation;
        let Some(encoded_chunk) = self.input_output_handle.decode(options)? else {
            // Nothing stored: every requested element takes the fill value.
            return ArrayBytes::new_fill_value(repr.data_type(), indexer.len(), repr.fill_value())
                .map_err(CodecError::from);
        };

        let decoded_chunk = self.codec.decode(
            encoded_chunk,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        decoded_chunk
            .extract_array_subset(indexer, repr.shape_u64(), repr.data_type())
            .map(ArrayBytes::into_owned)
    }

    /// Always `false`: the whole chunk is decoded on every call.
    fn supports_partial_decode(&self) -> bool {
        false
    }
}
impl<T: ?Sized> ArrayPartialEncoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToBytesCodecTraits>
where
    T: BytesPartialEncoderTraits,
{
    /// View this encoder as a shared partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn ArrayPartialDecoderTraits> {
        self
    }

    /// Delegates to the inner bytes partial encoder.
    fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase()
    }

    /// Write `bytes` into the region of the chunk selected by `indexer`.
    ///
    /// This is a read-modify-write of the whole chunk:
    /// 1. Decode the stored chunk, or start from a fill-value chunk if nothing is stored.
    /// 2. Splice in the new values.
    /// 3. Erase the inner handle.
    /// 4. Re-encode and write the complete chunk at offset 0, unless it is entirely the fill
    ///    value and `store_empty_chunks` is disabled.
    fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let repr = &self.decoded_representation;
        let chunk_shape = repr.shape_u64();

        let current = match self.input_output_handle.decode(options)? {
            Some(encoded) => self.codec.decode(
                encoded,
                repr.shape(),
                repr.data_type(),
                repr.fill_value(),
                options,
            )?,
            None => ArrayBytes::new_fill_value(
                repr.data_type(),
                repr.num_elements(),
                repr.fill_value(),
            )?,
        };
        current.validate(repr.num_elements(), repr.data_type())?;
        bytes.validate(indexer.len(), repr.data_type())?;

        // Splice the incoming values into the decoded chunk.
        let updated = update_array_bytes(
            current,
            chunk_shape,
            indexer,
            bytes,
            repr.data_type().size(),
        )?;

        // Remove the old encoded chunk so the rewrite below starts from an empty value.
        self.input_output_handle.erase()?;
        if !options.store_empty_chunks() && updated.is_fill_value(repr.fill_value()) {
            return Ok(());
        }

        let updated_encoded = self.codec.encode(
            updated,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        self.input_output_handle
            .partial_encode(0, updated_encoded, options)
    }

    /// Always `false`: every partial encode rewrites the whole chunk.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}
impl<T: ?Sized> BytesPartialDecoderTraits
    for CodecPartialDefault<T, BytesRepresentation, dyn BytesToBytesCodecTraits>
where
    T: BytesPartialDecoderTraits,
{
    /// Delegates to the inner bytes partial decoder.
    fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists()
    }

    /// Delegates to the inner bytes partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Decode the complete value, then slice out each of `decoded_regions`.
    ///
    /// Returns `Ok(None)` when the inner decoder reports no stored value. An out-of-bounds region
    /// yields [`CodecError::InvalidByteRangeError`].
    fn partial_decode_many(
        &self,
        decoded_regions: ByteRangeIterator,
        options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'_>>>, CodecError> {
        let decoded = match self.input_output_handle.decode(options)? {
            Some(encoded) => self
                .codec
                .decode(encoded, &self.decoded_representation, options)?
                .into_owned(),
            None => return Ok(None),
        };

        let regions = extract_byte_ranges(&decoded, decoded_regions)
            .map_err(CodecError::InvalidByteRangeError)?;
        Ok(Some(regions.into_iter().map(Cow::Owned).collect()))
    }

    /// Always `false`: the whole value is decoded on every call.
    fn supports_partial_decode(&self) -> bool {
        false
    }
}
impl<T: ?Sized> BytesPartialEncoderTraits
    for CodecPartialDefault<T, BytesRepresentation, dyn BytesToBytesCodecTraits>
where
    T: BytesPartialEncoderTraits,
{
    /// View this encoder as a shared bytes partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn BytesPartialDecoderTraits> {
        self.clone()
    }

    /// Delegates to the inner bytes partial encoder.
    fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase()
    }

    /// Overwrite the decoded value at each `(offset, bytes)` pair.
    ///
    /// This is a read-modify-write of the complete value:
    /// 1. Decode the stored value; an empty value is used if nothing is stored.
    /// 2. Patch in each pair, zero-extending the value when a write goes past its end.
    /// 3. Re-encode the result.
    /// 4. Replace the stored encoded value with the new encoding.
    fn partial_encode_many(
        &self,
        offset_values: OffsetBytesIterator<ArrayBytesRaw<'_>>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let encoded_value = self
            .input_output_handle
            .decode(options)?
            .map(Cow::into_owned);
        let mut decoded_value = if let Some(encoded_value) = encoded_value {
            self.codec
                .decode(
                    Cow::Owned(encoded_value),
                    &self.decoded_representation,
                    options,
                )?
                .into_owned()
        } else {
            vec![]
        };

        for (offset, value) in offset_values {
            let offset = usize::try_from(offset).unwrap();
            let end = offset + value.len();
            if decoded_value.len() < end {
                decoded_value.resize(end, 0);
            }
            decoded_value[offset..end].copy_from_slice(&value);
        }

        let bytes_encoded = self
            .codec
            .encode(Cow::Owned(decoded_value), options)?
            .into_owned();

        // The new encoding may be shorter than the stored one (e.g. it compresses better).
        // Writing it at offset 0 over the old value would leave stale trailing bytes, so the old
        // value is erased first, as the array-to-bytes encoder also does. The erase happens only
        // after encoding succeeded, so a failed encode leaves the stored value untouched.
        self.input_output_handle.erase()?;
        self.input_output_handle
            .partial_encode(0, Cow::Owned(bytes_encoded), options)
    }

    /// Always `false`: every partial encode rewrites the whole value.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncArrayPartialDecoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToArrayCodecTraits>
where
    T: AsyncArrayPartialDecoderTraits,
{
    /// Delegates to the inner async partial decoder.
    async fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists().await
    }

    /// Delegates to the inner async partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Data type of the decoded chunk.
    fn data_type(&self) -> &super::DataType {
        self.decoded_representation.data_type()
    }

    /// Decode the region selected by `indexer`.
    ///
    /// The indexer is forwarded unchanged to the inner partial decoder. The returned region is
    /// then decoded by the array-to-array codec as though it were a chunk whose shape is the
    /// indexer's output shape. If any output extent is zero, an empty [`ArrayBytes`] is returned
    /// instead. The inner decoder is still awaited first, so its errors propagate.
    async fn partial_decode<'a>(
        &'a self,
        indexer: &dyn Indexer,
        options: &super::CodecOptions,
    ) -> Result<ArrayBytes<'a>, super::CodecError> {
        let region_shape = indexer
            .output_shape()
            .iter()
            .map(|&extent| NonZeroU64::try_from(extent))
            .collect::<Result<Vec<NonZeroU64>, _>>();
        let encoded_region = self
            .input_output_handle
            .partial_decode(indexer, options)
            .await?;

        let repr = &self.decoded_representation;
        match region_shape {
            Ok(extents) => {
                let shape = ChunkShape::from(extents);
                self.codec
                    .decode(
                        encoded_region,
                        &shape,
                        repr.data_type(),
                        repr.fill_value(),
                        options,
                    )
                    .map(ArrayBytes::into_owned)
            }
            // A zero-sized extent means the requested region holds no elements.
            Err(_) => Ok(match repr.data_type().size() {
                DataTypeSize::Fixed(_) => ArrayBytes::new_flen(vec![]),
                DataTypeSize::Variable => {
                    let offsets = ArrayBytesOffsets::new(vec![0]).unwrap();
                    ArrayBytes::new_vlen(vec![], offsets).unwrap()
                }
            }),
        }
    }

    /// Delegates to the inner async partial decoder.
    fn supports_partial_decode(&self) -> bool {
        self.input_output_handle.supports_partial_decode()
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncArrayPartialEncoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToArrayCodecTraits>
where
    T: AsyncArrayPartialEncoderTraits,
{
    /// View this encoder as a shared async partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncArrayPartialDecoderTraits> {
        self
    }

    /// Delegates to the inner async partial encoder.
    async fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase().await
    }

    /// Write `bytes` into the region of the chunk selected by `indexer`.
    ///
    /// This is a read-modify-write of the whole chunk:
    /// 1. Read the complete chunk from the inner handle and decode it.
    /// 2. Splice in the new values.
    /// 3. Erase the inner handle.
    /// 4. Re-encode and write the whole chunk back, unless it is entirely the fill value and
    ///    `store_empty_chunks` is disabled.
    async fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let repr = &self.decoded_representation;
        let chunk_shape = repr.shape_u64();
        let whole_chunk = ArraySubset::new_with_shape(chunk_shape.to_vec());

        // Read and decode the entire current chunk.
        let current_encoded = self
            .input_output_handle
            .partial_decode(&whole_chunk, options)
            .await?;
        let current = self.codec.decode(
            current_encoded,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        current.validate(repr.num_elements(), repr.data_type())?;
        bytes.validate(indexer.len(), repr.data_type())?;

        // Splice the incoming values into the decoded chunk.
        let updated = update_array_bytes(
            current,
            chunk_shape,
            indexer,
            bytes,
            repr.data_type().size(),
        )?;

        // Remove the old chunk; it is rewritten below only when it holds non-fill data.
        self.input_output_handle.erase().await?;
        if !options.store_empty_chunks() && updated.is_fill_value(repr.fill_value()) {
            return Ok(());
        }

        let updated_encoded = self.codec.encode(
            updated,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        self.input_output_handle
            .partial_encode(&whole_chunk, &updated_encoded, options)
            .await
    }

    /// Always `false`: every partial encode rewrites the whole chunk.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncArrayPartialDecoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToBytesCodecTraits>
where
    T: AsyncBytesPartialDecoderTraits,
{
    /// Data type of the decoded chunk.
    fn data_type(&self) -> &super::DataType {
        self.decoded_representation.data_type()
    }

    /// Delegates to the inner async bytes partial decoder.
    async fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists().await
    }

    /// Delegates to the inner async bytes partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Decode the whole chunk and extract the region selected by `indexer`.
    ///
    /// If the inner decoder reports no stored value, `indexer.len()` fill-value elements are
    /// returned instead.
    async fn partial_decode<'a>(
        &'a self,
        indexer: &dyn Indexer,
        options: &super::CodecOptions,
    ) -> Result<ArrayBytes<'a>, super::CodecError> {
        let Some(encoded_chunk) = self.input_output_handle.decode(options).await? else {
            // Nothing stored: every requested element takes the fill value.
            let repr = &self.decoded_representation;
            return ArrayBytes::new_fill_value(repr.data_type(), indexer.len(), repr.fill_value())
                .map_err(CodecError::from);
        };

        let repr = &self.decoded_representation;
        let decoded_chunk = self.codec.decode(
            encoded_chunk,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        decoded_chunk
            .extract_array_subset(indexer, repr.shape_u64(), repr.data_type())
            .map(ArrayBytes::into_owned)
    }

    /// Always `false`: the whole chunk is decoded on every call.
    fn supports_partial_decode(&self) -> bool {
        false
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncArrayPartialEncoderTraits
    for CodecPartialDefault<T, ArrayDecodedRepresentation, dyn ArrayToBytesCodecTraits>
where
    T: AsyncBytesPartialEncoderTraits,
{
    /// View this encoder as a shared async partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncArrayPartialDecoderTraits> {
        self
    }

    /// Delegates to the inner async bytes partial encoder.
    async fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase().await
    }

    /// Write `bytes` into the region of the chunk selected by `indexer`.
    ///
    /// This is a read-modify-write of the whole chunk:
    /// 1. Decode the stored chunk, or start from a fill-value chunk if nothing is stored.
    /// 2. Splice in the new values.
    /// 3. Erase the inner handle.
    /// 4. Re-encode and write the complete chunk at offset 0, unless it is entirely the fill
    ///    value and `store_empty_chunks` is disabled.
    async fn partial_encode(
        &self,
        indexer: &dyn Indexer,
        bytes: &ArrayBytes<'_>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let repr = &self.decoded_representation;
        let chunk_shape = repr.shape_u64();

        let current = match self.input_output_handle.decode(options).await? {
            Some(encoded) => self.codec.decode(
                encoded,
                repr.shape(),
                repr.data_type(),
                repr.fill_value(),
                options,
            )?,
            None => ArrayBytes::new_fill_value(
                repr.data_type(),
                repr.num_elements(),
                repr.fill_value(),
            )?,
        };
        current.validate(repr.num_elements(), repr.data_type())?;
        bytes.validate(indexer.len(), repr.data_type())?;

        // Splice the incoming values into the decoded chunk.
        let updated = update_array_bytes(
            current,
            chunk_shape,
            indexer,
            bytes,
            repr.data_type().size(),
        )?;

        // Remove the old encoded chunk so the rewrite below starts from an empty value.
        self.input_output_handle.erase().await?;
        if !options.store_empty_chunks() && updated.is_fill_value(repr.fill_value()) {
            return Ok(());
        }

        let updated_encoded = self.codec.encode(
            updated,
            repr.shape(),
            repr.data_type(),
            repr.fill_value(),
            options,
        )?;
        self.input_output_handle
            .partial_encode(0, updated_encoded, options)
            .await
    }

    /// Always `false`: every partial encode rewrites the whole chunk.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncBytesPartialDecoderTraits
    for CodecPartialDefault<T, BytesRepresentation, dyn BytesToBytesCodecTraits>
where
    T: AsyncBytesPartialDecoderTraits,
{
    /// Delegates to the inner async bytes partial decoder.
    async fn exists(&self) -> Result<bool, StorageError> {
        self.input_output_handle.exists().await
    }

    /// Delegates to the inner async bytes partial decoder.
    fn size_held(&self) -> usize {
        self.input_output_handle.size_held()
    }

    /// Decode the complete value, then slice out each of `decoded_regions`.
    ///
    /// Returns `Ok(None)` when the inner decoder reports no stored value. An out-of-bounds region
    /// yields [`CodecError::InvalidByteRangeError`].
    async fn partial_decode_many<'a>(
        &'a self,
        decoded_regions: ByteRangeIterator<'a>,
        options: &CodecOptions,
    ) -> Result<Option<Vec<ArrayBytesRaw<'a>>>, CodecError> {
        let decoded = match self.input_output_handle.decode(options).await? {
            Some(encoded) => self
                .codec
                .decode(encoded, &self.decoded_representation, options)?
                .into_owned(),
            None => return Ok(None),
        };

        let regions = extract_byte_ranges(&decoded, decoded_regions)
            .map_err(CodecError::InvalidByteRangeError)?;
        Ok(Some(regions.into_iter().map(Cow::Owned).collect()))
    }

    /// Always `false`: the whole value is decoded on every call.
    fn supports_partial_decode(&self) -> bool {
        false
    }
}
#[cfg(feature = "async")]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<T: ?Sized> AsyncBytesPartialEncoderTraits
    for CodecPartialDefault<T, BytesRepresentation, dyn BytesToBytesCodecTraits>
where
    T: AsyncBytesPartialEncoderTraits,
{
    /// View this encoder as a shared async bytes partial decoder.
    fn into_dyn_decoder(self: Arc<Self>) -> Arc<dyn AsyncBytesPartialDecoderTraits> {
        self.clone()
    }

    /// Delegates to the inner async bytes partial encoder.
    async fn erase(&self) -> Result<(), super::CodecError> {
        self.input_output_handle.erase().await
    }

    /// Overwrite the decoded value at each `(offset, bytes)` pair.
    ///
    /// This is a read-modify-write of the complete value:
    /// 1. Decode the stored value; an empty value is used if nothing is stored.
    /// 2. Patch in each pair, zero-extending the value when a write goes past its end.
    /// 3. Re-encode the result.
    /// 4. Replace the stored encoded value with the new encoding.
    async fn partial_encode_many<'a>(
        &'a self,
        offset_values: OffsetBytesIterator<'a, ArrayBytesRaw<'_>>,
        options: &super::CodecOptions,
    ) -> Result<(), super::CodecError> {
        let encoded_value = self
            .input_output_handle
            .decode(options)
            .await?
            .map(Cow::into_owned);
        let mut decoded_value = if let Some(encoded_value) = encoded_value {
            self.codec
                .decode(
                    Cow::Owned(encoded_value),
                    &self.decoded_representation,
                    options,
                )?
                .into_owned()
        } else {
            vec![]
        };

        for (offset, value) in offset_values {
            let offset = usize::try_from(offset).unwrap();
            let end = offset + value.len();
            if decoded_value.len() < end {
                decoded_value.resize(end, 0);
            }
            decoded_value[offset..end].copy_from_slice(&value);
        }

        let bytes_encoded = self
            .codec
            .encode(Cow::Owned(decoded_value), options)?
            .into_owned();

        // The new encoding may be shorter than the stored one (e.g. it compresses better).
        // Writing it at offset 0 over the old value would leave stale trailing bytes, so the old
        // value is erased first, as the array-to-bytes encoder also does. The erase happens only
        // after encoding succeeded, so a failed encode leaves the stored value untouched.
        self.input_output_handle.erase().await?;
        self.input_output_handle
            .partial_encode(0, Cow::Owned(bytes_encoded), options)
            .await
    }

    /// Always `false`: every partial encode rewrites the whole value.
    fn supports_partial_encode(&self) -> bool {
        false
    }
}