1use std::{borrow::Cow, fmt, num::NonZeroUsize};
21
22use ndarray::{Array, Array1, ArrayBase, ArrayViewMut, Data, Dimension, ShapeError};
23use numcodecs::{
24 AnyArray, AnyArrayAssignError, AnyArrayDType, AnyArrayView, AnyArrayViewMut, AnyCowArray,
25 Codec, StaticCodec, StaticCodecConfig,
26};
27use schemars::{JsonSchema, JsonSchema_repr};
28use serde::{Deserialize, Serialize};
29use serde_repr::{Deserialize_repr, Serialize_repr};
30use thiserror::Error;
31
32#[cfg(test)]
33use ::serde_json as _;
34
/// Codec configuration for compressing numeric arrays with pco.
///
/// The `mode`, `delta`, and `paging` specs are flattened, so their tag and
/// parameter fields are (de)serialized at the same level as `level`.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
// Unknown fields are only denied in the JSON schema: serde does not support
// `deny_unknown_fields` in combination with `flatten`.
#[schemars(deny_unknown_fields)] pub struct Pcodec {
    /// Compression level that is passed on to pco
    pub level: PcoCompressionLevel,
    /// Specifies which pco mode is requested, tagged by the `mode` field
    #[serde(flatten)]
    pub mode: PcoModeSpec,
    /// Specifies which pco delta encoding is requested, tagged by the `delta`
    /// field
    #[serde(flatten)]
    pub delta: PcoDeltaSpec,
    /// Specifies how pco splits the data into pages, tagged by the `paging`
    /// field
    #[serde(flatten)]
    pub paging: PcoPagingSpec,
}
52
/// Pco compression level, from 0 to 12.
///
/// The level is (de)serialized as its integer value and defaults to 8. It is
/// cast to `usize` and handed to `pco::ChunkConfig::with_compression_level`
/// when compressing.
#[derive(
    Copy, Clone, Debug, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr,
)]
#[repr(u8)]
#[allow(missing_docs)]
pub enum PcoCompressionLevel {
    Level0 = 0,
    Level1 = 1,
    Level2 = 2,
    Level3 = 3,
    Level4 = 4,
    Level5 = 5,
    Level6 = 6,
    Level7 = 7,
    #[default]
    Level8 = 8,
    Level9 = 9,
    Level10 = 10,
    Level11 = 11,
    Level12 = 12,
}
80
/// Selects which `pco::ModeSpec` is requested for compression.
///
/// The variant is chosen by the `mode` tag, whose values are the kebab-case
/// variant names (`auto`, `classic`, `try-float-mult`, `try-float-quant`,
/// `try-int-mult`). Defaults to [`PcoModeSpec::Auto`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "mode", rename_all = "kebab-case")]
pub enum PcoModeSpec {
    /// Maps to `pco::ModeSpec::Auto`
    #[default]
    Auto,
    /// Maps to `pco::ModeSpec::Classic`
    Classic,
    /// Maps to `pco::ModeSpec::TryFloatMult`
    TryFloatMult {
        /// Base that is forwarded to `pco::ModeSpec::TryFloatMult`
        float_mult_base: f64,
    },
    /// Maps to `pco::ModeSpec::TryFloatQuant`
    TryFloatQuant {
        /// Number of bits that is forwarded to `pco::ModeSpec::TryFloatQuant`
        float_quant_bits: u32,
    },
    /// Maps to `pco::ModeSpec::TryIntMult`
    TryIntMult {
        /// Base that is forwarded to `pco::ModeSpec::TryIntMult`
        int_mult_base: u64,
    },
}
117
/// Selects which `pco::DeltaSpec` is requested for compression.
///
/// The variant is chosen by the `delta` tag, whose values are the kebab-case
/// variant names (`auto`, `none`, `try-consecutive`, `try-lookback`).
/// Defaults to [`PcoDeltaSpec::Auto`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "delta", rename_all = "kebab-case")]
pub enum PcoDeltaSpec {
    /// Maps to `pco::DeltaSpec::Auto`
    #[default]
    Auto,
    /// Maps to `pco::DeltaSpec::None`
    None,
    /// Maps to `pco::DeltaSpec::TryConsecutive`
    TryConsecutive {
        /// Order that is forwarded to `pco::DeltaSpec::TryConsecutive`
        delta_encoding_order: PcoDeltaEncodingOrder,
    },
    /// Maps to `pco::DeltaSpec::TryLookback`
    TryLookback,
}
150
/// Order of the consecutive delta encoding, from 0 to 7.
///
/// The order is (de)serialized as its integer value. It is cast to `usize`
/// and handed to `pco::DeltaSpec::TryConsecutive` when compressing.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr)]
#[repr(u8)]
#[allow(missing_docs)]
pub enum PcoDeltaEncodingOrder {
    Order0 = 0,
    Order1 = 1,
    Order2 = 2,
    Order3 = 3,
    Order4 = 4,
    Order5 = 5,
    Order6 = 6,
    Order7 = 7,
}
167
/// Selects which `pco::PagingSpec` is requested for compression.
///
/// The variant is chosen by the `paging` tag, whose only value is currently
/// `equal-pages-up-to`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)] #[serde(tag = "paging", rename_all = "kebab-case")]
pub enum PcoPagingSpec {
    /// Maps to `pco::PagingSpec::EqualPagesUpTo`
    EqualPagesUpTo {
        /// Non-zero page size limit that is forwarded to
        /// `pco::PagingSpec::EqualPagesUpTo`; falls back to
        /// `default_equal_pages_up_to()` when the field is omitted
        #[serde(default = "default_equal_pages_up_to")]
        equal_pages_up_to: NonZeroUsize,
    },
}
183
184impl Default for PcoPagingSpec {
185 fn default() -> Self {
186 Self::EqualPagesUpTo {
187 equal_pages_up_to: default_equal_pages_up_to(),
188 }
189 }
190}
191
192const fn default_equal_pages_up_to() -> NonZeroUsize {
193 NonZeroUsize::MIN.saturating_add(pco::DEFAULT_MAX_PAGE_N.saturating_sub(1))
194}
195
196impl Codec for Pcodec {
197 type Error = PcodecError;
198
199 fn encode(&self, data: AnyCowArray) -> Result<AnyArray, Self::Error> {
200 match data {
201 AnyCowArray::U16(data) => Ok(AnyArray::U8(
202 Array1::from(compress(
203 data,
204 self.level,
205 self.mode,
206 self.delta,
207 self.paging,
208 )?)
209 .into_dyn(),
210 )),
211 AnyCowArray::U32(data) => Ok(AnyArray::U8(
212 Array1::from(compress(
213 data,
214 self.level,
215 self.mode,
216 self.delta,
217 self.paging,
218 )?)
219 .into_dyn(),
220 )),
221 AnyCowArray::U64(data) => Ok(AnyArray::U8(
222 Array1::from(compress(
223 data,
224 self.level,
225 self.mode,
226 self.delta,
227 self.paging,
228 )?)
229 .into_dyn(),
230 )),
231 AnyCowArray::I16(data) => Ok(AnyArray::U8(
232 Array1::from(compress(
233 data,
234 self.level,
235 self.mode,
236 self.delta,
237 self.paging,
238 )?)
239 .into_dyn(),
240 )),
241 AnyCowArray::I32(data) => Ok(AnyArray::U8(
242 Array1::from(compress(
243 data,
244 self.level,
245 self.mode,
246 self.delta,
247 self.paging,
248 )?)
249 .into_dyn(),
250 )),
251 AnyCowArray::I64(data) => Ok(AnyArray::U8(
252 Array1::from(compress(
253 data,
254 self.level,
255 self.mode,
256 self.delta,
257 self.paging,
258 )?)
259 .into_dyn(),
260 )),
261 AnyCowArray::F32(data) => Ok(AnyArray::U8(
262 Array1::from(compress(
263 data,
264 self.level,
265 self.mode,
266 self.delta,
267 self.paging,
268 )?)
269 .into_dyn(),
270 )),
271 AnyCowArray::F64(data) => Ok(AnyArray::U8(
272 Array1::from(compress(
273 data,
274 self.level,
275 self.mode,
276 self.delta,
277 self.paging,
278 )?)
279 .into_dyn(),
280 )),
281 encoded => Err(PcodecError::UnsupportedDtype(encoded.dtype())),
282 }
283 }
284
285 fn decode(&self, encoded: AnyCowArray) -> Result<AnyArray, Self::Error> {
286 let AnyCowArray::U8(encoded) = encoded else {
287 return Err(PcodecError::EncodedDataNotBytes {
288 dtype: encoded.dtype(),
289 });
290 };
291
292 if !matches!(encoded.shape(), [_]) {
293 return Err(PcodecError::EncodedDataNotOneDimensional {
294 shape: encoded.shape().to_vec(),
295 });
296 }
297
298 decompress(&AnyCowArray::U8(encoded).as_bytes())
299 }
300
301 fn decode_into(
302 &self,
303 encoded: AnyArrayView,
304 decoded: AnyArrayViewMut,
305 ) -> Result<(), Self::Error> {
306 let AnyArrayView::U8(encoded) = encoded else {
307 return Err(PcodecError::EncodedDataNotBytes {
308 dtype: encoded.dtype(),
309 });
310 };
311
312 if !matches!(encoded.shape(), [_]) {
313 return Err(PcodecError::EncodedDataNotOneDimensional {
314 shape: encoded.shape().to_vec(),
315 });
316 }
317
318 let encoded = AnyArrayView::U8(encoded);
319 let encoded = encoded.as_bytes();
320
321 match decoded {
322 AnyArrayViewMut::U16(decoded) => decompress_into(&encoded, decoded),
323 AnyArrayViewMut::U32(decoded) => decompress_into(&encoded, decoded),
324 AnyArrayViewMut::U64(decoded) => decompress_into(&encoded, decoded),
325 AnyArrayViewMut::I16(decoded) => decompress_into(&encoded, decoded),
326 AnyArrayViewMut::I32(decoded) => decompress_into(&encoded, decoded),
327 AnyArrayViewMut::I64(decoded) => decompress_into(&encoded, decoded),
328 AnyArrayViewMut::F32(decoded) => decompress_into(&encoded, decoded),
329 AnyArrayViewMut::F64(decoded) => decompress_into(&encoded, decoded),
330 decoded => Err(PcodecError::UnsupportedDtype(decoded.dtype())),
331 }
332 }
333}
334
// `Pcodec` is its own configuration type, so the conversions between codec
// and configuration are trivial.
impl StaticCodec for Pcodec {
    /// Identifier of this codec
    const CODEC_ID: &'static str = "pco";

    /// The codec struct doubles as its own configuration
    type Config<'de> = Self;

    /// Uses the configuration directly as the codec
    fn from_config(config: Self::Config<'_>) -> Self {
        config
    }

    /// Builds the codec's configuration from a reference to the codec itself
    fn get_config(&self) -> StaticCodecConfig<Self> {
        StaticCodecConfig::from(self)
    }
}
348
/// Errors that may occur when encoding or decoding with the [`Pcodec`].
#[derive(Debug, Error)]
pub enum PcodecError {
    /// The array's dtype is not one of the dtypes handled by the codec
    #[error("Pco does not support the dtype {0}")]
    UnsupportedDtype(AnyArrayDType),
    /// The dtype-and-shape header could not be serialized
    #[error("Pco failed to encode the header")]
    HeaderEncodeFailed {
        /// Opaque source error
        source: PcoHeaderError,
    },
    /// Pco reported an error while compressing the data
    #[error("Pco failed to encode the data")]
    PcoEncodeFailed {
        /// Opaque source error
        source: PcoCodingError,
    },
    /// The encoded array passed to a decode method is not a byte array
    #[error(
        "Pco can only decode one-dimensional byte arrays but received an array of dtype {dtype}"
    )]
    EncodedDataNotBytes {
        /// The dtype of the array that was received instead
        dtype: AnyArrayDType,
    },
    /// The encoded byte array passed to a decode method is not
    /// one-dimensional
    #[error("Pco can only decode one-dimensional byte arrays but received a byte array of shape {shape:?}")]
    EncodedDataNotOneDimensional {
        /// The shape of the byte array that was received instead
        shape: Vec<usize>,
    },
    /// The dtype-and-shape header could not be deserialized
    #[error("Pco failed to decode the header")]
    HeaderDecodeFailed {
        /// Opaque source error
        source: PcoHeaderError,
    },
    /// Pco reported an error while decompressing the data
    #[error("Pco failed to decode the data")]
    PcoDecodeFailed {
        /// Opaque source error
        source: PcoCodingError,
    },
    /// The shape stored in the header does not fit the decompressed data
    #[error("Pco decoded an invalid array shape header which does not fit the decoded data")]
    DecodeInvalidShapeHeader {
        /// Source error
        #[from]
        source: ShapeError,
    },
    /// The output array's dtype or shape does not match the encoded header
    #[error("Pco cannot decode into the provided array")]
    MismatchedDecodeIntoArray {
        /// Source error
        #[from]
        source: AnyArrayAssignError,
    },
}
411
/// Opaque wrapper around the `postcard::Error` that occurred while encoding
/// or decoding the header; its message is forwarded transparently.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoHeaderError(postcard::Error);
416
/// Opaque wrapper around the `pco::errors::PcoError` that occurred while
/// compressing or decompressing; its message is forwarded transparently.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoCodingError(pco::errors::PcoError);
421
422#[allow(clippy::needless_pass_by_value)]
423pub fn compress<T: PcoElement, S: Data<Elem = T>, D: Dimension>(
432 data: ArrayBase<S, D>,
433 level: PcoCompressionLevel,
434 mode: PcoModeSpec,
435 delta: PcoDeltaSpec,
436 paging: PcoPagingSpec,
437) -> Result<Vec<u8>, PcodecError> {
438 let mut encoded_bytes = postcard::to_extend(
439 &CompressionHeader {
440 dtype: <T as PcoElement>::DTYPE,
441 shape: Cow::Borrowed(data.shape()),
442 },
443 Vec::new(),
444 )
445 .map_err(|err| PcodecError::HeaderEncodeFailed {
446 source: PcoHeaderError(err),
447 })?;
448
449 let data_owned;
450 #[allow(clippy::option_if_let_else)]
451 let data = if let Some(slice) = data.as_slice() {
452 slice
453 } else {
454 data_owned = data.into_iter().copied().collect::<Vec<T>>();
455 data_owned.as_slice()
456 };
457
458 let config = pco::ChunkConfig::default()
459 .with_compression_level(level as usize)
460 .with_mode_spec(match mode {
461 PcoModeSpec::Auto => pco::ModeSpec::Auto,
462 PcoModeSpec::Classic => pco::ModeSpec::Classic,
463 PcoModeSpec::TryFloatMult { float_mult_base } => {
464 pco::ModeSpec::TryFloatMult(float_mult_base)
465 }
466 PcoModeSpec::TryFloatQuant { float_quant_bits } => {
467 pco::ModeSpec::TryFloatQuant(float_quant_bits)
468 }
469 PcoModeSpec::TryIntMult { int_mult_base } => pco::ModeSpec::TryIntMult(int_mult_base),
470 })
471 .with_delta_spec(match delta {
472 PcoDeltaSpec::Auto => pco::DeltaSpec::Auto,
473 PcoDeltaSpec::None => pco::DeltaSpec::None,
474 PcoDeltaSpec::TryConsecutive {
475 delta_encoding_order,
476 } => pco::DeltaSpec::TryConsecutive(delta_encoding_order as usize),
477 PcoDeltaSpec::TryLookback => pco::DeltaSpec::TryLookback,
478 })
479 .with_paging_spec(match paging {
480 PcoPagingSpec::EqualPagesUpTo { equal_pages_up_to } => {
481 pco::PagingSpec::EqualPagesUpTo(equal_pages_up_to.get())
482 }
483 });
484
485 let encoded = pco::standalone::simple_compress(data, &config).map_err(|err| {
486 PcodecError::PcoEncodeFailed {
487 source: PcoCodingError(err),
488 }
489 })?;
490 encoded_bytes.extend_from_slice(&encoded);
491
492 Ok(encoded_bytes)
493}
494
495pub fn decompress(encoded: &[u8]) -> Result<AnyArray, PcodecError> {
503 let (header, data) =
504 postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
505 PcodecError::HeaderDecodeFailed {
506 source: PcoHeaderError(err),
507 }
508 })?;
509
510 let decoded = match header.dtype {
511 PcoDType::U16 => AnyArray::U16(Array::from_shape_vec(
512 &*header.shape,
513 pco::standalone::simple_decompress(data).map_err(|err| {
514 PcodecError::PcoDecodeFailed {
515 source: PcoCodingError(err),
516 }
517 })?,
518 )?),
519 PcoDType::U32 => AnyArray::U32(Array::from_shape_vec(
520 &*header.shape,
521 pco::standalone::simple_decompress(data).map_err(|err| {
522 PcodecError::PcoDecodeFailed {
523 source: PcoCodingError(err),
524 }
525 })?,
526 )?),
527 PcoDType::U64 => AnyArray::U64(Array::from_shape_vec(
528 &*header.shape,
529 pco::standalone::simple_decompress(data).map_err(|err| {
530 PcodecError::PcoDecodeFailed {
531 source: PcoCodingError(err),
532 }
533 })?,
534 )?),
535 PcoDType::I16 => AnyArray::I16(Array::from_shape_vec(
536 &*header.shape,
537 pco::standalone::simple_decompress(data).map_err(|err| {
538 PcodecError::PcoDecodeFailed {
539 source: PcoCodingError(err),
540 }
541 })?,
542 )?),
543 PcoDType::I32 => AnyArray::I32(Array::from_shape_vec(
544 &*header.shape,
545 pco::standalone::simple_decompress(data).map_err(|err| {
546 PcodecError::PcoDecodeFailed {
547 source: PcoCodingError(err),
548 }
549 })?,
550 )?),
551 PcoDType::I64 => AnyArray::I64(Array::from_shape_vec(
552 &*header.shape,
553 pco::standalone::simple_decompress(data).map_err(|err| {
554 PcodecError::PcoDecodeFailed {
555 source: PcoCodingError(err),
556 }
557 })?,
558 )?),
559 PcoDType::F32 => AnyArray::F32(Array::from_shape_vec(
560 &*header.shape,
561 pco::standalone::simple_decompress(data).map_err(|err| {
562 PcodecError::PcoDecodeFailed {
563 source: PcoCodingError(err),
564 }
565 })?,
566 )?),
567 PcoDType::F64 => AnyArray::F64(Array::from_shape_vec(
568 &*header.shape,
569 pco::standalone::simple_decompress(data).map_err(|err| {
570 PcodecError::PcoDecodeFailed {
571 source: PcoCodingError(err),
572 }
573 })?,
574 )?),
575 };
576
577 Ok(decoded)
578}
579
580pub fn decompress_into<T: PcoElement, D: Dimension>(
592 encoded: &[u8],
593 mut decoded: ArrayViewMut<T, D>,
594) -> Result<(), PcodecError> {
595 let (header, data) =
596 postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
597 PcodecError::HeaderDecodeFailed {
598 source: PcoHeaderError(err),
599 }
600 })?;
601
602 if T::DTYPE != header.dtype {
603 return Err(PcodecError::MismatchedDecodeIntoArray {
604 source: AnyArrayAssignError::DTypeMismatch {
605 src: header.dtype.into_dtype(),
606 dst: T::DTYPE.into_dtype(),
607 },
608 });
609 }
610
611 if decoded.shape() != &*header.shape {
612 return Err(PcodecError::MismatchedDecodeIntoArray {
613 source: AnyArrayAssignError::ShapeMismatch {
614 src: header.shape.into_owned(),
615 dst: decoded.shape().to_vec(),
616 },
617 });
618 }
619
620 if let Some(slice) = decoded.as_slice_mut() {
621 pco::standalone::simple_decompress_into(data, slice).map_err(|err| {
622 PcodecError::PcoDecodeFailed {
623 source: PcoCodingError(err),
624 }
625 })?;
626 return Ok(());
627 }
628
629 let dec =
630 pco::standalone::simple_decompress(data).map_err(|err| PcodecError::PcoDecodeFailed {
631 source: PcoCodingError(err),
632 })?;
633
634 if dec.len() != decoded.len() {
635 return Err(PcodecError::DecodeInvalidShapeHeader {
636 source: ShapeError::from_kind(ndarray::ErrorKind::IncompatibleShape),
637 });
638 }
639
640 decoded.iter_mut().zip(dec).for_each(|(o, d)| *o = d);
641
642 Ok(())
643}
644
/// Array element types which can be compressed with pco.
pub trait PcoElement: Copy + pco::data_types::Number {
    /// The dtype tag that identifies this element type in the encoded header
    const DTYPE: PcoDType;
}
650
651impl PcoElement for u16 {
652 const DTYPE: PcoDType = PcoDType::U16;
653}
654
655impl PcoElement for u32 {
656 const DTYPE: PcoDType = PcoDType::U32;
657}
658
659impl PcoElement for u64 {
660 const DTYPE: PcoDType = PcoDType::U64;
661}
662
663impl PcoElement for i16 {
664 const DTYPE: PcoDType = PcoDType::I16;
665}
666
667impl PcoElement for i32 {
668 const DTYPE: PcoDType = PcoDType::I32;
669}
670
671impl PcoElement for i64 {
672 const DTYPE: PcoDType = PcoDType::I64;
673}
674
675impl PcoElement for f32 {
676 const DTYPE: PcoDType = PcoDType::F32;
677}
678
679impl PcoElement for f64 {
680 const DTYPE: PcoDType = PcoDType::F64;
681}
682
/// Header that is written in front of the pco-compressed bytes so that the
/// dtype and shape of the array can be restored on decompression.
#[derive(Serialize, Deserialize)]
struct CompressionHeader<'a> {
    /// Element type of the compressed array
    dtype: PcoDType,
    /// Shape of the compressed array; borrowed from the array when encoding
    #[serde(borrow)]
    shape: Cow<'a, [usize]>,
}
689
/// Dtypes that pco can compress and decompress.
///
/// Each variant is serialized under its short name (e.g. `u16`) and also
/// accepts the long alias (e.g. `uint16`) when deserializing.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[allow(missing_docs)]
pub enum PcoDType {
    #[serde(rename = "u16", alias = "uint16")]
    U16,
    #[serde(rename = "u32", alias = "uint32")]
    U32,
    #[serde(rename = "u64", alias = "uint64")]
    U64,
    #[serde(rename = "i16", alias = "int16")]
    I16,
    #[serde(rename = "i32", alias = "int32")]
    I32,
    #[serde(rename = "i64", alias = "int64")]
    I64,
    #[serde(rename = "f32", alias = "float32")]
    F32,
    #[serde(rename = "f64", alias = "float64")]
    F64,
}
711
impl PcoDType {
    /// Converts the [`PcoDType`] into the [`AnyArrayDType`] variant of the
    /// same name.
    #[must_use]
    pub const fn into_dtype(self) -> AnyArrayDType {
        // Exhaustive on purpose, so that adding a new pco dtype fails to
        // compile until it is mapped here
        match self {
            Self::U16 => AnyArrayDType::U16,
            Self::U32 => AnyArrayDType::U32,
            Self::U64 => AnyArrayDType::U64,
            Self::I16 => AnyArrayDType::I16,
            Self::I32 => AnyArrayDType::I32,
            Self::I64 => AnyArrayDType::I64,
            Self::F32 => AnyArrayDType::F32,
            Self::F64 => AnyArrayDType::F64,
        }
    }
}
728
729impl fmt::Display for PcoDType {
730 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
731 fmt.write_str(match self {
732 Self::U16 => "u16",
733 Self::U32 => "u32",
734 Self::U64 => "u64",
735 Self::I16 => "i16",
736 Self::I32 => "i32",
737 Self::I64 => "i64",
738 Self::F32 => "f32",
739 Self::F64 => "f64",
740 })
741 }
742}