// zarrs/array/codec/bytes_to_bytes/gdeflate.rs
mod gdeflate_codec;
45
46use std::sync::Arc;
47
48pub use gdeflate_codec::GDeflateCodec;
49use zarrs_metadata::v3::MetadataV3;
50
51use crate::array::ArrayBytesRaw;
52use zarrs_codec::{Codec, CodecError, CodecPluginV3, CodecTraitsV3, InvalidBytesLengthError};
53pub use zarrs_metadata_ext::codec::gdeflate::{
54 GDeflateCodecConfiguration, GDeflateCodecConfigurationV0, GDeflateCompressionLevel,
55 GDeflateCompressionLevelError,
56};
57use zarrs_plugin::PluginCreateError;
58
// Register the `zarrs.gdeflate` extension name/alias for this codec (Zarr V3).
zarrs_plugin::impl_extension_aliases!(GDeflateCodec, v3: "zarrs.gdeflate");
60
// Register the codec with the V3 codec plugin inventory so it can be
// instantiated by name from array metadata.
inventory::submit! {
    CodecPluginV3::new::<GDeflateCodec>()
}
65
66impl CodecTraitsV3 for GDeflateCodec {
67 fn create(metadata: &MetadataV3) -> Result<Codec, PluginCreateError> {
68 crate::warn_experimental_extension(metadata.name(), "codec");
69 let configuration: GDeflateCodecConfiguration = metadata.to_typed_configuration()?;
70 let codec = Arc::new(GDeflateCodec::new_with_configuration(&configuration)?);
71 Ok(Codec::BytesToBytes(codec))
72 }
73}
74
/// Uncompressed size of a single gdeflate page (64 KiB).
const GDEFLATE_PAGE_SIZE_UNCOMPRESSED: usize = 65536;
/// Static header length: decoded byte length (u64 LE) + page count (u64 LE).
const GDEFLATE_STATIC_HEADER_LENGTH: usize = 2 * size_of::<u64>();
77
78fn gdeflate_decode(encoded_value: &ArrayBytesRaw<'_>) -> Result<Vec<u8>, CodecError> {
79 if encoded_value.len() < GDEFLATE_STATIC_HEADER_LENGTH {
80 return Err(InvalidBytesLengthError::new(
81 encoded_value.len(),
82 GDEFLATE_STATIC_HEADER_LENGTH,
83 )
84 .into());
85 }
86
87 let as_u64 = |bytes: &[u8]| -> u64 { u64::from_le_bytes(bytes.try_into().unwrap()) };
89 let decoded_value_len = as_u64(&encoded_value[0..size_of::<u64>()]);
90 let decoded_value_len = usize::try_from(decoded_value_len).unwrap();
91 let num_pages = as_u64(&encoded_value[size_of::<u64>()..2 * size_of::<u64>()]);
92 let num_pages = usize::try_from(num_pages).unwrap();
93
94 let dynamic_header_length = num_pages * size_of::<u64>();
96 if encoded_value.len() < GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length {
97 return Err(InvalidBytesLengthError::new(
98 encoded_value.len(),
99 GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length,
100 )
101 .into());
102 }
103
104 let decompressor = GDeflateDecompressor::new()?;
106 let mut decoded_value = Vec::with_capacity(decoded_value_len);
107 let mut page_offset = GDEFLATE_STATIC_HEADER_LENGTH + dynamic_header_length;
108 for page in 0..num_pages {
109 let page_size_compressed_offset = GDEFLATE_STATIC_HEADER_LENGTH + page * size_of::<u64>();
111 let page_size_compressed = as_u64(
112 &encoded_value
113 [page_size_compressed_offset..page_size_compressed_offset + size_of::<u64>()],
114 );
115 let page_size_compressed = usize::try_from(page_size_compressed).unwrap();
116
117 let page_data = &encoded_value[page_offset..page_offset + page_size_compressed];
119 let in_page = gdeflate_sys::libdeflate_gdeflate_in_page {
120 data: page_data.as_ptr().cast(),
121 nbytes: page_data.len(),
122 };
123
124 let data_out = decoded_value.spare_capacity_mut();
126 let page_size_uncompressed =
127 decompressor.decompress_page(in_page, data_out.as_mut_ptr().cast(), data_out.len())?;
128
129 unsafe {
130 decoded_value.set_len(decoded_value.len() + page_size_uncompressed);
131 }
132 page_offset += page_size_compressed;
133 }
134
135 Ok(decoded_value)
136}
137
/// Owning wrapper around a raw libdeflate gdeflate compressor; freed on drop.
struct GDeflateCompressor(*mut gdeflate_sys::libdeflate_gdeflate_compressor);
139
impl GDeflateCompressor {
    /// Allocate a gdeflate compressor for the given compression level.
    ///
    /// # Errors
    /// Returns [`CodecError::Other`] if the underlying allocation fails.
    pub(crate) fn new(compression_level: GDeflateCompressionLevel) -> Result<Self, CodecError> {
        // SAFETY: plain FFI allocation with a validated compression level; a
        // null return (allocation failure) is handled below.
        let compressor = unsafe {
            gdeflate_sys::libdeflate_alloc_gdeflate_compressor(compression_level.as_i32())
        };
        if compressor.is_null() {
            Err(CodecError::Other(
                "Failed to create gdeflate compressor".to_string(),
            ))
        } else {
            Ok(Self(compressor))
        }
    }

    /// Query the page count and worst-case compressed size bound for an input
    /// of `input_length` bytes.
    fn get_npages_compress_bound(&self, input_length: usize) -> (usize, usize) {
        let mut out_npages = 0;
        // SAFETY: `self.0` is a live compressor and `out_npages` is a valid
        // out-pointer for the duration of the call.
        let compress_bound = unsafe {
            gdeflate_sys::libdeflate_gdeflate_compress_bound(
                self.0,
                input_length,
                &raw mut out_npages,
            )
        };
        (out_npages, compress_bound)
    }

    /// Compress `uncompressed_bytes` page-by-page (64 KiB uncompressed pages).
    ///
    /// Returns the compressed size of each page and the concatenated
    /// compressed page data.
    ///
    /// # Errors
    /// Returns [`CodecError::Other`] if any page fails to compress.
    pub(crate) fn compress(
        &self,
        uncompressed_bytes: &[u8],
    ) -> Result<(Vec<usize>, Vec<u8>), CodecError> {
        let (out_npages, compress_bound) = self.get_npages_compress_bound(uncompressed_bytes.len());
        // `compress_bound` capacity guarantees all pages fit in the spare
        // capacity written to below.
        let mut compressed_bytes = Vec::with_capacity(compress_bound);
        let mut page_sizes = Vec::with_capacity(out_npages);
        for i in 0..out_npages {
            let page_offset = i * GDEFLATE_PAGE_SIZE_UNCOMPRESSED;

            // Compress directly into the vector's uninitialized spare capacity.
            let data_out = compressed_bytes.spare_capacity_mut();
            let mut out_page = gdeflate_sys::libdeflate_gdeflate_out_page {
                data: data_out.as_mut_ptr().cast(),
                nbytes: data_out.len(),
            };

            // Final page may be shorter than a full 64 KiB page.
            let data_in = &uncompressed_bytes[page_offset
                ..(page_offset + GDEFLATE_PAGE_SIZE_UNCOMPRESSED).min(uncompressed_bytes.len())];
            // SAFETY: `data_in` and `out_page` describe valid buffers for the
            // duration of the call; `self.0` is a live compressor.
            let compressed_size = unsafe {
                gdeflate_sys::libdeflate_gdeflate_compress(
                    self.0,
                    data_in.as_ptr().cast(),
                    data_in.len(),
                    &raw mut out_page,
                    1,
                )
            };
            if compressed_size == 0 {
                return Err(CodecError::Other("gdeflate compression failed".to_string()));
            }
            page_sizes.push(compressed_size);
            // SAFETY: the FFI call initialized `compressed_size` bytes of the
            // spare capacity.
            unsafe {
                compressed_bytes.set_len(compressed_bytes.len() + compressed_size);
            }
        }

        Ok((page_sizes, compressed_bytes))
    }
}
207
impl Drop for GDeflateCompressor {
    fn drop(&mut self) {
        // SAFETY: `self.0` was allocated by
        // `libdeflate_alloc_gdeflate_compressor` and is freed exactly once.
        unsafe { gdeflate_sys::libdeflate_free_gdeflate_compressor(self.0) }
    }
}
213
/// Owning wrapper around a raw libdeflate gdeflate decompressor; freed on drop.
struct GDeflateDecompressor(*mut gdeflate_sys::libdeflate_gdeflate_decompressor);
215
216impl GDeflateDecompressor {
217 pub(crate) fn new() -> Result<Self, CodecError> {
218 let decompressor = unsafe { gdeflate_sys::libdeflate_alloc_gdeflate_decompressor() };
219 if decompressor.is_null() {
220 Err(CodecError::Other(
221 "Failed to create gdeflate compressor".to_string(),
222 ))
223 } else {
224 Ok(Self(decompressor))
225 }
226 }
227
228 pub(crate) fn decompress_page(
229 &self,
230 mut in_page: gdeflate_sys::libdeflate_gdeflate_in_page,
231 out: *mut u8,
232 out_nbytes_avail: usize,
233 ) -> Result<usize, CodecError> {
234 let mut actual_out_nbytes: usize = 0;
235 let result = unsafe {
236 gdeflate_sys::libdeflate_gdeflate_decompress(
237 self.0,
238 &raw mut in_page,
239 1,
240 out.cast(),
241 out_nbytes_avail,
242 &raw mut actual_out_nbytes,
243 )
244 };
245 assert_eq!(actual_out_nbytes, out_nbytes_avail);
246 if result == 0 {
247 Ok(actual_out_nbytes)
248 } else {
249 Err(CodecError::Other(
250 "gdeflate page decompression failed".to_string(),
251 ))
252 }
253 }
254}
255
impl Drop for GDeflateDecompressor {
    fn drop(&mut self) {
        // SAFETY: `self.0` was allocated by
        // `libdeflate_alloc_gdeflate_decompressor` and is freed exactly once.
        unsafe { gdeflate_sys::libdeflate_free_gdeflate_decompressor(self.0) }
    }
}
261
#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::sync::Arc;

    use super::*;
    use crate::array::BytesRepresentation;
    use zarrs_codec::{BytesPartialDecoderTraits, BytesToBytesCodecTraits, CodecOptions};
    use zarrs_storage::byte_range::ByteRange;

    const JSON_VALID: &str = r#"{
        "level": 1
    }"#;

    #[test]
    fn codec_gdeflate_configuration_valid() {
        assert!(serde_json::from_str::<GDeflateCodecConfiguration>(JSON_VALID).is_ok());
    }

    #[test]
    fn codec_gdeflate_configuration_invalid1() {
        // Compression level below the valid range must be rejected.
        const JSON_INVALID1: &str = r#"{
            "level": -1
        }"#;
        assert!(serde_json::from_str::<GDeflateCodecConfiguration>(JSON_INVALID1).is_err());
    }

    #[test]
    fn codec_gdeflate_configuration_invalid2() {
        // Compression level above the valid range must be rejected.
        const JSON_INVALID2: &str = r#"{
            "level": 13
        }"#;
        assert!(serde_json::from_str::<GDeflateCodecConfiguration>(JSON_INVALID2).is_err());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn codec_gdeflate_round_trip1() {
        let elements: Vec<u16> = (0..32).collect();
        let bytes = crate::array::transmute_to_bytes_vec(elements);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

        let configuration: GDeflateCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        let codec = GDeflateCodec::new_with_configuration(&configuration).unwrap();

        // Encode then decode must reproduce the input exactly.
        let encoded = codec
            .encode(Cow::Borrowed(&bytes), &CodecOptions::default())
            .unwrap();
        let decoded = codec
            .decode(encoded, &bytes_representation, &CodecOptions::default())
            .unwrap();
        assert_eq!(bytes, decoded.to_vec());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn codec_gdeflate_partial_decode() {
        let elements: Vec<u16> = (0..8).collect();
        let bytes = crate::array::transmute_to_bytes_vec(elements);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

        let configuration: GDeflateCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        let codec = Arc::new(GDeflateCodec::new_with_configuration(&configuration).unwrap());

        let encoded = codec
            .encode(Cow::Owned(bytes), &CodecOptions::default())
            .unwrap();
        // Byte ranges covering elements 2, 3 (bytes 4..8) and 5 (bytes 10..12).
        let decoded_regions = [
            ByteRange::FromStart(4, Some(4)),
            ByteRange::FromStart(10, Some(2)),
        ];

        let input_handle = Arc::new(encoded);
        let partial_decoder = codec
            .partial_decoder(
                input_handle.clone(),
                &bytes_representation,
                &CodecOptions::default(),
            )
            .unwrap();
        assert_eq!(partial_decoder.size_held(), input_handle.size_held());
        let decoded_partial_chunk = partial_decoder
            .partial_decode_many(
                Box::new(decoded_regions.into_iter()),
                &CodecOptions::default(),
            )
            .unwrap()
            .unwrap()
            .concat();

        let decoded_partial_chunk: Vec<u16> = decoded_partial_chunk
            .as_chunks::<2>()
            .0
            .iter()
            .map(|b| u16::from_ne_bytes(*b))
            .collect();
        let answer: Vec<u16> = vec![2, 3, 5];
        assert_eq!(answer, decoded_partial_chunk);
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn codec_gdeflate_async_partial_decode() {
        let elements: Vec<u16> = (0..8).collect();
        let bytes = crate::array::transmute_to_bytes_vec(elements);
        let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

        let configuration: GDeflateCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        let codec = Arc::new(GDeflateCodec::new_with_configuration(&configuration).unwrap());

        let encoded = codec
            .encode(Cow::Owned(bytes), &CodecOptions::default())
            .unwrap();
        // Byte ranges covering elements 2, 3 (bytes 4..8) and 5 (bytes 10..12).
        let decoded_regions = [
            ByteRange::FromStart(4, Some(4)),
            ByteRange::FromStart(10, Some(2)),
        ];

        let input_handle = Arc::new(encoded);
        let partial_decoder = codec
            .async_partial_decoder(
                input_handle,
                &bytes_representation,
                &CodecOptions::default(),
            )
            .await
            .unwrap();
        let decoded_partial_chunk = partial_decoder
            .partial_decode_many(
                Box::new(decoded_regions.into_iter()),
                &CodecOptions::default(),
            )
            .await
            .unwrap()
            .unwrap()
            .concat();

        let decoded_partial_chunk: Vec<u16> = decoded_partial_chunk
            .as_chunks::<2>()
            .0
            .iter()
            .map(|b| u16::from_ne_bytes(*b))
            .collect();
        let answer: Vec<u16> = vec![2, 3, 5];
        assert_eq!(answer, decoded_partial_chunk);
    }
}