use std::borrow::Cow;
use std::sync::Arc;
use num::Integer;
use zarrs_plugin::ZarrVersion;
use super::{CHECKSUM_SIZE, Fletcher32CodecConfiguration, Fletcher32CodecConfigurationV1};
#[cfg(feature = "async")]
use crate::array::codec::bytes_to_bytes::strip_suffix_partial_decoder::AsyncStripSuffixPartialDecoder;
use crate::array::codec::bytes_to_bytes::strip_suffix_partial_decoder::StripSuffixPartialDecoder;
use crate::array::{ArrayBytesRaw, BytesRepresentation};
#[cfg(feature = "async")]
use zarrs_codec::AsyncBytesPartialDecoderTraits;
use zarrs_codec::{
BytesPartialDecoderTraits, BytesToBytesCodecTraits, CodecError, CodecMetadataOptions,
CodecOptions, CodecTraits, PartialDecoderCapability, PartialEncoderCapability,
RecommendedConcurrency,
};
use zarrs_metadata::Configuration;
#[derive(Clone, Debug, Default)]
pub struct Fletcher32Codec;
impl Fletcher32Codec {
#[must_use]
pub const fn new() -> Self {
Self {}
}
#[must_use]
pub const fn new_with_configuration(_configuration: &Fletcher32CodecConfiguration) -> Self {
Self {}
}
}
impl CodecTraits for Fletcher32Codec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn configuration(
&self,
_version: ZarrVersion,
_options: &CodecMetadataOptions,
) -> Option<Configuration> {
let configuration = Fletcher32CodecConfiguration::V1(Fletcher32CodecConfigurationV1 {});
Some(configuration.into())
}
fn partial_decoder_capability(&self) -> PartialDecoderCapability {
PartialDecoderCapability {
partial_read: false, partial_decode: false, }
}
fn partial_encoder_capability(&self) -> PartialEncoderCapability {
PartialEncoderCapability {
partial_encode: false,
}
}
}
fn h5_checksum_fletcher32(data: &[u8]) -> u32 {
let mut len = data.len() / 2;
let mut sum1: u32 = 0;
let mut sum2: u32 = 0;
let mut data_idx = 0;
while len > 0 {
let tlen = len.min(360);
len -= tlen;
for _ in 0..tlen {
sum1 += u32::from((u16::from(data[data_idx]) << 8u16) | u16::from(data[data_idx + 1]));
data_idx += 2;
sum2 += sum1;
}
sum1 = (sum1 & 0xffff) + (sum1 >> 16);
sum2 = (sum2 & 0xffff) + (sum2 >> 16);
}
if len.is_odd() {
sum1 += u32::from(u16::from(data[data_idx]) << 8);
sum2 += sum1;
sum1 = (sum1 & 0xffff) + (sum1 >> 16);
sum2 = (sum2 & 0xffff) + (sum2 >> 16);
}
sum1 = (sum1 & 0xffff) + (sum1 >> 16);
sum2 = (sum2 & 0xffff) + (sum2 >> 16);
(sum2 << 16) | sum1
}
#[cfg_attr(
all(feature = "async", not(target_arch = "wasm32")),
async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
impl BytesToBytesCodecTraits for Fletcher32Codec {
fn into_dyn(self: Arc<Self>) -> Arc<dyn BytesToBytesCodecTraits> {
self as Arc<dyn BytesToBytesCodecTraits>
}
fn recommended_concurrency(
&self,
_decoded_representation: &BytesRepresentation,
) -> Result<RecommendedConcurrency, CodecError> {
Ok(RecommendedConcurrency::new_maximum(1))
}
fn encode<'a>(
&self,
decoded_value: ArrayBytesRaw<'a>,
_options: &CodecOptions,
) -> Result<ArrayBytesRaw<'a>, CodecError> {
let checksum = h5_checksum_fletcher32(&decoded_value).to_le_bytes();
let mut encoded_value: Vec<u8> = Vec::with_capacity(decoded_value.len() + checksum.len());
encoded_value.extend_from_slice(&decoded_value);
encoded_value.extend_from_slice(&checksum);
Ok(Cow::Owned(encoded_value))
}
fn decode<'a>(
&self,
encoded_value: ArrayBytesRaw<'a>,
_decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<ArrayBytesRaw<'a>, CodecError> {
if encoded_value.len() >= CHECKSUM_SIZE {
if options.validate_checksums() {
let decoded_value = &encoded_value[..encoded_value.len() - CHECKSUM_SIZE];
let checksum = h5_checksum_fletcher32(decoded_value).to_le_bytes();
let checksum_stored: [u8; CHECKSUM_SIZE] = encoded_value
[encoded_value.len() - CHECKSUM_SIZE..]
.try_into()
.unwrap();
if checksum != checksum_stored {
return Err(CodecError::InvalidChecksum);
}
}
let mut decoded_value = encoded_value.into_owned();
decoded_value.truncate(decoded_value.len() - CHECKSUM_SIZE);
Ok(Cow::Owned(decoded_value))
} else {
Err(CodecError::Other(
"fletcher32 decoder expects a 32 bit input".to_string(),
))
}
}
fn partial_decoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
_decoded_representation: &BytesRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError> {
Ok(Arc::new(StripSuffixPartialDecoder::new(
input_handle,
CHECKSUM_SIZE,
)))
}
#[cfg(feature = "async")]
async fn async_partial_decoder(
self: Arc<Self>,
input_handle: Arc<dyn AsyncBytesPartialDecoderTraits>,
_decoded_representation: &BytesRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn AsyncBytesPartialDecoderTraits>, CodecError> {
Ok(Arc::new(AsyncStripSuffixPartialDecoder::new(
input_handle,
CHECKSUM_SIZE,
)))
}
fn encoded_representation(
&self,
decoded_representation: &BytesRepresentation,
) -> BytesRepresentation {
match decoded_representation {
BytesRepresentation::FixedSize(size) => {
BytesRepresentation::FixedSize(size + CHECKSUM_SIZE as u64)
}
BytesRepresentation::BoundedSize(size) => {
BytesRepresentation::BoundedSize(size + CHECKSUM_SIZE as u64)
}
BytesRepresentation::UnboundedSize => BytesRepresentation::UnboundedSize,
}
}
}