Skip to main content

zarrs/array/codec/bytes_to_bytes/
gdeflate.rs

1//! The `gdeflate` bytes to bytes codec (Experimental).
2//!
3//! Applies [GDeflate](https://docs.nvidia.com/cuda/nvcomp/gdeflate.html) compression.
4//!
5//! <div class="warning">
6//! This codec is experimental and may be incompatible with other Zarr V3 implementations.
7//! </div>
8//!
9//! ### Compatible Implementations
10//! None
11//!
12//! ### Specification
13//! - <https://codec.zarrs.dev/bytes_to_bytes/gdeflate>
14//!
//! `gdeflate` encoded data consists of, in order, a static header, a dynamic header, and the compressed bytes.
16//!
17//! The static header is composed of the following:
18//!  - `UNCOMPRESSED_INPUT_LENGTH`: a little-endian 64-bit unsigned integer holding the total uncompressed length of the input bytes.
19//!  - `NUMBER_OF_PAGES`: a little-endian 64-bit unsigned integer holding the number of compressed pages.
20//!
21//! The dynamic header is composed of the following:
22//!  - `COMPRESSED_PAGE_SIZES`: `NUMBER_OF_PAGES` little-endian 64-bit unsigned integers holding the compressed sizes of each page.
23//!
24//! The remaining bytes are the `gdeflate` encoded pages of total length equal to the sum of all `COMPRESSED_PAGE_SIZES`.
25//!
26//! ### Codec `name` Aliases (Zarr V3)
27//! - `zarrs.gdeflate`
28//! - `https://codec.zarrs.dev/bytes_to_bytes/gdeflate`
29//!
30//! ### Codec `id` Aliases (Zarr V2)
31//! None
32//!
33//! ### Codec `configuration` Example - [`GDeflateCodecConfiguration`]:
34//! ```rust
35//! # let JSON = r#"
36//! {
37//!     "level": 9
38//! }
39//! # "#;
40//! # use zarrs::metadata_ext::codec::gdeflate::GDeflateCodecConfiguration;
41//! # serde_json::from_str::<GDeflateCodecConfiguration>(JSON).unwrap();
42//! ```
43
44mod gdeflate_codec;
45
46use std::sync::Arc;
47
48pub use gdeflate_codec::GDeflateCodec;
49use zarrs_metadata::v3::MetadataV3;
50
51use crate::array::ArrayBytesRaw;
52use zarrs_codec::{Codec, CodecError, CodecPluginV3, CodecTraitsV3, InvalidBytesLengthError};
53pub use zarrs_metadata_ext::codec::gdeflate::{
54    GDeflateCodecConfiguration, GDeflateCodecConfigurationV0, GDeflateCompressionLevel,
55    GDeflateCompressionLevelError,
56};
57use zarrs_plugin::PluginCreateError;
58
// Declare `zarrs.gdeflate` as the Zarr V3 alias for `GDeflateCodec`.
zarrs_plugin::impl_extension_aliases!(GDeflateCodec, v3: "zarrs.gdeflate");

// Register the V3 codec.
// The plugin is submitted through `inventory`, so no explicit registration call appears in this file.
inventory::submit! {
    CodecPluginV3::new::<GDeflateCodec>()
}
65
66impl CodecTraitsV3 for GDeflateCodec {
67    fn create(metadata: &MetadataV3) -> Result<Codec, PluginCreateError> {
68        crate::warn_experimental_extension(metadata.name(), "codec");
69        let configuration: GDeflateCodecConfiguration = metadata.to_typed_configuration()?;
70        let codec = Arc::new(GDeflateCodec::new_with_configuration(&configuration)?);
71        Ok(Codec::BytesToBytes(codec))
72    }
73}
74
/// Number of uncompressed input bytes handed to the compressor per page.
/// Every page covers this many bytes except the last, which may be shorter.
const GDEFLATE_PAGE_SIZE_UNCOMPRESSED: usize = 65536;
/// Length in bytes of the static header: two little-endian `u64` fields
/// (the uncompressed input length followed by the number of pages).
const GDEFLATE_STATIC_HEADER_LENGTH: usize = 2 * size_of::<u64>();
77
78fn gdeflate_decode(encoded_value: &ArrayBytesRaw<'_>) -> Result<Vec<u8>, CodecError> {
79    if encoded_value.len() < GDEFLATE_STATIC_HEADER_LENGTH {
80        return Err(InvalidBytesLengthError::new(
81            encoded_value.len(),
82            GDEFLATE_STATIC_HEADER_LENGTH,
83        )
84        .into());
85    }
86
87    // Decode the static header
88    let as_u64 = |bytes: &[u8]| -> u64 { u64::from_le_bytes(bytes.try_into().unwrap()) };
89    let decoded_value_len = as_u64(&encoded_value[0..size_of::<u64>()]);
90    let decoded_value_len = usize::try_from(decoded_value_len).unwrap();
91    let num_pages = as_u64(&encoded_value[size_of::<u64>()..2 * size_of::<u64>()]);
92    let num_pages = usize::try_from(num_pages).unwrap();
93
94    // Check length of dynamic header
95    let dynamic_header_length = num_pages * size_of::<u64>();
96    if encoded_value.len() < GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length {
97        return Err(InvalidBytesLengthError::new(
98            encoded_value.len(),
99            GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length,
100        )
101        .into());
102    }
103
104    // Decode the pages
105    let decompressor = GDeflateDecompressor::new()?;
106    let mut decoded_value = Vec::with_capacity(decoded_value_len);
107    let mut page_offset = GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length;
108    for page in 0..num_pages {
109        // Get the compressed page length
110        let page_size_compressed_offset = GDEFLATE_STATIC_HEADER_LENGTH + page * size_of::<u64>();
111        let page_size_compressed = as_u64(
112            &encoded_value
113                [page_size_compressed_offset..page_size_compressed_offset + size_of::<u64>()],
114        );
115        let page_size_compressed = usize::try_from(page_size_compressed).unwrap();
116
117        // Get the compressed page data
118        let page_data = &encoded_value[page_offset..page_offset + page_size_compressed];
119        let in_page = gdeflate_sys::libdeflate_gdeflate_in_page {
120            data: page_data.as_ptr().cast(),
121            nbytes: page_data.len(),
122        };
123
124        // Decompress the page
125        let data_out = decoded_value.spare_capacity_mut();
126        let page_size_uncompressed =
127            decompressor.decompress_page(in_page, data_out.as_mut_ptr().cast(), data_out.len())?;
128
129        unsafe {
130            decoded_value.set_len(decoded_value.len() + page_size_uncompressed);
131        }
132        page_offset += page_size_compressed;
133    }
134
135    Ok(decoded_value)
136}
137
/// Owning wrapper around a raw `libdeflate_gdeflate_compressor` pointer.
/// The pointer is released by this type's `Drop` implementation.
struct GDeflateCompressor(*mut gdeflate_sys::libdeflate_gdeflate_compressor);
139
140impl GDeflateCompressor {
141    pub(crate) fn new(compression_level: GDeflateCompressionLevel) -> Result<Self, CodecError> {
142        let compressor = unsafe {
143            gdeflate_sys::libdeflate_alloc_gdeflate_compressor(compression_level.as_i32())
144        };
145        if compressor.is_null() {
146            Err(CodecError::Other(
147                "Failed to create gdeflate compressor".to_string(),
148            ))
149        } else {
150            Ok(Self(compressor))
151        }
152    }
153
154    fn get_npages_compress_bound(&self, input_length: usize) -> (usize, usize) {
155        let mut out_npages = 0;
156        let compress_bound = unsafe {
157            gdeflate_sys::libdeflate_gdeflate_compress_bound(
158                self.0,
159                input_length,
160                &raw mut out_npages,
161            )
162        };
163        (out_npages, compress_bound)
164    }
165
166    pub(crate) fn compress(
167        &self,
168        uncompressed_bytes: &[u8],
169    ) -> Result<(Vec<usize>, Vec<u8>), CodecError> {
170        let (out_npages, compress_bound) = self.get_npages_compress_bound(uncompressed_bytes.len());
171        // let compress_bound_page = compress_bound / out_npages;
172
173        let mut compressed_bytes = Vec::with_capacity(compress_bound);
174        let mut page_sizes = Vec::with_capacity(out_npages);
175        for i in 0..out_npages {
176            let page_offset = i * GDEFLATE_PAGE_SIZE_UNCOMPRESSED;
177
178            let data_out = compressed_bytes.spare_capacity_mut();
179            let mut out_page = gdeflate_sys::libdeflate_gdeflate_out_page {
180                data: data_out.as_mut_ptr().cast(),
181                nbytes: data_out.len(),
182            };
183
184            let data_in = &uncompressed_bytes[page_offset
185                ..(page_offset + GDEFLATE_PAGE_SIZE_UNCOMPRESSED).min(uncompressed_bytes.len())];
186            let compressed_size = unsafe {
187                gdeflate_sys::libdeflate_gdeflate_compress(
188                    self.0,
189                    data_in.as_ptr().cast(),
190                    data_in.len(),
191                    &raw mut out_page,
192                    1,
193                )
194            };
195            if compressed_size == 0 {
196                return Err(CodecError::Other("gdeflate compression failed".to_string()));
197            }
198            page_sizes.push(compressed_size);
199            unsafe {
200                compressed_bytes.set_len(compressed_bytes.len() + compressed_size);
201            }
202        }
203
204        Ok((page_sizes, compressed_bytes))
205    }
206}
207
208impl Drop for GDeflateCompressor {
209    fn drop(&mut self) {
210        unsafe { gdeflate_sys::libdeflate_free_gdeflate_compressor(self.0) }
211    }
212}
213
/// Owning wrapper around a raw `libdeflate_gdeflate_decompressor` pointer.
/// The pointer is released by this type's `Drop` implementation.
struct GDeflateDecompressor(*mut gdeflate_sys::libdeflate_gdeflate_decompressor);
215
216impl GDeflateDecompressor {
217    pub(crate) fn new() -> Result<Self, CodecError> {
218        let decompressor = unsafe { gdeflate_sys::libdeflate_alloc_gdeflate_decompressor() };
219        if decompressor.is_null() {
220            Err(CodecError::Other(
221                "Failed to create gdeflate compressor".to_string(),
222            ))
223        } else {
224            Ok(Self(decompressor))
225        }
226    }
227
228    pub(crate) fn decompress_page(
229        &self,
230        mut in_page: gdeflate_sys::libdeflate_gdeflate_in_page,
231        out: *mut u8,
232        out_nbytes_avail: usize,
233    ) -> Result<usize, CodecError> {
234        let mut actual_out_nbytes: usize = 0;
235        let result = unsafe {
236            gdeflate_sys::libdeflate_gdeflate_decompress(
237                self.0,
238                &raw mut in_page,
239                1,
240                out.cast(),
241                out_nbytes_avail,
242                &raw mut actual_out_nbytes,
243            )
244        };
245        assert_eq!(actual_out_nbytes, out_nbytes_avail);
246        if result == 0 {
247            Ok(actual_out_nbytes)
248        } else {
249            Err(CodecError::Other(
250                "gdeflate page decompression failed".to_string(),
251            ))
252        }
253    }
254}
255
256impl Drop for GDeflateDecompressor {
257    fn drop(&mut self) {
258        unsafe { gdeflate_sys::libdeflate_free_gdeflate_decompressor(self.0) }
259    }
260}
261
#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::sync::Arc;

    use super::*;
    use crate::array::BytesRepresentation;
    use zarrs_codec::{BytesPartialDecoderTraits, BytesToBytesCodecTraits, CodecOptions};
    use zarrs_storage::byte_range::ByteRange;

    const JSON_VALID: &str = r#"{
        "level": 1
    }"#;

    /// Build a codec from the valid JSON configuration shared by the tests.
    fn codec_from_valid_json() -> GDeflateCodec {
        let configuration: GDeflateCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        GDeflateCodec::new_with_configuration(&configuration).unwrap()
    }

    /// The bytes of the `u16` sequence `0..count`.
    fn sequential_u16_bytes(count: u16) -> Vec<u8> {
        let elements: Vec<u16> = (0..count).collect();
        crate::array::transmute_to_bytes_vec(elements)
    }

    /// The byte ranges requested by the partial decoding tests.
    fn requested_regions() -> [ByteRange; 2] {
        [
            ByteRange::FromStart(4, Some(4)),
            ByteRange::FromStart(10, Some(2)),
        ]
    }

    /// Reinterpret native-endian bytes as `u16` values, ignoring any trailing odd byte.
    fn bytes_to_u16s(bytes: &[u8]) -> Vec<u16> {
        let (pairs, _remainder) = bytes.as_chunks::<2>();
        pairs.iter().copied().map(u16::from_ne_bytes).collect()
    }

    /// A configuration with a level in range deserializes.
    #[test]
    fn codec_gdeflate_configuration_valid() {
        let parsed = serde_json::from_str::<GDeflateCodecConfiguration>(JSON_VALID);
        assert!(parsed.is_ok());
    }

    /// A negative level is rejected.
    #[test]
    fn codec_gdeflate_configuration_invalid1() {
        const JSON_INVALID1: &str = r#"{
        "level": -1
    }"#;
        let parsed = serde_json::from_str::<GDeflateCodecConfiguration>(JSON_INVALID1);
        assert!(parsed.is_err());
    }

    /// A level of 13 is rejected.
    #[test]
    fn codec_gdeflate_configuration_invalid2() {
        const JSON_INVALID2: &str = r#"{
        "level": 13
    }"#;
        let parsed = serde_json::from_str::<GDeflateCodecConfiguration>(JSON_INVALID2);
        assert!(parsed.is_err());
    }

    /// Encoding then decoding returns the original bytes.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn codec_gdeflate_round_trip1() {
        let bytes = sequential_u16_bytes(32);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);
        let codec = codec_from_valid_json();
        let options = CodecOptions::default();

        let encoded = codec.encode(Cow::Borrowed(&bytes), &options).unwrap();
        let decoded = codec
            .decode(encoded, &bytes_representation, &options)
            .unwrap();
        assert_eq!(bytes, decoded.to_vec());
    }

    /// Partial decoding of two byte ranges returns the expected elements.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn codec_gdeflate_partial_decode() {
        let bytes = sequential_u16_bytes(8);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);
        let codec = Arc::new(codec_from_valid_json());
        let options = CodecOptions::default();

        let encoded = codec.encode(Cow::Owned(bytes), &options).unwrap();
        let input_handle = Arc::new(encoded);
        let partial_decoder = codec
            .partial_decoder(input_handle.clone(), &bytes_representation, &options)
            .unwrap();
        assert_eq!(partial_decoder.size_held(), input_handle.size_held()); // gdeflate partial decoder does not hold bytes

        let decoded_bytes = partial_decoder
            .partial_decode_many(Box::new(requested_regions().into_iter()), &options)
            .unwrap()
            .unwrap()
            .concat();

        let expected: Vec<u16> = vec![2, 3, 5];
        assert_eq!(expected, bytes_to_u16s(&decoded_bytes));
    }

    /// Async partial decoding of two byte ranges returns the expected elements.
    #[cfg(feature = "async")]
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn codec_gdeflate_async_partial_decode() {
        let bytes = sequential_u16_bytes(8);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);
        let codec = Arc::new(codec_from_valid_json());
        let options = CodecOptions::default();

        let encoded = codec.encode(Cow::Owned(bytes), &options).unwrap();
        let input_handle = Arc::new(encoded);
        let partial_decoder = codec
            .async_partial_decoder(input_handle, &bytes_representation, &options)
            .await
            .unwrap();

        let decoded_bytes = partial_decoder
            .partial_decode_many(Box::new(requested_regions().into_iter()), &options)
            .await
            .unwrap()
            .unwrap()
            .concat();

        let expected: Vec<u16> = vec![2, 3, 5];
        assert_eq!(expected, bytes_to_u16s(&decoded_bytes));
    }
}
411}