zarrs/array/codec/bytes_to_bytes/gzip/
gzip_codec.rs1use std::borrow::Cow;
2use std::io::{Cursor, Read};
3use std::sync::Arc;
4
5use flate2::bufread::{GzDecoder, GzEncoder};
6
7use super::{
8 GzipCodecConfiguration, GzipCodecConfigurationV1, GzipCompressionLevel,
9 GzipCompressionLevelError,
10};
11use crate::array::{ArrayBytesRaw, BytesRepresentation};
12use zarrs_codec::{
13 BytesToBytesCodecTraits, CodecError, CodecMetadataOptions, CodecOptions, CodecTraits,
14 PartialDecoderCapability, PartialEncoderCapability, RecommendedConcurrency,
15};
16use zarrs_metadata::Configuration;
17use zarrs_plugin::{PluginCreateError, ZarrVersion};
18
19#[derive(Clone, Debug)]
21pub struct GzipCodec {
22 compression_level: GzipCompressionLevel,
23}
24
25impl GzipCodec {
26 pub fn new(compression_level: u32) -> Result<Self, GzipCompressionLevelError> {
31 let compression_level: GzipCompressionLevel = compression_level.try_into()?;
32 Ok(Self { compression_level })
33 }
34
35 pub fn new_with_configuration(
40 configuration: &GzipCodecConfiguration,
41 ) -> Result<Self, PluginCreateError> {
42 match configuration {
43 GzipCodecConfiguration::V1(configuration) => Ok(Self {
44 compression_level: configuration.level,
45 }),
46 _ => Err(PluginCreateError::Other(
47 "this gzip codec configuration variant is unsupported".to_string(),
48 )),
49 }
50 }
51}
52
53impl CodecTraits for GzipCodec {
54 fn as_any(&self) -> &dyn std::any::Any {
55 self
56 }
57
58 fn configuration(
59 &self,
60 _version: ZarrVersion,
61 _options: &CodecMetadataOptions,
62 ) -> Option<Configuration> {
63 let configuration = GzipCodecConfiguration::V1(GzipCodecConfigurationV1 {
64 level: self.compression_level,
65 });
66 Some(configuration.into())
67 }
68
69 fn partial_decoder_capability(&self) -> PartialDecoderCapability {
70 PartialDecoderCapability {
71 partial_read: false,
72 partial_decode: false,
73 }
74 }
75
76 fn partial_encoder_capability(&self) -> PartialEncoderCapability {
77 PartialEncoderCapability {
78 partial_encode: false,
79 }
80 }
81}
82
83#[cfg_attr(
84 all(feature = "async", not(target_arch = "wasm32")),
85 async_trait::async_trait
86)]
87#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
88impl BytesToBytesCodecTraits for GzipCodec {
89 fn into_dyn(self: Arc<Self>) -> Arc<dyn BytesToBytesCodecTraits> {
90 self as Arc<dyn BytesToBytesCodecTraits>
91 }
92
93 fn recommended_concurrency(
94 &self,
95 _decoded_representation: &BytesRepresentation,
96 ) -> Result<RecommendedConcurrency, CodecError> {
97 Ok(RecommendedConcurrency::new_maximum(1))
98 }
99
100 fn encode<'a>(
101 &self,
102 decoded_value: ArrayBytesRaw<'a>,
103 _options: &CodecOptions,
104 ) -> Result<ArrayBytesRaw<'a>, CodecError> {
105 let mut encoder = GzEncoder::new(
106 Cursor::new(decoded_value),
107 flate2::Compression::new(self.compression_level.as_u32()),
108 );
109 let mut out: Vec<u8> = Vec::new();
110 encoder.read_to_end(&mut out)?;
111 Ok(Cow::Owned(out))
112 }
113
114 fn decode<'a>(
115 &self,
116 encoded_value: ArrayBytesRaw<'a>,
117 _decoded_representation: &BytesRepresentation,
118 _options: &CodecOptions,
119 ) -> Result<ArrayBytesRaw<'a>, CodecError> {
120 let mut decoder = GzDecoder::new(Cursor::new(encoded_value));
121 let mut out: Vec<u8> = Vec::new();
122 decoder.read_to_end(&mut out)?;
123 Ok(Cow::Owned(out))
124 }
125
126 fn encoded_representation(
127 &self,
128 decoded_representation: &BytesRepresentation,
129 ) -> BytesRepresentation {
130 decoded_representation
131 .size()
132 .map_or(BytesRepresentation::UnboundedSize, |size| {
133 const HEADER_TRAILER_OVERHEAD: u64 = 10 + 8; const BLOCK_SIZE: u64 = 32768;
136 const BLOCK_OVERHEAD: u64 = 5;
137 let blocks_overhead = BLOCK_OVERHEAD * size.div_ceil(BLOCK_SIZE);
138 BytesRepresentation::BoundedSize(size + HEADER_TRAILER_OVERHEAD + blocks_overhead)
139 })
140 }
141}