Skip to main content

zarrs/array/codec/bytes_to_bytes/gzip/
gzip_codec.rs

1use std::borrow::Cow;
2use std::io::{Cursor, Read};
3use std::sync::Arc;
4
5use flate2::bufread::{GzDecoder, GzEncoder};
6
7use super::{
8    GzipCodecConfiguration, GzipCodecConfigurationV1, GzipCompressionLevel,
9    GzipCompressionLevelError,
10};
11use crate::array::{ArrayBytesRaw, BytesRepresentation};
12use zarrs_codec::{
13    BytesToBytesCodecTraits, CodecError, CodecMetadataOptions, CodecOptions, CodecTraits,
14    PartialDecoderCapability, PartialEncoderCapability, RecommendedConcurrency,
15};
16use zarrs_metadata::Configuration;
17use zarrs_plugin::{PluginCreateError, ZarrVersion};
18
19/// A `gzip` codec implementation.
20#[derive(Clone, Debug)]
21pub struct GzipCodec {
22    compression_level: GzipCompressionLevel,
23}
24
25impl GzipCodec {
26    /// Create a new `gzip` codec.
27    ///
28    /// # Errors
29    /// Returns [`GzipCompressionLevelError`] if `compression_level` is not valid.
30    pub fn new(compression_level: u32) -> Result<Self, GzipCompressionLevelError> {
31        let compression_level: GzipCompressionLevel = compression_level.try_into()?;
32        Ok(Self { compression_level })
33    }
34
35    /// Create a new `gzip` codec from configuration.
36    ///
37    /// # Errors
38    /// Returns an error if the configuration is not supported.
39    pub fn new_with_configuration(
40        configuration: &GzipCodecConfiguration,
41    ) -> Result<Self, PluginCreateError> {
42        match configuration {
43            GzipCodecConfiguration::V1(configuration) => Ok(Self {
44                compression_level: configuration.level,
45            }),
46            _ => Err(PluginCreateError::Other(
47                "this gzip codec configuration variant is unsupported".to_string(),
48            )),
49        }
50    }
51}
52
53impl CodecTraits for GzipCodec {
54    fn as_any(&self) -> &dyn std::any::Any {
55        self
56    }
57
58    fn configuration(
59        &self,
60        _version: ZarrVersion,
61        _options: &CodecMetadataOptions,
62    ) -> Option<Configuration> {
63        let configuration = GzipCodecConfiguration::V1(GzipCodecConfigurationV1 {
64            level: self.compression_level,
65        });
66        Some(configuration.into())
67    }
68
69    fn partial_decoder_capability(&self) -> PartialDecoderCapability {
70        PartialDecoderCapability {
71            partial_read: false,
72            partial_decode: false,
73        }
74    }
75
76    fn partial_encoder_capability(&self) -> PartialEncoderCapability {
77        PartialEncoderCapability {
78            partial_encode: false,
79        }
80    }
81}
82
83#[cfg_attr(
84    all(feature = "async", not(target_arch = "wasm32")),
85    async_trait::async_trait
86)]
87#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
88impl BytesToBytesCodecTraits for GzipCodec {
89    fn into_dyn(self: Arc<Self>) -> Arc<dyn BytesToBytesCodecTraits> {
90        self as Arc<dyn BytesToBytesCodecTraits>
91    }
92
93    fn recommended_concurrency(
94        &self,
95        _decoded_representation: &BytesRepresentation,
96    ) -> Result<RecommendedConcurrency, CodecError> {
97        Ok(RecommendedConcurrency::new_maximum(1))
98    }
99
100    fn encode<'a>(
101        &self,
102        decoded_value: ArrayBytesRaw<'a>,
103        _options: &CodecOptions,
104    ) -> Result<ArrayBytesRaw<'a>, CodecError> {
105        let mut encoder = GzEncoder::new(
106            Cursor::new(decoded_value),
107            flate2::Compression::new(self.compression_level.as_u32()),
108        );
109        let mut out: Vec<u8> = Vec::new();
110        encoder.read_to_end(&mut out)?;
111        Ok(Cow::Owned(out))
112    }
113
114    fn decode<'a>(
115        &self,
116        encoded_value: ArrayBytesRaw<'a>,
117        _decoded_representation: &BytesRepresentation,
118        _options: &CodecOptions,
119    ) -> Result<ArrayBytesRaw<'a>, CodecError> {
120        let mut decoder = GzDecoder::new(Cursor::new(encoded_value));
121        let mut out: Vec<u8> = Vec::new();
122        decoder.read_to_end(&mut out)?;
123        Ok(Cow::Owned(out))
124    }
125
126    fn encoded_representation(
127        &self,
128        decoded_representation: &BytesRepresentation,
129    ) -> BytesRepresentation {
130        decoded_representation
131            .size()
132            .map_or(BytesRepresentation::UnboundedSize, |size| {
133                // https://www.gnu.org/software/gzip/manual/gzip.pdf
134                const HEADER_TRAILER_OVERHEAD: u64 = 10 + 8; // TODO: validate that extra headers are not populated
135                const BLOCK_SIZE: u64 = 32768;
136                const BLOCK_OVERHEAD: u64 = 5;
137                let blocks_overhead = BLOCK_OVERHEAD * size.div_ceil(BLOCK_SIZE);
138                BytesRepresentation::BoundedSize(size + HEADER_TRAILER_OVERHEAD + blocks_overhead)
139            })
140    }
141}