use std::sync::Arc;
use zarrs_plugin::{ExtensionAliasesV3, PluginCreateError, ZarrVersion};
use super::{
BytesCodecConfiguration, BytesCodecConfigurationV1, BytesDataTypeExt, Endianness,
bytes_codec_partial,
};
use crate::array::{
ArrayBytes, ArrayBytesRaw, BytesRepresentation, ChunkShapeTraits, DataType, DataTypeSize,
FillValue,
};
use std::num::NonZeroU64;
use zarrs_codec::{
ArrayCodecTraits, ArrayPartialDecoderTraits, ArrayPartialEncoderTraits,
ArrayToBytesCodecTraits, BytesPartialDecoderTraits, BytesPartialEncoderTraits, CodecError,
CodecMetadataOptions, CodecOptions, CodecTraits, PartialDecoderCapability,
PartialEncoderCapability, RecommendedConcurrency,
};
#[cfg(feature = "async")]
use zarrs_codec::{
AsyncArrayPartialDecoderTraits, AsyncArrayPartialEncoderTraits, AsyncBytesPartialDecoderTraits,
AsyncBytesPartialEncoderTraits,
};
use zarrs_metadata::Configuration;
#[derive(Debug, Clone)]
pub struct BytesCodec {
endian: Option<Endianness>,
}
impl Default for BytesCodec {
fn default() -> Self {
Self::new(Some(Endianness::native()))
}
}
impl BytesCodec {
#[must_use]
pub const fn new(endian: Option<Endianness>) -> Self {
Self { endian }
}
#[must_use]
pub const fn little() -> Self {
Self::new(Some(Endianness::Little))
}
#[must_use]
pub const fn big() -> Self {
Self::new(Some(Endianness::Big))
}
pub fn new_with_configuration(
configuration: &BytesCodecConfiguration,
) -> Result<Self, PluginCreateError> {
match configuration {
BytesCodecConfiguration::V1(configuration) => Ok(Self::new(configuration.endian)),
_ => Err(PluginCreateError::Other(
"this bytes codec configuration variant is unsupported".to_string(),
)),
}
}
}
impl CodecTraits for BytesCodec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn configuration(
&self,
_version: ZarrVersion,
_options: &CodecMetadataOptions,
) -> Option<Configuration> {
let configuration = BytesCodecConfiguration::V1(BytesCodecConfigurationV1 {
endian: self.endian,
});
Some(configuration.into())
}
fn partial_decoder_capability(&self) -> PartialDecoderCapability {
PartialDecoderCapability {
partial_read: true,
partial_decode: true,
}
}
fn partial_encoder_capability(&self) -> PartialEncoderCapability {
PartialEncoderCapability {
partial_encode: true,
}
}
}
impl ArrayCodecTraits for BytesCodec {
fn recommended_concurrency(
&self,
_shape: &[NonZeroU64],
_data_type: &DataType,
) -> Result<RecommendedConcurrency, CodecError> {
Ok(RecommendedConcurrency::new_maximum(1))
}
}
#[cfg_attr(
all(feature = "async", not(target_arch = "wasm32")),
async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
impl ArrayToBytesCodecTraits for BytesCodec {
fn into_dyn(self: Arc<Self>) -> Arc<dyn ArrayToBytesCodecTraits> {
self as Arc<dyn ArrayToBytesCodecTraits>
}
fn encode<'a>(
&self,
bytes: ArrayBytes<'a>,
shape: &[NonZeroU64],
data_type: &DataType,
_fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<ArrayBytesRaw<'a>, CodecError> {
if data_type.is_optional() {
return Err(CodecError::UnsupportedDataType(
data_type.clone(),
Self::aliases_v3().default_name.to_string(),
));
}
let num_elements = shape.iter().map(|d| d.get()).product::<u64>();
bytes.validate(num_elements, data_type)?;
let bytes = bytes.into_fixed()?;
let bytes_encoded = data_type.codec_bytes()?.encode(bytes, self.endian)?;
Ok(bytes_encoded)
}
fn decode<'a>(
&self,
bytes: ArrayBytesRaw<'a>,
shape: &[NonZeroU64],
data_type: &DataType,
_fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<ArrayBytes<'a>, CodecError> {
if data_type.is_optional() {
return Err(CodecError::UnsupportedDataType(
data_type.clone(),
Self::aliases_v3().default_name.to_string(),
));
}
let bytes_decoded: ArrayBytes = data_type.codec_bytes()?.decode(bytes, self.endian)?.into();
let num_elements = shape.iter().map(|d| d.get()).product::<u64>();
bytes_decoded.validate(num_elements, data_type)?;
Ok(bytes_decoded)
}
fn partial_decoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
shape: &[NonZeroU64],
data_type: &DataType,
fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError> {
Ok(Arc::new(bytes_codec_partial::BytesCodecPartial::new(
input_handle,
shape,
data_type,
fill_value,
self.endian,
)))
}
fn partial_encoder(
self: Arc<Self>,
input_output_handle: Arc<dyn BytesPartialEncoderTraits>,
shape: &[NonZeroU64],
data_type: &DataType,
fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError> {
Ok(Arc::new(bytes_codec_partial::BytesCodecPartial::new(
input_output_handle,
shape,
data_type,
fill_value,
self.endian,
)))
}
#[cfg(feature = "async")]
async fn async_partial_decoder(
self: Arc<Self>,
input_handle: Arc<dyn AsyncBytesPartialDecoderTraits>,
shape: &[NonZeroU64],
data_type: &DataType,
fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError> {
Ok(Arc::new(bytes_codec_partial::BytesCodecPartial::new(
input_handle,
shape,
data_type,
fill_value,
self.endian,
)))
}
#[cfg(feature = "async")]
async fn async_partial_encoder(
self: Arc<Self>,
input_output_handle: Arc<dyn AsyncBytesPartialEncoderTraits>,
shape: &[NonZeroU64],
data_type: &DataType,
fill_value: &FillValue,
_options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialEncoderTraits>, CodecError> {
Ok(Arc::new(bytes_codec_partial::BytesCodecPartial::new(
input_output_handle,
shape,
data_type,
fill_value,
self.endian,
)))
}
fn encoded_representation(
&self,
shape: &[NonZeroU64],
data_type: &DataType,
_fill_value: &FillValue,
) -> Result<BytesRepresentation, CodecError> {
if data_type.is_optional() {
return Err(CodecError::UnsupportedDataType(
data_type.clone(),
Self::aliases_v3().default_name.to_string(),
));
}
match data_type.size() {
DataTypeSize::Variable => Err(CodecError::UnsupportedDataType(
data_type.clone(),
Self::aliases_v3().default_name.to_string(),
)),
DataTypeSize::Fixed(data_type_size) => Ok(BytesRepresentation::FixedSize(
shape.num_elements_u64() * data_type_size as u64,
)),
}
}
}