#![expect(clippy::multiple_crate_versions)]
use std::{borrow::Cow, fmt, num::NonZeroUsize};
use ndarray::{Array, Array1, ArrayBase, ArrayViewMut, Data, Dimension, ShapeError};
use numcodecs::{
AnyArray, AnyArrayAssignError, AnyArrayDType, AnyArrayView, AnyArrayViewMut, AnyCowArray,
Codec, StaticCodec, StaticCodecConfig, StaticCodecVersion,
};
use schemars::{JsonSchema, JsonSchema_repr};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use thiserror::Error;
#[cfg(test)]
use ::serde_json as _;
/// Version marker (`0.1.0`) of the [`Pcodec`] codec; it is stored both in the
/// codec configuration and in the header of every encoded payload.
type PcodecVersion = StaticCodecVersion<0, 1, 0>;
/// Codec providing compression using pco.
///
/// The configuration consists of the compression `level` plus the flattened
/// mode, delta, and paging specifications.
// Unknown fields are rejected in the JSON schema only (via schemars): serde's
// own `deny_unknown_fields` is documented as unsupported in combination with
// `#[serde(flatten)]`, which the fields below rely on.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] pub struct Pcodec {
/// Compression level, see [`PcoCompressionLevel`].
pub level: PcoCompressionLevel,
/// Specifies how the mode should be determined; its fields are flattened
/// into the codec configuration and tagged by the `mode` key.
#[serde(flatten)]
pub mode: PcoModeSpec,
/// Specifies how delta encoding should be chosen; its fields are flattened
/// into the codec configuration and tagged by the `delta` key.
#[serde(flatten)]
pub delta: PcoDeltaSpec,
/// Specifies how the chunk should be split into pages; its fields are
/// flattened into the codec configuration and tagged by the `paging` key.
#[serde(flatten)]
pub paging: PcoPagingSpec,
/// The codec's encoding format version, (de)serialized as `_version` and
/// defaulted when absent from the configuration.
#[serde(default, rename = "_version")]
pub version: PcodecVersion,
}
/// Pco compression level.
///
/// The level ranges from 0 to 12 inclusive and defaults to 8. It is
/// (de)serialized as its integer value and is handed to pco as a `usize`
/// when compressing.
// The variants are intentionally left without doc comments so that the
// `#[expect(missing_docs)]` below stays fulfilled.
#[derive(
Copy, Clone, Debug, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr,
)]
#[repr(u8)]
#[expect(missing_docs)]
pub enum PcoCompressionLevel {
Level0 = 0,
Level1 = 1,
Level2 = 2,
Level3 = 3,
Level4 = 4,
Level5 = 5,
Level6 = 6,
Level7 = 7,
#[default]
Level8 = 8,
Level9 = 9,
Level10 = 10,
Level11 = 11,
Level12 = 12,
}
/// Pco compression mode specification.
///
/// Serialized as an internally tagged enum: the `mode` key holds the
/// kebab-case variant name and the variant's fields sit next to it. Each
/// variant is translated to the `pco::ModeSpec` variant of the same name when
/// compressing.
#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "mode", rename_all = "kebab-case")]
pub enum PcoModeSpec {
/// Maps to `pco::ModeSpec::Auto`, which leaves the choice of mode to pco.
/// This is the default.
#[default]
Auto,
/// Maps to `pco::ModeSpec::Classic`.
Classic,
/// Maps to `pco::ModeSpec::TryFloatMult` with the given base.
TryFloatMult {
/// The base forwarded to `pco::ModeSpec::TryFloatMult`.
float_mult_base: f64,
},
/// Maps to `pco::ModeSpec::TryFloatQuant` with the given number of bits.
TryFloatQuant {
/// The bit count forwarded to `pco::ModeSpec::TryFloatQuant`.
float_quant_bits: u32,
},
/// Maps to `pco::ModeSpec::TryIntMult` with the given base.
TryIntMult {
/// The base forwarded to `pco::ModeSpec::TryIntMult`.
int_mult_base: u64,
},
}
/// Pco delta encoding specification.
///
/// Serialized as an internally tagged enum: the `delta` key holds the
/// kebab-case variant name and the variant's fields sit next to it. Each
/// variant is translated to the `pco::DeltaSpec` variant of the same name when
/// compressing.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "delta", rename_all = "kebab-case")]
pub enum PcoDeltaSpec {
/// Maps to `pco::DeltaSpec::Auto`, which leaves the choice of delta
/// encoding to pco. This is the default.
#[default]
Auto,
/// Maps to `pco::DeltaSpec::None`.
None,
/// Maps to `pco::DeltaSpec::TryConsecutive` with the given order.
TryConsecutive {
/// The delta encoding order, forwarded to pco as a `usize`.
delta_encoding_order: PcoDeltaEncodingOrder,
},
/// Maps to `pco::DeltaSpec::TryLookback`.
TryLookback,
}
/// Pco delta encoding order.
///
/// The order ranges from 0 to 7 inclusive. It is (de)serialized as its
/// integer value and is handed to `pco::DeltaSpec::TryConsecutive` as a
/// `usize` when compressing.
// The variants are intentionally left without doc comments so that the
// `#[expect(missing_docs)]` below stays fulfilled.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr)]
#[repr(u8)]
#[expect(missing_docs)]
pub enum PcoDeltaEncodingOrder {
Order0 = 0,
Order1 = 1,
Order2 = 2,
Order3 = 3,
Order4 = 4,
Order5 = 5,
Order6 = 6,
Order7 = 7,
}
/// Pco paging specification.
///
/// Serialized as an internally tagged enum: the `paging` key holds the
/// kebab-case variant name and the variant's fields sit next to it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "paging", rename_all = "kebab-case")]
pub enum PcoPagingSpec {
/// Maps to `pco::PagingSpec::EqualPagesUpTo` with the given value.
EqualPagesUpTo {
/// The non-zero value forwarded to `pco::PagingSpec::EqualPagesUpTo`.
/// When missing from the configuration it falls back to
/// `default_equal_pages_up_to()`, which is derived from
/// `pco::DEFAULT_MAX_PAGE_N`.
#[serde(default = "default_equal_pages_up_to")]
equal_pages_up_to: NonZeroUsize,
},
}
impl Default for PcoPagingSpec {
    /// Returns [`PcoPagingSpec::EqualPagesUpTo`] with the same fallback value
    /// that deserialization uses when the field is omitted.
    fn default() -> Self {
        let equal_pages_up_to = default_equal_pages_up_to();
        Self::EqualPagesUpTo { equal_pages_up_to }
    }
}
/// Default for `equal_pages_up_to`: `pco::DEFAULT_MAX_PAGE_N`, clamped to be
/// at least one so that it fits a [`NonZeroUsize`].
const fn default_equal_pages_up_to() -> NonZeroUsize {
    // `MIN` is one, so adding `N - 1` (saturating at zero) yields `max(N, 1)`
    // without needing a fallible conversion in a `const fn`.
    let above_minimum = pco::DEFAULT_MAX_PAGE_N.saturating_sub(1);
    NonZeroUsize::MIN.saturating_add(above_minimum)
}
impl Codec for Pcodec {
type Error = PcodecError;
fn encode(&self, data: AnyCowArray) -> Result<AnyArray, Self::Error> {
match data {
AnyCowArray::U16(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::U32(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::U64(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::I16(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::I32(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::I64(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::F32(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
AnyCowArray::F64(data) => Ok(AnyArray::U8(
Array1::from(compress(
data,
self.level,
self.mode,
self.delta,
self.paging,
)?)
.into_dyn(),
)),
encoded => Err(PcodecError::UnsupportedDtype(encoded.dtype())),
}
}
fn decode(&self, encoded: AnyCowArray) -> Result<AnyArray, Self::Error> {
let AnyCowArray::U8(encoded) = encoded else {
return Err(PcodecError::EncodedDataNotBytes {
dtype: encoded.dtype(),
});
};
if !matches!(encoded.shape(), [_]) {
return Err(PcodecError::EncodedDataNotOneDimensional {
shape: encoded.shape().to_vec(),
});
}
decompress(&AnyCowArray::U8(encoded).as_bytes())
}
fn decode_into(
&self,
encoded: AnyArrayView,
decoded: AnyArrayViewMut,
) -> Result<(), Self::Error> {
let AnyArrayView::U8(encoded) = encoded else {
return Err(PcodecError::EncodedDataNotBytes {
dtype: encoded.dtype(),
});
};
if !matches!(encoded.shape(), [_]) {
return Err(PcodecError::EncodedDataNotOneDimensional {
shape: encoded.shape().to_vec(),
});
}
let encoded = AnyArrayView::U8(encoded);
let encoded = encoded.as_bytes();
match decoded {
AnyArrayViewMut::U16(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::U32(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::U64(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::I16(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::I32(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::I64(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::F32(decoded) => decompress_into(&encoded, decoded),
AnyArrayViewMut::F64(decoded) => decompress_into(&encoded, decoded),
decoded => Err(PcodecError::UnsupportedDtype(decoded.dtype())),
}
}
}
impl StaticCodec for Pcodec {
    /// Identifier under which this codec is registered.
    const CODEC_ID: &'static str = "pco.rs";

    /// The codec is its own configuration type.
    type Config<'de> = Self;

    /// Builds the codec from its configuration, which is the codec itself.
    fn from_config(config: Self::Config<'_>) -> Self {
        config
    }

    /// Returns the codec's configuration, borrowed from `self`.
    fn get_config(&self) -> StaticCodecConfig<'_, Self> {
        self.into()
    }
}
/// Errors that may occur when applying the [`Pcodec`].
#[derive(Debug, Error)]
pub enum PcodecError {
/// [`Pcodec`] does not support the dtype of the array it was given.
#[error("Pco does not support the dtype {0}")]
UnsupportedDtype(AnyArrayDType),
/// [`Pcodec`] failed to encode the header that precedes the compressed
/// payload.
#[error("Pco failed to encode the header")]
HeaderEncodeFailed {
/// Opaque source error.
source: PcoHeaderError,
},
/// [`Pcodec`] failed to encode the data with pco.
#[error("Pco failed to encode the data")]
PcoEncodeFailed {
/// Opaque source error.
source: PcoCodingError,
},
/// [`Pcodec`] can only decode one-dimensional byte arrays but received an
/// array of a different dtype.
#[error(
"Pco can only decode one-dimensional byte arrays but received an array of dtype {dtype}"
)]
EncodedDataNotBytes {
/// The unexpected dtype of the encoded array.
dtype: AnyArrayDType,
},
/// [`Pcodec`] can only decode one-dimensional byte arrays but received a
/// byte array of a different shape.
#[error(
"Pco can only decode one-dimensional byte arrays but received a byte array of shape {shape:?}"
)]
EncodedDataNotOneDimensional {
/// The unexpected shape of the encoded array.
shape: Vec<usize>,
},
/// [`Pcodec`] failed to decode the header that precedes the compressed
/// payload.
#[error("Pco failed to decode the header")]
HeaderDecodeFailed {
/// Opaque source error.
source: PcoHeaderError,
},
/// [`Pcodec`] failed to decode the data with pco.
#[error("Pco failed to decode the data")]
PcoDecodeFailed {
/// Opaque source error.
source: PcoCodingError,
},
/// [`Pcodec`] decoded a shape header which does not fit the decoded data.
#[error("Pco decoded an invalid array shape header which does not fit the decoded data")]
DecodeInvalidShapeHeader {
/// The source of the error.
#[from]
source: ShapeError,
},
/// [`Pcodec`] cannot decode into the provided array, e.g. because its
/// dtype or shape differs from the one recorded in the header.
#[error("Pco cannot decode into the provided array")]
MismatchedDecodeIntoArray {
/// The source of the error.
#[from]
source: AnyArrayAssignError,
},
}
/// Opaque error for when encoding or decoding the header fails.
///
/// Wraps the underlying `postcard::Error` and displays transparently as it.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoHeaderError(postcard::Error);
/// Opaque error for when encoding or decoding the data with pco fails.
///
/// Wraps the underlying `pco::errors::PcoError` and displays transparently as
/// it.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoCodingError(pco::errors::PcoError);
#[expect(clippy::needless_pass_by_value)]
pub fn compress<T: PcoElement, S: Data<Elem = T>, D: Dimension>(
data: ArrayBase<S, D>,
level: PcoCompressionLevel,
mode: PcoModeSpec,
delta: PcoDeltaSpec,
paging: PcoPagingSpec,
) -> Result<Vec<u8>, PcodecError> {
let mut encoded_bytes = postcard::to_extend(
&CompressionHeader {
dtype: <T as PcoElement>::DTYPE,
shape: Cow::Borrowed(data.shape()),
version: StaticCodecVersion,
},
Vec::new(),
)
.map_err(|err| PcodecError::HeaderEncodeFailed {
source: PcoHeaderError(err),
})?;
let data_owned;
#[expect(clippy::option_if_let_else)]
let data = if let Some(slice) = data.as_slice() {
slice
} else {
data_owned = data.into_iter().copied().collect::<Vec<T>>();
data_owned.as_slice()
};
let config = pco::ChunkConfig::default()
.with_compression_level(level as usize)
.with_mode_spec(match mode {
PcoModeSpec::Auto => pco::ModeSpec::Auto,
PcoModeSpec::Classic => pco::ModeSpec::Classic,
PcoModeSpec::TryFloatMult { float_mult_base } => {
pco::ModeSpec::TryFloatMult(float_mult_base)
}
PcoModeSpec::TryFloatQuant { float_quant_bits } => {
pco::ModeSpec::TryFloatQuant(float_quant_bits)
}
PcoModeSpec::TryIntMult { int_mult_base } => pco::ModeSpec::TryIntMult(int_mult_base),
})
.with_delta_spec(match delta {
PcoDeltaSpec::Auto => pco::DeltaSpec::Auto,
PcoDeltaSpec::None => pco::DeltaSpec::None,
PcoDeltaSpec::TryConsecutive {
delta_encoding_order,
} => pco::DeltaSpec::TryConsecutive(delta_encoding_order as usize),
PcoDeltaSpec::TryLookback => pco::DeltaSpec::TryLookback,
})
.with_paging_spec(match paging {
PcoPagingSpec::EqualPagesUpTo { equal_pages_up_to } => {
pco::PagingSpec::EqualPagesUpTo(equal_pages_up_to.get())
}
});
let encoded = pco::standalone::simple_compress(data, &config).map_err(|err| {
PcodecError::PcoEncodeFailed {
source: PcoCodingError(err),
}
})?;
encoded_bytes.extend_from_slice(&encoded);
Ok(encoded_bytes)
}
pub fn decompress(encoded: &[u8]) -> Result<AnyArray, PcodecError> {
let (header, data) =
postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
PcodecError::HeaderDecodeFailed {
source: PcoHeaderError(err),
}
})?;
let decoded = match header.dtype {
PcoDType::U16 => AnyArray::U16(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::U32 => AnyArray::U32(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::U64 => AnyArray::U64(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::I16 => AnyArray::I16(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::I32 => AnyArray::I32(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::I64 => AnyArray::I64(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::F32 => AnyArray::F32(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
PcoDType::F64 => AnyArray::F64(Array::from_shape_vec(
&*header.shape,
pco::standalone::simple_decompress(data).map_err(|err| {
PcodecError::PcoDecodeFailed {
source: PcoCodingError(err),
}
})?,
)?),
};
Ok(decoded)
}
/// Decompresses the `encoded` data into the `decoded` array.
///
/// The header at the start of `encoded` must record the same dtype and shape
/// as `decoded`. If `decoded` exposes a contiguous slice, pco decompresses
/// directly into it; otherwise the elements are decompressed into a temporary
/// vector and copied over in the array's logical iteration order.
///
/// # Errors
///
/// Errors with
/// - [`PcodecError::HeaderDecodeFailed`] if decoding the header failed
/// - [`PcodecError::MismatchedDecodeIntoArray`] if the dtype or shape of
///   `decoded` does not match the header
/// - [`PcodecError::PcoDecodeFailed`] if pco failed to decompress the data
/// - [`PcodecError::DecodeInvalidShapeHeader`] if the number of decompressed
///   elements does not fit `decoded`
pub fn decompress_into<T: PcoElement, D: Dimension>(
    encoded: &[u8],
    mut decoded: ArrayViewMut<T, D>,
) -> Result<(), PcodecError> {
    let (header, data) =
        postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
            PcodecError::HeaderDecodeFailed {
                source: PcoHeaderError(err),
            }
        })?;

    if T::DTYPE != header.dtype {
        return Err(PcodecError::MismatchedDecodeIntoArray {
            source: AnyArrayAssignError::DTypeMismatch {
                src: header.dtype.into_dtype(),
                dst: T::DTYPE.into_dtype(),
            },
        });
    }

    if decoded.shape() != &*header.shape {
        return Err(PcodecError::MismatchedDecodeIntoArray {
            source: AnyArrayAssignError::ShapeMismatch {
                src: header.shape.into_owned(),
                dst: decoded.shape().to_vec(),
            },
        });
    }

    // Fast path: decompress straight into the contiguous storage.
    if let Some(slice) = decoded.as_slice_mut() {
        let expected_len = slice.len();

        let progress = pco::standalone::simple_decompress_into(data, slice).map_err(|err| {
            PcodecError::PcoDecodeFailed {
                source: PcoCodingError(err),
            }
        })?;

        // pco does not report an error when the payload holds fewer elements
        // than the destination; it only says how many it wrote. Without this
        // check the tail of `decoded` would silently keep its old contents
        // whenever the header's shape overstates the payload.
        // NOTE(review): `progress.finished` is deliberately not checked, since
        // pco may stop as soon as the destination is full — confirm against
        // the pco version in use before tightening this.
        if progress.n_processed != expected_len {
            return Err(PcodecError::DecodeInvalidShapeHeader {
                source: ShapeError::from_kind(ndarray::ErrorKind::IncompatibleShape),
            });
        }

        return Ok(());
    }

    // Slow path: the destination is not contiguous, so decompress into a
    // temporary vector and copy element by element.
    let dec =
        pco::standalone::simple_decompress(data).map_err(|err| PcodecError::PcoDecodeFailed {
            source: PcoCodingError(err),
        })?;

    if dec.len() != decoded.len() {
        return Err(PcodecError::DecodeInvalidShapeHeader {
            source: ShapeError::from_kind(ndarray::ErrorKind::IncompatibleShape),
        });
    }

    decoded.iter_mut().zip(dec).for_each(|(o, d)| *o = d);

    Ok(())
}
/// Array element types which can be compressed with pco.
///
/// Implemented in this file for `u16`, `u32`, `u64`, `i16`, `i32`, `i64`,
/// `f32`, and `f64`.
pub trait PcoElement: Copy + pco::data_types::Number {
/// The dtype identifier of this element type, as recorded in the header of
/// the encoded data.
const DTYPE: PcoDType;
}
impl PcoElement for u16 {
const DTYPE: PcoDType = PcoDType::U16;
}
impl PcoElement for u32 {
const DTYPE: PcoDType = PcoDType::U32;
}
impl PcoElement for u64 {
const DTYPE: PcoDType = PcoDType::U64;
}
impl PcoElement for i16 {
const DTYPE: PcoDType = PcoDType::I16;
}
impl PcoElement for i32 {
const DTYPE: PcoDType = PcoDType::I32;
}
impl PcoElement for i64 {
const DTYPE: PcoDType = PcoDType::I64;
}
impl PcoElement for f32 {
const DTYPE: PcoDType = PcoDType::F32;
}
impl PcoElement for f64 {
const DTYPE: PcoDType = PcoDType::F64;
}
/// Header that is serialized with postcard in front of the pco-compressed
/// payload and read back before decompressing.
#[derive(Serialize, Deserialize)]
struct CompressionHeader<'a> {
/// Dtype of the encoded elements.
dtype: PcoDType,
/// Shape of the encoded array; borrowed from the array when encoding.
#[serde(borrow)]
shape: Cow<'a, [usize]>,
/// Version of the codec that wrote the header.
version: PcodecVersion,
}
/// Dtypes that pco can compress and decompress.
///
/// Each variant is (de)serialized under its short name (e.g. `u16`) and also
/// accepts a long alias (e.g. `uint16`) when deserializing.
// The variants are intentionally left without doc comments so that the
// `#[expect(missing_docs)]` below stays fulfilled.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[expect(missing_docs)]
pub enum PcoDType {
#[serde(rename = "u16", alias = "uint16")]
U16,
#[serde(rename = "u32", alias = "uint32")]
U32,
#[serde(rename = "u64", alias = "uint64")]
U64,
#[serde(rename = "i16", alias = "int16")]
I16,
#[serde(rename = "i32", alias = "int32")]
I32,
#[serde(rename = "i64", alias = "int64")]
I64,
#[serde(rename = "f32", alias = "float32")]
F32,
#[serde(rename = "f64", alias = "float64")]
F64,
}
impl PcoDType {
/// Converts this [`PcoDType`] into the [`AnyArrayDType`] variant of the same
/// name.
#[must_use]
pub const fn into_dtype(self) -> AnyArrayDType {
// Exhaustive one-to-one mapping: a new `PcoDType` variant will not compile
// until it is given a counterpart here.
match self {
Self::U16 => AnyArrayDType::U16,
Self::U32 => AnyArrayDType::U32,
Self::U64 => AnyArrayDType::U64,
Self::I16 => AnyArrayDType::I16,
Self::I32 => AnyArrayDType::I32,
Self::I64 => AnyArrayDType::I64,
Self::F32 => AnyArrayDType::F32,
Self::F64 => AnyArrayDType::F64,
}
}
}
impl fmt::Display for PcoDType {
    /// Writes the short name of the dtype, e.g. `u16` or `f64`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Self::U16 => "u16",
            Self::U32 => "u32",
            Self::U64 => "u64",
            Self::I16 => "i16",
            Self::I32 => "i32",
            Self::I64 => "i64",
            Self::F32 => "f32",
            Self::F64 => "f64",
        };

        fmt.write_str(name)
    }
}