1use std::io::{Read, Write};
2use std::sync::Arc;
3
4use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
5use ad_core_rs::codec::{Codec, CodecName};
6use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
7use ad_core_rs::ndarray_pool::NDArrayPool;
8use ad_core_rs::plugin::runtime::{NDPluginProcess, ParamUpdate, ProcessResult};
9
10use flate2::Compression;
11use flate2::read::ZlibDecoder;
12use flate2::write::ZlibEncoder;
13use lz4_flex::block::{compress, decompress};
14use rust_hdf5::format::messages::filter::{
15 FILTER_BLOSC, Filter, FilterPipeline, apply_filters, reverse_filters,
16};
17
18const ATTR_ORIGINAL_DATA_TYPE: &str = "CODEC_ORIGINAL_DATA_TYPE";
20
21fn buffer_from_bytes(bytes: &[u8], data_type: NDDataType) -> Option<NDDataBuffer> {
26 let elem_size = data_type.element_size();
27 if bytes.len() % elem_size != 0 {
28 return None;
29 }
30 let count = bytes.len() / elem_size;
31
32 Some(match data_type {
33 NDDataType::Int8 => {
34 let mut v = vec![0i8; count];
35 unsafe {
37 std::ptr::copy_nonoverlapping(
38 bytes.as_ptr(),
39 v.as_mut_ptr() as *mut u8,
40 bytes.len(),
41 );
42 }
43 NDDataBuffer::I8(v)
44 }
45 NDDataType::UInt8 => NDDataBuffer::U8(bytes.to_vec()),
46 NDDataType::Int16 => {
47 let mut v = vec![0i16; count];
48 unsafe {
49 std::ptr::copy_nonoverlapping(
50 bytes.as_ptr(),
51 v.as_mut_ptr() as *mut u8,
52 bytes.len(),
53 );
54 }
55 NDDataBuffer::I16(v)
56 }
57 NDDataType::UInt16 => {
58 let mut v = vec![0u16; count];
59 unsafe {
60 std::ptr::copy_nonoverlapping(
61 bytes.as_ptr(),
62 v.as_mut_ptr() as *mut u8,
63 bytes.len(),
64 );
65 }
66 NDDataBuffer::U16(v)
67 }
68 NDDataType::Int32 => {
69 let mut v = vec![0i32; count];
70 unsafe {
71 std::ptr::copy_nonoverlapping(
72 bytes.as_ptr(),
73 v.as_mut_ptr() as *mut u8,
74 bytes.len(),
75 );
76 }
77 NDDataBuffer::I32(v)
78 }
79 NDDataType::UInt32 => {
80 let mut v = vec![0u32; count];
81 unsafe {
82 std::ptr::copy_nonoverlapping(
83 bytes.as_ptr(),
84 v.as_mut_ptr() as *mut u8,
85 bytes.len(),
86 );
87 }
88 NDDataBuffer::U32(v)
89 }
90 NDDataType::Int64 => {
91 let mut v = vec![0i64; count];
92 unsafe {
93 std::ptr::copy_nonoverlapping(
94 bytes.as_ptr(),
95 v.as_mut_ptr() as *mut u8,
96 bytes.len(),
97 );
98 }
99 NDDataBuffer::I64(v)
100 }
101 NDDataType::UInt64 => {
102 let mut v = vec![0u64; count];
103 unsafe {
104 std::ptr::copy_nonoverlapping(
105 bytes.as_ptr(),
106 v.as_mut_ptr() as *mut u8,
107 bytes.len(),
108 );
109 }
110 NDDataBuffer::U64(v)
111 }
112 NDDataType::Float32 => {
113 let mut v = vec![0f32; count];
114 unsafe {
115 std::ptr::copy_nonoverlapping(
116 bytes.as_ptr(),
117 v.as_mut_ptr() as *mut u8,
118 bytes.len(),
119 );
120 }
121 NDDataBuffer::F32(v)
122 }
123 NDDataType::Float64 => {
124 let mut v = vec![0f64; count];
125 unsafe {
126 std::ptr::copy_nonoverlapping(
127 bytes.as_ptr(),
128 v.as_mut_ptr() as *mut u8,
129 bytes.len(),
130 );
131 }
132 NDDataBuffer::F64(v)
133 }
134 })
135}
136
137pub fn compress_lz4(src: &NDArray) -> NDArray {
143 let raw = src.data.as_u8_slice();
144 let original_data_type = src.data.data_type();
145 let original_size = raw.len();
146 let compressed = compress(raw);
148 let compressed_size = compressed.len();
149
150 let mut arr = src.clone();
151 arr.data = NDDataBuffer::U8(compressed);
152 arr.codec = Some(Codec {
153 name: CodecName::LZ4,
154 compressed_size,
155 level: 0,
156 shuffle: 0,
157 compressor: 0,
158 });
159
160 arr.attributes.add(NDAttribute::new_static(
162 ATTR_ORIGINAL_DATA_TYPE,
163 "Original NDDataType ordinal before codec compression",
164 NDAttrSource::Driver,
165 NDAttrValue::UInt8(original_data_type as u8),
166 ));
167
168 tracing::debug!(
169 original_size,
170 compressed_size,
171 ratio = original_size as f64 / compressed_size.max(1) as f64,
172 "LZ4 compress"
173 );
174
175 arr
176}
177
178pub fn decompress_lz4(src: &NDArray) -> Option<NDArray> {
183 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::LZ4) {
184 return None;
185 }
186 let compressed = src.data.as_u8_slice();
187 let original_type = src
190 .attributes
191 .get(ATTR_ORIGINAL_DATA_TYPE)
192 .and_then(|a| a.value.as_i64())
193 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
194 .unwrap_or(NDDataType::UInt8);
195 let num_elements: usize = src.dims.iter().map(|d| d.size).product();
196 let uncompressed_size = num_elements * original_type.element_size();
197 let decompressed = decompress(compressed, uncompressed_size).ok()?;
198
199 let buffer = buffer_from_bytes(&decompressed, original_type)?;
200
201 let mut arr = src.clone();
202 arr.data = buffer;
203 arr.codec = None;
204 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
205
206 Some(arr)
207}
208
209const ZLIB_DEFAULT_LEVEL: u32 = 6;
220
221pub fn compress_zlib(src: &NDArray) -> NDArray {
227 let raw = src.data.as_u8_slice();
228 let original_data_type = src.data.data_type();
229 let original_size = raw.len();
230
231 let mut encoder = ZlibEncoder::new(Vec::<u8>::new(), Compression::new(ZLIB_DEFAULT_LEVEL));
232 if encoder.write_all(raw).is_err() {
234 return src.clone();
235 }
236 let compressed = match encoder.finish() {
237 Ok(buf) => buf,
238 Err(_) => return src.clone(),
239 };
240 let compressed_size = compressed.len();
241
242 let mut arr = src.clone();
243 arr.data = NDDataBuffer::U8(compressed);
244 arr.codec = Some(Codec {
245 name: CodecName::Zlib,
246 compressed_size,
247 level: ZLIB_DEFAULT_LEVEL as i32,
248 shuffle: 0,
249 compressor: 0,
250 });
251 arr.attributes.add(NDAttribute::new_static(
252 ATTR_ORIGINAL_DATA_TYPE,
253 "Original NDDataType ordinal before codec compression",
254 NDAttrSource::Driver,
255 NDAttrValue::UInt8(original_data_type as u8),
256 ));
257
258 tracing::debug!(
259 original_size,
260 compressed_size,
261 ratio = original_size as f64 / compressed_size.max(1) as f64,
262 "Zlib compress"
263 );
264 arr
265}
266
267pub fn decompress_zlib(src: &NDArray) -> Option<NDArray> {
272 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::Zlib) {
273 return None;
274 }
275 let compressed = src.data.as_u8_slice();
276
277 let original_type = src
278 .attributes
279 .get(ATTR_ORIGINAL_DATA_TYPE)
280 .and_then(|a| a.value.as_i64())
281 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
282 .unwrap_or(NDDataType::UInt8);
283 let num_elements: usize = src.dims.iter().map(|d| d.size).product();
284 let uncompressed_size = num_elements * original_type.element_size();
285
286 let mut decoder = ZlibDecoder::new(compressed);
287 let mut decompressed = Vec::with_capacity(uncompressed_size);
288 decoder.read_to_end(&mut decompressed).ok()?;
289
290 let buffer = buffer_from_bytes(&decompressed, original_type)?;
291
292 let mut arr = src.clone();
293 arr.data = buffer;
294 arr.codec = None;
295 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
296 Some(arr)
297}
298
299const LZ4HDF5_DEFAULT_BLOCK_SIZE: usize = 1 << 20;
319
320pub fn compress_lz4hdf5(src: &NDArray) -> NDArray {
327 let raw = src.data.as_u8_slice();
328 let data_type = src.data.data_type();
329 let original_size = raw.len();
330 let block_size = LZ4HDF5_DEFAULT_BLOCK_SIZE;
331
332 let mut out: Vec<u8> = Vec::with_capacity(original_size / 2 + 12);
334 out.extend_from_slice(&(original_size as u64).to_be_bytes());
335 out.extend_from_slice(&(block_size as u32).to_be_bytes());
336
337 let mut pos = 0usize;
338 while pos < raw.len() {
339 let n = block_size.min(raw.len() - pos);
340 let block = &raw[pos..pos + n];
341 let comp = compress(block);
342 if comp.len() < n {
345 out.extend_from_slice(&(comp.len() as u32).to_be_bytes());
346 out.extend_from_slice(&comp);
347 } else {
348 out.extend_from_slice(&(n as u32).to_be_bytes());
349 out.extend_from_slice(block);
350 }
351 pos += n;
352 }
353
354 let compressed_size = out.len();
355 let mut arr = src.clone();
356 arr.data = NDDataBuffer::U8(out);
357 arr.codec = Some(Codec {
358 name: CodecName::LZ4HDF5,
359 compressed_size,
360 level: 0,
361 shuffle: 0,
362 compressor: 0,
363 });
364 arr.attributes.add(NDAttribute::new_static(
365 ATTR_ORIGINAL_DATA_TYPE,
366 "Original NDDataType ordinal before codec compression",
367 NDAttrSource::Driver,
368 NDAttrValue::UInt8(data_type as u8),
369 ));
370
371 tracing::debug!(
372 original_size,
373 compressed_size,
374 ratio = original_size as f64 / compressed_size.max(1) as f64,
375 "LZ4HDF5 compress"
376 );
377 arr
378}
379
380pub fn decompress_lz4hdf5(src: &NDArray) -> Option<NDArray> {
384 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::LZ4HDF5) {
385 return None;
386 }
387 let buf = src.data.as_u8_slice();
388 if buf.len() < 12 {
389 return None;
390 }
391 let total_bytes = u64::from_be_bytes(buf[0..8].try_into().ok()?) as usize;
392 let block_size = u32::from_be_bytes(buf[8..12].try_into().ok()?) as usize;
393 if block_size == 0 {
394 return None;
395 }
396
397 let original_type = src
398 .attributes
399 .get(ATTR_ORIGINAL_DATA_TYPE)
400 .and_then(|a| a.value.as_i64())
401 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
402 .unwrap_or(NDDataType::UInt8);
403
404 let mut out: Vec<u8> = Vec::with_capacity(total_bytes);
405 let mut pos = 12usize;
406 while out.len() < total_bytes {
407 let n = block_size.min(total_bytes - out.len());
408 if pos + 4 > buf.len() {
409 return None;
410 }
411 let clen = u32::from_be_bytes(buf[pos..pos + 4].try_into().ok()?) as usize;
412 pos += 4;
413 if pos + clen > buf.len() {
414 return None;
415 }
416 let block_payload = &buf[pos..pos + clen];
417 if clen == n {
418 out.extend_from_slice(block_payload);
420 } else {
421 let block = decompress(block_payload, n).ok()?;
422 if block.len() != n {
423 return None;
424 }
425 out.extend_from_slice(&block);
426 }
427 pos += clen;
428 }
429 if out.len() != total_bytes {
430 return None;
431 }
432
433 let buffer = buffer_from_bytes(&out, original_type)?;
434 let mut arr = src.clone();
435 arr.data = buffer;
436 arr.codec = None;
437 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
438 Some(arr)
439}
440
/// Target size, in bytes, of one bitshuffle block.
const BSHUF_TARGET_BLOCK_SIZE_B: usize = 8192;
/// Block sizes (in elements) are kept a multiple of this value.
const BSHUF_BLOCKED_MULT: usize = 8;

/// Returns the default bitshuffle block size, in elements, for `elem_size`-byte elements.
///
/// The result is the number of elements that fit in
/// [`BSHUF_TARGET_BLOCK_SIZE_B`] bytes, rounded down to a multiple of
/// [`BSHUF_BLOCKED_MULT`] and never smaller than [`BSHUF_BLOCKED_MULT`].
/// An `elem_size` of zero is treated as one.
fn bshuf_default_block_size(elem_size: usize) -> usize {
    let elems_in_target = BSHUF_TARGET_BLOCK_SIZE_B / elem_size.max(1);
    let aligned = (elems_in_target / BSHUF_BLOCKED_MULT) * BSHUF_BLOCKED_MULT;
    aligned.max(BSHUF_BLOCKED_MULT)
}
476
/// Transposes `n` elements of `elem_size` bytes from element-major to byte-major order.
///
/// Output byte `b * n + e` is input byte `e * elem_size + b`, i.e. all first
/// bytes of every element come first, then all second bytes, and so on.
///
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn trans_byte_elem(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    let mut planes = vec![0u8; n * elem_size];
    // The loop body only runs when `planes` is non-empty, which implies `n > 0`.
    for (idx, slot) in planes.iter_mut().enumerate() {
        let (byte_in_elem, elem) = (idx / n, idx % n);
        *slot = input[elem * elem_size + byte_in_elem];
    }
    planes
}
489
/// Inverse of `trans_byte_elem`: converts byte-major data back to element-major order.
///
/// Output byte `e * elem_size + b` is input byte `b * n + e`.
///
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn untrans_byte_elem(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    let mut elems = vec![0u8; n * elem_size];
    // The loop body only runs when `elems` is non-empty, which implies `elem_size > 0`.
    for (idx, slot) in elems.iter_mut().enumerate() {
        let (elem, byte_in_elem) = (idx / elem_size, idx % elem_size);
        *slot = input[byte_in_elem * n + elem];
    }
    elems
}
500
501fn trans_bit_elem_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
510 debug_assert_eq!(n % 8, 0);
511 let byte_t = trans_byte_elem(input, n, elem_size);
513 let nbytes = byte_t.len();
516 let mut out = vec![0u8; nbytes];
517 let out_row = nbytes / 8; for byte_idx in 0..nbytes {
519 let v = byte_t[byte_idx];
520 for bit in 0..8 {
521 if (v >> bit) & 1 != 0 {
522 let dst = bit * out_row + byte_idx / 8;
524 out[dst] |= 1 << (byte_idx % 8);
525 }
526 }
527 }
528 out
529}
530
531fn untrans_bit_elem_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
533 debug_assert_eq!(n % 8, 0);
534 let nbytes = n * elem_size;
535 let out_row = nbytes / 8;
536 let mut byte_t = vec![0u8; nbytes];
538 for byte_idx in 0..nbytes {
539 for bit in 0..8 {
540 let src = bit * out_row + byte_idx / 8;
541 if (input[src] >> (byte_idx % 8)) & 1 != 0 {
542 byte_t[byte_idx] |= 1 << bit;
543 }
544 }
545 }
546 untrans_byte_elem(&byte_t, n, elem_size)
548}
549
550fn bshuf_compress_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
555 let shuffled = if n % 8 == 0 {
556 trans_bit_elem_block(input, n, elem_size)
557 } else {
558 trans_byte_elem(input, n, elem_size)
559 };
560 compress(&shuffled)
561}
562
563fn bshuf_decompress_block(compressed: &[u8], n: usize, elem_size: usize) -> Option<Vec<u8>> {
565 let raw_size = n * elem_size;
566 let shuffled = decompress(compressed, raw_size).ok()?;
567 if shuffled.len() != raw_size {
568 return None;
569 }
570 Some(if n % 8 == 0 {
571 untrans_bit_elem_block(&shuffled, n, elem_size)
572 } else {
573 untrans_byte_elem(&shuffled, n, elem_size)
574 })
575}
576
577pub fn compress_bslz4(src: &NDArray) -> NDArray {
584 let raw = src.data.as_u8_slice();
585 let data_type = src.data.data_type();
586 let elem_size = data_type.element_size();
587 let total_elems = if elem_size > 0 {
588 raw.len() / elem_size
589 } else {
590 0
591 };
592 let block_size = bshuf_default_block_size(elem_size);
593
594 let mut out: Vec<u8> = Vec::with_capacity(raw.len() / 2 + 16);
596 out.extend_from_slice(&(raw.len() as u64).to_be_bytes());
597 out.extend_from_slice(&(block_size as u32).to_be_bytes());
598
599 let mut elem = 0usize;
600 while elem < total_elems {
601 let n = block_size.min(total_elems - elem);
602 let byte_off = elem * elem_size;
603 let block = &raw[byte_off..byte_off + n * elem_size];
604 let comp = bshuf_compress_block(block, n, elem_size);
605 out.extend_from_slice(&(comp.len() as u32).to_be_bytes());
606 out.extend_from_slice(&comp);
607 elem += n;
608 }
609
610 let compressed_size = out.len();
611 let mut arr = src.clone();
612 arr.data = NDDataBuffer::U8(out);
613 arr.codec = Some(Codec {
614 name: CodecName::BSLZ4,
615 compressed_size,
616 level: 0,
617 shuffle: 0,
618 compressor: 0,
619 });
620 arr.attributes.add(NDAttribute::new_static(
621 ATTR_ORIGINAL_DATA_TYPE,
622 "Original NDDataType ordinal before codec compression",
623 NDAttrSource::Driver,
624 NDAttrValue::UInt8(data_type as u8),
625 ));
626
627 tracing::debug!(
628 original_size = raw.len(),
629 compressed_size,
630 ratio = raw.len() as f64 / compressed_size.max(1) as f64,
631 "BSLZ4 compress"
632 );
633 arr
634}
635
636pub fn decompress_bslz4(src: &NDArray) -> Option<NDArray> {
640 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::BSLZ4) {
641 return None;
642 }
643 let buf = src.data.as_u8_slice();
644 if buf.len() < 12 {
645 return None;
646 }
647 let total_bytes = u64::from_be_bytes(buf[0..8].try_into().ok()?) as usize;
648 let block_size = u32::from_be_bytes(buf[8..12].try_into().ok()?) as usize;
649
650 let original_type = src
651 .attributes
652 .get(ATTR_ORIGINAL_DATA_TYPE)
653 .and_then(|a| a.value.as_i64())
654 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
655 .unwrap_or(NDDataType::UInt8);
656 let elem_size = original_type.element_size();
657 if elem_size == 0 || block_size == 0 || total_bytes % elem_size != 0 {
658 return None;
659 }
660 let total_elems = total_bytes / elem_size;
661
662 let mut out: Vec<u8> = Vec::with_capacity(total_bytes);
663 let mut pos = 12usize;
664 let mut elem = 0usize;
665 while elem < total_elems {
666 let n = block_size.min(total_elems - elem);
667 if pos + 4 > buf.len() {
668 return None;
669 }
670 let clen = u32::from_be_bytes(buf[pos..pos + 4].try_into().ok()?) as usize;
671 pos += 4;
672 if pos + clen > buf.len() {
673 return None;
674 }
675 let block = bshuf_decompress_block(&buf[pos..pos + clen], n, elem_size)?;
676 out.extend_from_slice(&block);
677 pos += clen;
678 elem += n;
679 }
680 if out.len() != total_bytes {
681 return None;
682 }
683
684 let buffer = buffer_from_bytes(&out, original_type)?;
685 let mut arr = src.clone();
686 arr.data = buffer;
687 arr.codec = None;
688 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
689 Some(arr)
690}
691
692pub fn compress_jpeg(src: &NDArray, quality: u8) -> Option<NDArray> {
700 if src.data.data_type() != NDDataType::UInt8 {
701 return None;
702 }
703
704 let raw = src.data.as_u8_slice();
705 let info = src.info();
706
707 if info.x_size > u16::MAX as usize || info.y_size > u16::MAX as usize {
709 return None;
710 }
711
712 let (width, height, color_type) = match src.dims.len() {
713 2 => {
714 (
716 info.x_size as u16,
717 info.y_size as u16,
718 jpeg_encoder::ColorType::Luma,
719 )
720 }
721 3 if src.dims[0].size == 3 => {
722 (
724 info.x_size as u16,
725 info.y_size as u16,
726 jpeg_encoder::ColorType::Rgb,
727 )
728 }
729 _ => return None,
730 };
731
732 let mut jpeg_buf = Vec::new();
733 let encoder = jpeg_encoder::Encoder::new(&mut jpeg_buf, quality);
734 if encoder.encode(raw, width, height, color_type).is_err() {
735 return None;
736 }
737
738 let compressed_size = jpeg_buf.len();
739 let original_size = raw.len();
740
741 let mut arr = src.clone();
742 arr.data = NDDataBuffer::U8(jpeg_buf);
743 arr.codec = Some(Codec {
744 name: CodecName::JPEG,
745 compressed_size,
746 level: 0,
747 shuffle: 0,
748 compressor: 0,
749 });
750
751 tracing::debug!(
752 original_size,
753 compressed_size,
754 ratio = original_size as f64 / compressed_size.max(1) as f64,
755 "JPEG compress (quality={})",
756 quality,
757 );
758
759 Some(arr)
760}
761
762pub fn decompress_jpeg(src: &NDArray) -> Option<NDArray> {
769 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::JPEG) {
770 return None;
771 }
772
773 let compressed = src.data.as_u8_slice();
774 let mut decoder = jpeg_decoder::Decoder::new(compressed);
775 let pixels = decoder.decode().ok()?;
776 let metadata = decoder.info()?;
777
778 let width = metadata.width as usize;
779 let height = metadata.height as usize;
780
781 let dims = match metadata.pixel_format {
782 jpeg_decoder::PixelFormat::L8 => {
783 vec![NDDimension::new(width), NDDimension::new(height)]
785 }
786 jpeg_decoder::PixelFormat::RGB24 => {
787 vec![
789 NDDimension::new(3),
790 NDDimension::new(width),
791 NDDimension::new(height),
792 ]
793 }
794 _ => return None,
795 };
796
797 let mut arr = src.clone();
798 arr.dims = dims;
799 arr.data = NDDataBuffer::U8(pixels);
800 arr.codec = None;
801
802 Some(arr)
803}
804
/// Settings forwarded to the Blosc filter by `compress_blosc`.
///
/// The numeric codes are passed through unchanged as filter parameters; this
/// type does not interpret them.
#[derive(Debug, Clone, Copy)]
pub struct BloscConfig {
    /// Blosc compressor code.
    pub compressor: u32,
    /// Blosc compression level.
    pub clevel: u32,
    /// Blosc shuffle mode code.
    pub shuffle: u32,
}

impl Default for BloscConfig {
    /// Compressor code 0, compression level 3, shuffle code 0.
    fn default() -> Self {
        BloscConfig {
            compressor: 0,
            clevel: 3,
            shuffle: 0,
        }
    }
}
825
826pub fn compress_blosc(src: &NDArray, config: &BloscConfig) -> NDArray {
828 let raw = src.data.as_u8_slice();
829 let element_size = src.data.data_type().element_size();
830
831 let pipeline = FilterPipeline {
832 filters: vec![Filter {
833 id: FILTER_BLOSC,
834 flags: 0,
835 cd_values: vec![
836 2, 2, element_size as u32, raw.len() as u32, config.shuffle, config.compressor, config.clevel, ],
844 }],
845 };
846
847 let compressed = match apply_filters(&pipeline, raw) {
848 Ok(data) => data,
849 Err(_) => return src.clone(),
850 };
851
852 let compressed_size = compressed.len();
853 let mut arr = src.clone();
854 arr.attributes.add(NDAttribute::new_static(
855 ATTR_ORIGINAL_DATA_TYPE,
856 "Original NDDataType ordinal before codec compression",
857 NDAttrSource::Driver,
858 NDAttrValue::UInt8(src.data.data_type() as u8),
859 ));
860 arr.data = NDDataBuffer::U8(compressed);
861 arr.codec = Some(Codec {
862 name: CodecName::Blosc,
863 compressed_size,
864 level: 0,
865 shuffle: 0,
866 compressor: 0,
867 });
868 arr
869}
870
871pub fn decompress_blosc(src: &NDArray) -> Option<NDArray> {
873 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::Blosc) {
874 return None;
875 }
876
877 let compressed = src.data.as_u8_slice();
878
879 let pipeline = FilterPipeline {
881 filters: vec![Filter {
882 id: FILTER_BLOSC,
883 flags: 0,
884 cd_values: vec![],
885 }],
886 };
887
888 let decompressed = reverse_filters(&pipeline, compressed).ok()?;
889
890 let original_type = src
891 .attributes
892 .get(ATTR_ORIGINAL_DATA_TYPE)
893 .and_then(|a| a.value.as_i64())
894 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
895 .unwrap_or(NDDataType::UInt8);
896
897 let buffer = buffer_from_bytes(&decompressed, original_type)?;
898
899 let mut arr = src.clone();
900 arr.data = buffer;
901 arr.codec = None;
902 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
903 Some(arr)
904}
905
906#[derive(Debug, Clone, Copy, PartialEq, Eq)]
908pub enum CodecMode {
909 Compress { codec: CodecName, quality: u8 },
911 Decompress,
913}
914
/// Parameter indices of the codec plugin, one per registered parameter.
///
/// Every index is `None` until the parameters have been looked up.
#[derive(Default)]
struct CodecParamIndices {
    /// Index of the compress/decompress mode parameter.
    mode: Option<usize>,
    /// Index of the compressor selection parameter.
    compressor: Option<usize>,
    /// Index of the reported compression factor.
    comp_factor: Option<usize>,
    /// Index of the JPEG quality parameter.
    jpeg_quality: Option<usize>,
    /// Index of the Blosc compressor parameter.
    blosc_compressor: Option<usize>,
    /// Index of the Blosc compression level parameter.
    blosc_clevel: Option<usize>,
    /// Index of the Blosc shuffle parameter.
    blosc_shuffle: Option<usize>,
    /// Index of the Blosc thread-count parameter.
    blosc_numthreads: Option<usize>,
    /// Index of the status code reported after each array.
    codec_status: Option<usize>,
    /// Index of the error text reported after each array.
    codec_error: Option<usize>,
}
931
932pub struct CodecProcessor {
933 mode: CodecMode,
934 compression_ratio: f64,
935 jpeg_quality: u8,
936 blosc_config: BloscConfig,
937 params: CodecParamIndices,
938}
939
940impl CodecProcessor {
941 pub fn new(mode: CodecMode) -> Self {
942 let quality = match mode {
943 CodecMode::Compress { quality, .. } => quality,
944 _ => 85,
945 };
946 Self {
947 mode,
948 compression_ratio: 1.0,
949 jpeg_quality: quality,
950 blosc_config: BloscConfig::default(),
951 params: CodecParamIndices::default(),
952 }
953 }
954
955 pub fn compression_ratio(&self) -> f64 {
958 self.compression_ratio
959 }
960}
961
962impl NDPluginProcess for CodecProcessor {
963 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
964 let original_bytes = array.data.as_u8_slice().len();
965
966 let result = match self.mode {
967 CodecMode::Compress { .. } if array.codec.is_some() => {
968 Some(array.clone())
970 }
971 CodecMode::Compress {
972 codec: CodecName::LZ4,
973 ..
974 } => Some(compress_lz4(array)),
975 CodecMode::Compress {
976 codec: CodecName::JPEG,
977 ..
978 } => compress_jpeg(array, self.jpeg_quality),
979 CodecMode::Compress {
980 codec: CodecName::Zlib,
981 ..
982 } => Some(compress_zlib(array)),
983 CodecMode::Compress {
984 codec: CodecName::Blosc,
985 ..
986 } => Some(compress_blosc(array, &self.blosc_config)),
987 CodecMode::Compress {
988 codec: CodecName::LZ4HDF5,
989 ..
990 } => Some(compress_lz4hdf5(array)),
991 CodecMode::Compress {
992 codec: CodecName::BSLZ4,
993 ..
994 } => Some(compress_bslz4(array)),
995 CodecMode::Compress { .. } => None,
996 CodecMode::Decompress => match array.codec.as_ref().map(|c| c.name) {
997 Some(CodecName::LZ4) => decompress_lz4(array),
998 Some(CodecName::JPEG) => decompress_jpeg(array),
999 Some(CodecName::Zlib) => decompress_zlib(array),
1000 Some(CodecName::Blosc) => decompress_blosc(array),
1001 Some(CodecName::LZ4HDF5) => decompress_lz4hdf5(array),
1002 Some(CodecName::BSLZ4) => decompress_bslz4(array),
1003 _ => None,
1004 },
1005 };
1006
1007 let mut updates = Vec::new();
1008
1009 match result {
1010 Some(ref out) => {
1011 let output_bytes = out.data.as_u8_slice().len();
1012 match self.mode {
1013 CodecMode::Compress { .. } => {
1014 self.compression_ratio = original_bytes as f64 / output_bytes.max(1) as f64;
1015 }
1016 CodecMode::Decompress => {
1017 self.compression_ratio = output_bytes as f64 / original_bytes.max(1) as f64;
1018 }
1019 }
1020 if let Some(idx) = self.params.comp_factor {
1021 updates.push(ParamUpdate::float64(idx, self.compression_ratio));
1022 }
1023 if let Some(idx) = self.params.codec_status {
1024 updates.push(ParamUpdate::int32(idx, 0)); }
1026 if let Some(idx) = self.params.codec_error {
1027 updates.push(ParamUpdate::Octet {
1028 reason: idx,
1029 addr: 0,
1030 value: String::new(),
1031 });
1032 }
1033 let mut r = ProcessResult::arrays(vec![Arc::new(out.clone())]);
1034 r.param_updates = updates;
1035 r
1036 }
1037 None => {
1038 self.compression_ratio = 1.0;
1040 if let Some(idx) = self.params.comp_factor {
1041 updates.push(ParamUpdate::float64(idx, 1.0));
1042 }
1043 if let Some(idx) = self.params.codec_status {
1044 updates.push(ParamUpdate::int32(idx, 1)); }
1046 if let Some(idx) = self.params.codec_error {
1047 updates.push(ParamUpdate::Octet {
1048 reason: idx,
1049 addr: 0,
1050 value: "codec operation failed or unsupported".to_string(),
1051 });
1052 }
1053 let mut r = ProcessResult::arrays(vec![Arc::new(array.clone())]);
1054 r.param_updates = updates;
1055 r
1056 }
1057 }
1058 }
1059
1060 fn plugin_type(&self) -> &str {
1061 "NDPluginCodec"
1062 }
1063
1064 fn register_params(
1065 &mut self,
1066 base: &mut asyn_rs::port::PortDriverBase,
1067 ) -> asyn_rs::error::AsynResult<()> {
1068 use asyn_rs::param::ParamType;
1069 base.create_param("MODE", ParamType::Int32)?;
1070 base.create_param("COMPRESSOR", ParamType::Int32)?;
1071 base.create_param("COMP_FACTOR", ParamType::Float64)?;
1072 base.create_param("JPEG_QUALITY", ParamType::Int32)?;
1073 base.create_param("BLOSC_COMPRESSOR", ParamType::Int32)?;
1074 base.create_param("BLOSC_CLEVEL", ParamType::Int32)?;
1075 base.create_param("BLOSC_SHUFFLE", ParamType::Int32)?;
1076 base.create_param("BLOSC_NUMTHREADS", ParamType::Int32)?;
1077 base.create_param("CODEC_STATUS", ParamType::Int32)?;
1078 base.create_param("CODEC_ERROR", ParamType::Octet)?;
1079
1080 self.params.mode = base.find_param("MODE");
1081 self.params.compressor = base.find_param("COMPRESSOR");
1082 self.params.comp_factor = base.find_param("COMP_FACTOR");
1083 self.params.jpeg_quality = base.find_param("JPEG_QUALITY");
1084 self.params.blosc_compressor = base.find_param("BLOSC_COMPRESSOR");
1085 self.params.blosc_clevel = base.find_param("BLOSC_CLEVEL");
1086 self.params.blosc_shuffle = base.find_param("BLOSC_SHUFFLE");
1087 self.params.blosc_numthreads = base.find_param("BLOSC_NUMTHREADS");
1088 self.params.codec_status = base.find_param("CODEC_STATUS");
1089 self.params.codec_error = base.find_param("CODEC_ERROR");
1090 Ok(())
1091 }
1092
1093 fn on_param_change(
1094 &mut self,
1095 reason: usize,
1096 params: &ad_core_rs::plugin::runtime::PluginParamSnapshot,
1097 ) -> ad_core_rs::plugin::runtime::ParamChangeResult {
1098 if Some(reason) == self.params.mode {
1099 let v = params.value.as_i32();
1100 if v == 0 {
1101 let codec = match self.mode {
1103 CodecMode::Compress { codec, .. } => codec,
1104 _ => CodecName::LZ4,
1105 };
1106 self.mode = CodecMode::Compress {
1107 codec,
1108 quality: self.jpeg_quality,
1109 };
1110 } else {
1111 self.mode = CodecMode::Decompress;
1112 }
1113 } else if Some(reason) == self.params.compressor {
1114 let codec = match params.value.as_i32() {
1117 0 => CodecName::None,
1118 1 => CodecName::JPEG,
1119 2 => CodecName::Zlib,
1120 3 => CodecName::Blosc,
1121 4 => CodecName::LZ4,
1122 5 => CodecName::LZ4HDF5,
1123 6 => CodecName::BSLZ4,
1124 _ => CodecName::None,
1125 };
1126 if let CodecMode::Compress { .. } = self.mode {
1127 self.mode = CodecMode::Compress {
1128 codec,
1129 quality: self.jpeg_quality,
1130 };
1131 }
1132 } else if Some(reason) == self.params.jpeg_quality {
1133 self.jpeg_quality = params.value.as_i32().clamp(1, 100) as u8;
1134 if let CodecMode::Compress { codec, .. } = self.mode {
1135 self.mode = CodecMode::Compress {
1136 codec,
1137 quality: self.jpeg_quality,
1138 };
1139 }
1140 } else if Some(reason) == self.params.blosc_compressor {
1141 self.blosc_config.compressor = params.value.as_i32().max(0) as u32;
1142 } else if Some(reason) == self.params.blosc_clevel {
1143 self.blosc_config.clevel = params.value.as_i32().clamp(0, 9) as u32;
1144 } else if Some(reason) == self.params.blosc_shuffle {
1145 self.blosc_config.shuffle = params.value.as_i32().max(0) as u32;
1146 }
1147
1148 ad_core_rs::plugin::runtime::ParamChangeResult::updates(vec![])
1149 }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154 use super::*;
1155
1156 fn make_u8_array(width: usize, height: usize) -> NDArray {
1157 let mut arr = NDArray::new(
1158 vec![NDDimension::new(width), NDDimension::new(height)],
1159 NDDataType::UInt8,
1160 );
1161 if let NDDataBuffer::U8(ref mut v) = arr.data {
1162 for i in 0..v.len() {
1163 v[i] = (i % 256) as u8;
1164 }
1165 }
1166 arr
1167 }
1168
1169 fn make_rgb_array(width: usize, height: usize) -> NDArray {
1170 use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
1171 let mut arr = NDArray::new(
1172 vec![
1173 NDDimension::new(3),
1174 NDDimension::new(width),
1175 NDDimension::new(height),
1176 ],
1177 NDDataType::UInt8,
1178 );
1179 arr.attributes.add(NDAttribute::new_static(
1181 "ColorMode",
1182 "Color Mode",
1183 NDAttrSource::Driver,
1184 NDAttrValue::Int32(2), ));
1186 if let NDDataBuffer::U8(ref mut v) = arr.data {
1187 for i in 0..v.len() {
1188 v[i] = (i % 256) as u8;
1189 }
1190 }
1191 arr
1192 }
1193
1194 #[test]
1197 fn test_lz4_roundtrip_u8() {
1198 let arr = make_u8_array(4, 4);
1199 let original_data = arr.data.as_u8_slice().to_vec();
1200
1201 let compressed = compress_lz4(&arr);
1202 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
1203 assert_ne!(compressed.data.as_u8_slice(), original_data.as_slice());
1205
1206 let decompressed = decompress_lz4(&compressed).unwrap();
1207 assert!(decompressed.codec.is_none());
1208 assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1209 assert_eq!(decompressed.data.as_u8_slice(), original_data.as_slice());
1210 }
1211
1212 #[test]
1213 fn test_lz4_roundtrip_u16() {
1214 let mut arr = NDArray::new(
1215 vec![NDDimension::new(8), NDDimension::new(8)],
1216 NDDataType::UInt16,
1217 );
1218 if let NDDataBuffer::U16(ref mut v) = arr.data {
1219 for i in 0..v.len() {
1220 v[i] = (i * 100) as u16;
1221 }
1222 }
1223 let original_bytes = arr.data.as_u8_slice().to_vec();
1224
1225 let compressed = compress_lz4(&arr);
1226 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
1227 let dt_attr = compressed.attributes.get(ATTR_ORIGINAL_DATA_TYPE).unwrap();
1229 assert_eq!(dt_attr.value, NDAttrValue::UInt8(NDDataType::UInt16 as u8));
1230
1231 let decompressed = decompress_lz4(&compressed).unwrap();
1232 assert!(decompressed.codec.is_none());
1233 assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
1234 assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
1235 assert!(
1237 decompressed
1238 .attributes
1239 .get(ATTR_ORIGINAL_DATA_TYPE)
1240 .is_none()
1241 );
1242 }
1243
1244 #[test]
1245 fn test_lz4_roundtrip_f64() {
1246 let mut arr = NDArray::new(vec![NDDimension::new(16)], NDDataType::Float64);
1247 if let NDDataBuffer::F64(ref mut v) = arr.data {
1248 for i in 0..v.len() {
1249 v[i] = i as f64 * 1.5;
1250 }
1251 }
1252 let original_bytes = arr.data.as_u8_slice().to_vec();
1253
1254 let compressed = compress_lz4(&arr);
1255 let decompressed = decompress_lz4(&compressed).unwrap();
1256 assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
1257 assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
1258 }
1259
1260 #[test]
1261 fn test_lz4_compresses_repetitive_data() {
1262 let mut arr = NDArray::new(
1264 vec![NDDimension::new(256), NDDimension::new(256)],
1265 NDDataType::UInt8,
1266 );
1267 if let NDDataBuffer::U8(ref mut v) = arr.data {
1269 for x in v.iter_mut() {
1270 *x = 0;
1271 }
1272 }
1273 let original_size = arr.data.as_u8_slice().len();
1274
1275 let compressed = compress_lz4(&arr);
1276 let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
1277 assert!(
1278 compressed_size < original_size,
1279 "compressed ({}) should be smaller than original ({})",
1280 compressed_size,
1281 original_size,
1282 );
1283 }
1284
1285 #[test]
1286 fn test_lz4_preserves_metadata() {
1287 let mut arr = make_u8_array(4, 4);
1288 arr.unique_id = 42;
1289
1290 let compressed = compress_lz4(&arr);
1291 assert_eq!(compressed.unique_id, 42);
1292 assert_eq!(compressed.dims.len(), 2);
1293 assert_eq!(compressed.dims[0].size, 4);
1294 assert_eq!(compressed.dims[1].size, 4);
1295 }
1296
1297 #[test]
1300 fn test_bitshuffle_block_transpose_roundtrip() {
1301 let elem_size = 4;
1304 let n = 16;
1305 let input: Vec<u8> = (0..n * elem_size).map(|i| (i * 7 + 3) as u8).collect();
1306 let shuffled = trans_bit_elem_block(&input, n, elem_size);
1307 assert_eq!(shuffled.len(), input.len());
1308 let restored = untrans_bit_elem_block(&shuffled, n, elem_size);
1309 assert_eq!(restored, input);
1310 }
1311
1312 #[test]
1313 fn test_bitshuffle_partial_block_byte_transpose_roundtrip() {
1314 let elem_size = 2;
1316 let n = 5;
1317 let input: Vec<u8> = (0..n * elem_size).map(|i| (i * 13 + 1) as u8).collect();
1318 let t = trans_byte_elem(&input, n, elem_size);
1319 let restored = untrans_byte_elem(&t, n, elem_size);
1320 assert_eq!(restored, input);
1321 }
1322
1323 #[test]
1324 fn test_bslz4_roundtrip_u8() {
1325 let mut arr = NDArray::new(
1326 vec![NDDimension::new(64), NDDimension::new(64)],
1327 NDDataType::UInt8,
1328 );
1329 if let NDDataBuffer::U8(ref mut v) = arr.data {
1330 for (i, x) in v.iter_mut().enumerate() {
1331 *x = (i % 251) as u8;
1332 }
1333 }
1334 let original = arr.data.as_u8_slice().to_vec();
1335
1336 let compressed = compress_bslz4(&arr);
1337 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::BSLZ4);
1338 assert_ne!(compressed.data.as_u8_slice(), original.as_slice());
1339
1340 let decompressed = decompress_bslz4(&compressed).unwrap();
1341 assert!(decompressed.codec.is_none());
1342 assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1343 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1344 }
1345
1346 #[test]
1347 fn test_bslz4_roundtrip_u16() {
1348 let mut arr = NDArray::new(
1349 vec![NDDimension::new(100), NDDimension::new(20)],
1350 NDDataType::UInt16,
1351 );
1352 if let NDDataBuffer::U16(ref mut v) = arr.data {
1353 for (i, x) in v.iter_mut().enumerate() {
1354 *x = (i * 37 % 65521) as u16;
1355 }
1356 }
1357 let original = arr.data.as_u8_slice().to_vec();
1358
1359 let compressed = compress_bslz4(&arr);
1360 let decompressed = decompress_bslz4(&compressed).unwrap();
1361 assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
1362 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1363 assert!(
1364 decompressed
1365 .attributes
1366 .get(ATTR_ORIGINAL_DATA_TYPE)
1367 .is_none()
1368 );
1369 }
1370
1371 #[test]
1372 fn test_bslz4_roundtrip_f64_with_negatives() {
1373 let mut arr = NDArray::new(vec![NDDimension::new(73)], NDDataType::Float64);
1374 if let NDDataBuffer::F64(ref mut v) = arr.data {
1375 for (i, x) in v.iter_mut().enumerate() {
1376 *x = (i as f64 - 36.0) * 2.5;
1377 }
1378 }
1379 let original = arr.data.as_u8_slice().to_vec();
1380
1381 let compressed = compress_bslz4(&arr);
1382 let decompressed = decompress_bslz4(&compressed).unwrap();
1383 assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
1384 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1385 }
1386
1387 #[test]
1388 fn test_bslz4_roundtrip_multi_block() {
1389 let elem_size = 4usize;
1392 let block = bshuf_default_block_size(elem_size);
1393 let count = block * 2 + block / 2 + 3;
1395 let mut arr = NDArray::new(vec![NDDimension::new(count)], NDDataType::Int32);
1396 if let NDDataBuffer::I32(ref mut v) = arr.data {
1397 for (i, x) in v.iter_mut().enumerate() {
1398 *x = (i as i32).wrapping_mul(2_654_435_761u32 as i32);
1399 }
1400 }
1401 let original = arr.data.as_u8_slice().to_vec();
1402
1403 let compressed = compress_bslz4(&arr);
1404 let decompressed = decompress_bslz4(&compressed).unwrap();
1405 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1406 }
1407
1408 #[test]
1409 fn test_bslz4_compresses_repetitive_data() {
1410 let arr = NDArray::new(
1412 vec![NDDimension::new(256), NDDimension::new(256)],
1413 NDDataType::UInt16,
1414 );
1415 let original_size = arr.data.as_u8_slice().len();
1416 let compressed = compress_bslz4(&arr);
1417 let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
1418 assert!(
1419 compressed_size < original_size,
1420 "bslz4 compressed ({compressed_size}) should be < original ({original_size})"
1421 );
1422 }
1423
1424 #[test]
1425 fn test_bslz4_via_processor() {
1426 let mut arr = NDArray::new(
1428 vec![NDDimension::new(32), NDDimension::new(32)],
1429 NDDataType::UInt16,
1430 );
1431 if let NDDataBuffer::U16(ref mut v) = arr.data {
1432 for (i, x) in v.iter_mut().enumerate() {
1433 *x = (i * 11) as u16;
1434 }
1435 }
1436 let original = arr.data.as_u8_slice().to_vec();
1437 let pool = NDArrayPool::new(10_000_000);
1438
1439 let mut comp = CodecProcessor::new(CodecMode::Compress {
1440 codec: CodecName::BSLZ4,
1441 quality: 0,
1442 });
1443 let compressed = comp.process_array(&arr, &pool);
1444 let compressed_arr = &compressed.output_arrays[0];
1445 assert_eq!(
1446 compressed_arr.codec.as_ref().unwrap().name,
1447 CodecName::BSLZ4
1448 );
1449
1450 let mut decomp = CodecProcessor::new(CodecMode::Decompress);
1451 let result = decomp.process_array(compressed_arr, &pool);
1452 assert_eq!(
1453 result.output_arrays[0].data.as_u8_slice(),
1454 original.as_slice()
1455 );
1456 }
1457
1458 #[test]
1461 fn test_jpeg_compress_mono() {
1462 let arr = make_u8_array(16, 16);
1463 let compressed = compress_jpeg(&arr, 90).unwrap();
1464 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::JPEG);
1465 let data = compressed.data.as_u8_slice();
1467 assert_eq!(&data[0..2], &[0xFF, 0xD8]);
1468 }
1469
1470 #[test]
1471 fn test_jpeg_compress_rgb() {
1472 let arr = make_rgb_array(16, 16);
1473 let compressed = compress_jpeg(&arr, 90).unwrap();
1474 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::JPEG);
1475 let data = compressed.data.as_u8_slice();
1476 assert_eq!(&data[0..2], &[0xFF, 0xD8]);
1477 }
1478
1479 #[test]
1480 fn test_jpeg_roundtrip_mono() {
1481 let arr = make_u8_array(16, 16);
1482 let compressed = compress_jpeg(&arr, 100).unwrap();
1483 let decompressed = decompress_jpeg(&compressed).unwrap();
1484 assert!(decompressed.codec.is_none());
1485 assert_eq!(decompressed.dims.len(), 2);
1486 assert_eq!(decompressed.dims[0].size, 16); assert_eq!(decompressed.dims[1].size, 16); assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1489 assert_eq!(decompressed.data.len(), 16 * 16);
1491 }
1492
1493 #[test]
1494 fn test_jpeg_roundtrip_rgb() {
1495 let arr = make_rgb_array(16, 16);
1496 let compressed = compress_jpeg(&arr, 100).unwrap();
1497 let decompressed = decompress_jpeg(&compressed).unwrap();
1498 assert!(decompressed.codec.is_none());
1499 assert_eq!(decompressed.dims.len(), 3);
1500 assert_eq!(decompressed.dims[0].size, 3); assert_eq!(decompressed.dims[1].size, 16); assert_eq!(decompressed.dims[2].size, 16); assert_eq!(decompressed.data.len(), 3 * 16 * 16);
1504 }
1505
1506 #[test]
1507 fn test_jpeg_rejects_non_u8() {
1508 let arr = NDArray::new(
1509 vec![NDDimension::new(8), NDDimension::new(8)],
1510 NDDataType::UInt16,
1511 );
1512 assert!(compress_jpeg(&arr, 90).is_none());
1513 }
1514
1515 #[test]
1516 fn test_jpeg_rejects_1d() {
1517 let arr = NDArray::new(vec![NDDimension::new(64)], NDDataType::UInt8);
1518 assert!(compress_jpeg(&arr, 90).is_none());
1519 }
1520
1521 #[test]
1522 fn test_jpeg_quality_affects_size() {
1523 let arr = make_u8_array(64, 64);
1524 let high = compress_jpeg(&arr, 95).unwrap();
1525 let low = compress_jpeg(&arr, 10).unwrap();
1526 let high_size = high.codec.as_ref().unwrap().compressed_size;
1527 let low_size = low.codec.as_ref().unwrap().compressed_size;
1528 assert!(
1529 high_size > low_size,
1530 "high quality ({}) should produce larger output than low quality ({})",
1531 high_size,
1532 low_size,
1533 );
1534 }
1535
1536 #[test]
1539 fn test_zlib_roundtrip_u8() {
1540 let arr = make_u8_array(8, 8);
1541 let original = arr.data.as_u8_slice().to_vec();
1542
1543 let compressed = compress_zlib(&arr);
1544 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::Zlib);
1545 assert_ne!(compressed.data.as_u8_slice(), original.as_slice());
1546
1547 let decompressed = decompress_zlib(&compressed).unwrap();
1548 assert!(decompressed.codec.is_none());
1549 assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1550 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1551 }
1552
1553 #[test]
1554 fn test_zlib_roundtrip_u16() {
1555 let mut arr = NDArray::new(
1556 vec![NDDimension::new(16), NDDimension::new(16)],
1557 NDDataType::UInt16,
1558 );
1559 if let NDDataBuffer::U16(ref mut v) = arr.data {
1560 for (i, x) in v.iter_mut().enumerate() {
1561 *x = (i * 257 % 65521) as u16;
1562 }
1563 }
1564 let original = arr.data.as_u8_slice().to_vec();
1565
1566 let compressed = compress_zlib(&arr);
1567 let dt_attr = compressed.attributes.get(ATTR_ORIGINAL_DATA_TYPE).unwrap();
1568 assert_eq!(dt_attr.value, NDAttrValue::UInt8(NDDataType::UInt16 as u8));
1569
1570 let decompressed = decompress_zlib(&compressed).unwrap();
1571 assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
1572 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1573 assert!(
1574 decompressed
1575 .attributes
1576 .get(ATTR_ORIGINAL_DATA_TYPE)
1577 .is_none()
1578 );
1579 }
1580
1581 #[test]
1582 fn test_zlib_roundtrip_f64_with_negatives() {
1583 let mut arr = NDArray::new(vec![NDDimension::new(64)], NDDataType::Float64);
1584 if let NDDataBuffer::F64(ref mut v) = arr.data {
1585 for (i, x) in v.iter_mut().enumerate() {
1586 *x = (i as f64 - 32.0) * 3.25;
1587 }
1588 }
1589 let original = arr.data.as_u8_slice().to_vec();
1590
1591 let compressed = compress_zlib(&arr);
1592 let decompressed = decompress_zlib(&compressed).unwrap();
1593 assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
1594 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1595 }
1596
1597 #[test]
1598 fn test_zlib_compresses_repetitive_data() {
1599 let arr = NDArray::new(
1600 vec![NDDimension::new(256), NDDimension::new(256)],
1601 NDDataType::UInt8,
1602 );
1603 let original_size = arr.data.as_u8_slice().len();
1604 let compressed = compress_zlib(&arr);
1605 let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
1606 assert!(
1607 compressed_size < original_size,
1608 "zlib compressed ({compressed_size}) should be < original ({original_size})"
1609 );
1610 }
1611
1612 #[test]
1613 fn test_zlib_via_processor() {
1614 let mut arr = NDArray::new(
1615 vec![NDDimension::new(32), NDDimension::new(32)],
1616 NDDataType::UInt16,
1617 );
1618 if let NDDataBuffer::U16(ref mut v) = arr.data {
1619 for (i, x) in v.iter_mut().enumerate() {
1620 *x = (i * 13) as u16;
1621 }
1622 }
1623 let original = arr.data.as_u8_slice().to_vec();
1624 let pool = NDArrayPool::new(10_000_000);
1625
1626 let mut comp = CodecProcessor::new(CodecMode::Compress {
1627 codec: CodecName::Zlib,
1628 quality: 0,
1629 });
1630 let compressed = comp.process_array(&arr, &pool);
1631 let compressed_arr = &compressed.output_arrays[0];
1632 assert_eq!(compressed_arr.codec.as_ref().unwrap().name, CodecName::Zlib);
1633
1634 let mut decomp = CodecProcessor::new(CodecMode::Decompress);
1635 let result = decomp.process_array(compressed_arr, &pool);
1636 assert_eq!(
1637 result.output_arrays[0].data.as_u8_slice(),
1638 original.as_slice()
1639 );
1640 }
1641
1642 #[test]
1645 fn test_lz4hdf5_roundtrip_u8() {
1646 let mut arr = NDArray::new(
1647 vec![NDDimension::new(64), NDDimension::new(64)],
1648 NDDataType::UInt8,
1649 );
1650 if let NDDataBuffer::U8(ref mut v) = arr.data {
1651 for (i, x) in v.iter_mut().enumerate() {
1652 *x = (i % 251) as u8;
1653 }
1654 }
1655 let original = arr.data.as_u8_slice().to_vec();
1656
1657 let compressed = compress_lz4hdf5(&arr);
1658 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4HDF5);
1659 assert_ne!(compressed.data.as_u8_slice(), original.as_slice());
1660
1661 let decompressed = decompress_lz4hdf5(&compressed).unwrap();
1662 assert!(decompressed.codec.is_none());
1663 assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1664 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1665 }
1666
1667 #[test]
1668 fn test_lz4hdf5_roundtrip_u16() {
1669 let mut arr = NDArray::new(
1670 vec![NDDimension::new(80), NDDimension::new(40)],
1671 NDDataType::UInt16,
1672 );
1673 if let NDDataBuffer::U16(ref mut v) = arr.data {
1674 for (i, x) in v.iter_mut().enumerate() {
1675 *x = (i * 37 % 65521) as u16;
1676 }
1677 }
1678 let original = arr.data.as_u8_slice().to_vec();
1679
1680 let compressed = compress_lz4hdf5(&arr);
1681 let dt_attr = compressed.attributes.get(ATTR_ORIGINAL_DATA_TYPE).unwrap();
1682 assert_eq!(dt_attr.value, NDAttrValue::UInt8(NDDataType::UInt16 as u8));
1683
1684 let decompressed = decompress_lz4hdf5(&compressed).unwrap();
1685 assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
1686 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1687 assert!(
1688 decompressed
1689 .attributes
1690 .get(ATTR_ORIGINAL_DATA_TYPE)
1691 .is_none()
1692 );
1693 }
1694
1695 #[test]
1696 fn test_lz4hdf5_roundtrip_f64_with_negatives() {
1697 let mut arr = NDArray::new(vec![NDDimension::new(97)], NDDataType::Float64);
1698 if let NDDataBuffer::F64(ref mut v) = arr.data {
1699 for (i, x) in v.iter_mut().enumerate() {
1700 *x = (i as f64 - 48.0) * 1.75;
1701 }
1702 }
1703 let original = arr.data.as_u8_slice().to_vec();
1704
1705 let compressed = compress_lz4hdf5(&arr);
1706 let decompressed = decompress_lz4hdf5(&compressed).unwrap();
1707 assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
1708 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1709 }
1710
1711 #[test]
1712 fn test_lz4hdf5_multi_block_roundtrip() {
1713 let block = LZ4HDF5_DEFAULT_BLOCK_SIZE;
1716 let count = block * 2 + block / 3 + 7; let mut arr = NDArray::new(vec![NDDimension::new(count)], NDDataType::UInt8);
1718 if let NDDataBuffer::U8(ref mut v) = arr.data {
1719 for (i, x) in v.iter_mut().enumerate() {
1720 *x = (i.wrapping_mul(2_654_435_761) % 251) as u8;
1721 }
1722 }
1723 let original = arr.data.as_u8_slice().to_vec();
1724
1725 let compressed = compress_lz4hdf5(&arr);
1726 let decompressed = decompress_lz4hdf5(&compressed).unwrap();
1727 assert_eq!(decompressed.data.as_u8_slice(), original.as_slice());
1728 }
1729
1730 #[test]
1731 fn test_lz4hdf5_compresses_repetitive_data() {
1732 let arr = NDArray::new(
1733 vec![NDDimension::new(256), NDDimension::new(256)],
1734 NDDataType::UInt16,
1735 );
1736 let original_size = arr.data.as_u8_slice().len();
1737 let compressed = compress_lz4hdf5(&arr);
1738 let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
1739 assert!(
1740 compressed_size < original_size,
1741 "lz4hdf5 compressed ({compressed_size}) should be < original ({original_size})"
1742 );
1743 }
1744
1745 #[test]
1746 fn test_lz4hdf5_via_processor() {
1747 let mut arr = NDArray::new(
1748 vec![NDDimension::new(48), NDDimension::new(48)],
1749 NDDataType::UInt16,
1750 );
1751 if let NDDataBuffer::U16(ref mut v) = arr.data {
1752 for (i, x) in v.iter_mut().enumerate() {
1753 *x = (i * 7) as u16;
1754 }
1755 }
1756 let original = arr.data.as_u8_slice().to_vec();
1757 let pool = NDArrayPool::new(10_000_000);
1758
1759 let mut comp = CodecProcessor::new(CodecMode::Compress {
1760 codec: CodecName::LZ4HDF5,
1761 quality: 0,
1762 });
1763 let compressed = comp.process_array(&arr, &pool);
1764 let compressed_arr = &compressed.output_arrays[0];
1765 assert_eq!(
1766 compressed_arr.codec.as_ref().unwrap().name,
1767 CodecName::LZ4HDF5
1768 );
1769
1770 let mut decomp = CodecProcessor::new(CodecMode::Decompress);
1771 let result = decomp.process_array(compressed_arr, &pool);
1772 assert_eq!(
1773 result.output_arrays[0].data.as_u8_slice(),
1774 original.as_slice()
1775 );
1776 }
1777
1778 #[test]
1781 fn test_compressor_ordinal_mapping() {
1782 use ad_core_rs::plugin::runtime::{ParamChangeValue, PluginParamSnapshot};
1786
1787 let cases = [
1788 (0i32, CodecName::None),
1789 (1, CodecName::JPEG),
1790 (2, CodecName::Zlib),
1791 (3, CodecName::Blosc),
1792 (4, CodecName::LZ4),
1793 (5, CodecName::LZ4HDF5),
1794 (6, CodecName::BSLZ4),
1795 ];
1796
1797 for (ordinal, expected) in cases {
1798 let mut proc = CodecProcessor::new(CodecMode::Compress {
1799 codec: CodecName::LZ4,
1800 quality: 85,
1801 });
1802 proc.params.compressor = Some(0);
1805 let snapshot = PluginParamSnapshot {
1806 enable_callbacks: true,
1807 reason: 0,
1808 addr: 0,
1809 value: ParamChangeValue::Int32(ordinal),
1810 };
1811 proc.on_param_change(0, &snapshot);
1812 match proc.mode {
1813 CodecMode::Compress { codec, .. } => assert_eq!(
1814 codec, expected,
1815 "ordinal {ordinal} should select {expected:?}"
1816 ),
1817 other => panic!("expected Compress mode, got {other:?}"),
1818 }
1819 }
1820 }
1821
1822 #[test]
1825 fn test_decompress_wrong_codec() {
1826 let arr = make_u8_array(4, 4);
1827 assert!(decompress_lz4(&arr).is_none());
1828 assert!(decompress_jpeg(&arr).is_none());
1829 assert!(decompress_zlib(&arr).is_none());
1830 assert!(decompress_lz4hdf5(&arr).is_none());
1831 }
1832
1833 #[test]
1836 fn test_processor_lz4_compress() {
1837 let pool = NDArrayPool::new(1_000_000);
1838 let mut proc = CodecProcessor::new(CodecMode::Compress {
1839 codec: CodecName::LZ4,
1840 quality: 0,
1841 });
1842 let arr = make_u8_array(32, 32);
1843 let result = proc.process_array(&arr, &pool);
1844 assert_eq!(result.output_arrays.len(), 1);
1845 assert_eq!(
1846 result.output_arrays[0].codec.as_ref().unwrap().name,
1847 CodecName::LZ4
1848 );
1849 assert!(proc.compression_ratio() >= 1.0);
1850 }
1851
1852 #[test]
1853 fn test_processor_jpeg_compress() {
1854 let pool = NDArrayPool::new(1_000_000);
1855 let mut proc = CodecProcessor::new(CodecMode::Compress {
1856 codec: CodecName::JPEG,
1857 quality: 80,
1858 });
1859 let arr = make_u8_array(16, 16);
1860 let result = proc.process_array(&arr, &pool);
1861 assert_eq!(result.output_arrays.len(), 1);
1862 assert_eq!(
1863 result.output_arrays[0].codec.as_ref().unwrap().name,
1864 CodecName::JPEG
1865 );
1866 }
1867
1868 #[test]
1869 fn test_processor_decompress_auto_lz4() {
1870 let pool = NDArrayPool::new(1_000_000);
1871 let arr = make_u8_array(16, 16);
1872 let compressed = compress_lz4(&arr);
1873
1874 let mut proc = CodecProcessor::new(CodecMode::Decompress);
1875 let result = proc.process_array(&compressed, &pool);
1876 assert_eq!(result.output_arrays.len(), 1);
1877 assert!(result.output_arrays[0].codec.is_none());
1878 assert_eq!(
1879 result.output_arrays[0].data.as_u8_slice(),
1880 arr.data.as_u8_slice()
1881 );
1882 assert!(proc.compression_ratio() > 0.0);
1883 }
1884
1885 #[test]
1886 fn test_processor_decompress_auto_jpeg() {
1887 let pool = NDArrayPool::new(1_000_000);
1888 let arr = make_u8_array(16, 16);
1889 let compressed = compress_jpeg(&arr, 90).unwrap();
1890
1891 let mut proc = CodecProcessor::new(CodecMode::Decompress);
1892 let result = proc.process_array(&compressed, &pool);
1893 assert_eq!(result.output_arrays.len(), 1);
1894 assert!(result.output_arrays[0].codec.is_none());
1895 }
1896
1897 #[test]
1898 fn test_processor_decompress_no_codec() {
1899 let pool = NDArrayPool::new(1_000_000);
1900 let arr = make_u8_array(8, 8);
1901 let mut proc = CodecProcessor::new(CodecMode::Decompress);
1902 let result = proc.process_array(&arr, &pool);
1903 assert_eq!(result.output_arrays.len(), 1);
1905 assert_eq!(proc.compression_ratio(), 1.0);
1906 }
1907
1908 #[test]
1909 fn test_processor_compression_ratio() {
1910 let pool = NDArrayPool::new(1_000_000);
1911 let mut arr = NDArray::new(
1913 vec![NDDimension::new(128), NDDimension::new(128)],
1914 NDDataType::UInt8,
1915 );
1916 if let NDDataBuffer::U8(ref mut v) = arr.data {
1917 for x in v.iter_mut() {
1918 *x = 0;
1919 }
1920 }
1921
1922 let mut proc = CodecProcessor::new(CodecMode::Compress {
1923 codec: CodecName::LZ4,
1924 quality: 0,
1925 });
1926 let _ = proc.process_array(&arr, &pool);
1927 let ratio = proc.compression_ratio();
1928 assert!(
1929 ratio > 2.0,
1930 "all-zeros 128x128 should compress at least 2x, got {}",
1931 ratio,
1932 );
1933 }
1934
1935 #[test]
1936 fn test_processor_plugin_type() {
1937 let proc = CodecProcessor::new(CodecMode::Decompress);
1938 assert_eq!(proc.plugin_type(), "NDPluginCodec");
1939 }
1940
1941 #[test]
1944 fn test_buffer_from_bytes_u8() {
1945 let data = vec![1u8, 2, 3, 4];
1946 let buf = buffer_from_bytes(&data, NDDataType::UInt8).unwrap();
1947 assert_eq!(buf.data_type(), NDDataType::UInt8);
1948 assert_eq!(buf.len(), 4);
1949 assert_eq!(buf.as_u8_slice(), &[1, 2, 3, 4]);
1950 }
1951
1952 #[test]
1953 fn test_buffer_from_bytes_u16() {
1954 let original = vec![1000u16, 2000, 3000];
1955 let bytes: Vec<u8> = original.iter().flat_map(|v| v.to_ne_bytes()).collect();
1956 let buf = buffer_from_bytes(&bytes, NDDataType::UInt16).unwrap();
1957 assert_eq!(buf.data_type(), NDDataType::UInt16);
1958 assert_eq!(buf.len(), 3);
1959 if let NDDataBuffer::U16(v) = buf {
1960 assert_eq!(v, original);
1961 } else {
1962 panic!("wrong buffer type");
1963 }
1964 }
1965
1966 #[test]
1967 fn test_buffer_from_bytes_bad_alignment() {
1968 let data = vec![0u8; 3];
1970 assert!(buffer_from_bytes(&data, NDDataType::UInt16).is_none());
1971 }
1972
1973 #[test]
1974 fn test_buffer_from_bytes_f64_roundtrip() {
1975 let original = vec![1.5f64, -2.7, 3.14159];
1976 let bytes: Vec<u8> = original.iter().flat_map(|v| v.to_ne_bytes()).collect();
1977 let buf = buffer_from_bytes(&bytes, NDDataType::Float64).unwrap();
1978 if let NDDataBuffer::F64(v) = buf {
1979 assert_eq!(v, original);
1980 } else {
1981 panic!("wrong buffer type");
1982 }
1983 }
1984}