1use std::io::{Read, Write};
2use std::sync::Arc;
3
4use ad_core_rs::codec::{Codec, CodecName};
5use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
6use ad_core_rs::ndarray_pool::NDArrayPool;
7use ad_core_rs::plugin::runtime::{NDPluginProcess, ParamUpdate, ProcessResult};
8
9use flate2::Compression;
10use flate2::read::ZlibDecoder;
11use flate2::write::ZlibEncoder;
12use lz4_flex::block::{compress, decompress};
13use rust_hdf5::format::messages::filter::{
14 FILTER_BLOSC, Filter, FilterPipeline, apply_filters, reverse_filters,
15};
16
17pub fn original_data_type(array: &NDArray) -> NDDataType {
29 match &array.codec {
30 Some(c) => c.original_data_type,
31 None => array.data.data_type(),
32 }
33}
34
35fn buffer_from_bytes(bytes: &[u8], data_type: NDDataType) -> Option<NDDataBuffer> {
40 let elem_size = data_type.element_size();
41 if bytes.len() % elem_size != 0 {
42 return None;
43 }
44 let count = bytes.len() / elem_size;
45
46 Some(match data_type {
47 NDDataType::Int8 => {
48 let mut v = vec![0i8; count];
49 unsafe {
51 std::ptr::copy_nonoverlapping(
52 bytes.as_ptr(),
53 v.as_mut_ptr() as *mut u8,
54 bytes.len(),
55 );
56 }
57 NDDataBuffer::I8(v)
58 }
59 NDDataType::UInt8 => NDDataBuffer::U8(bytes.to_vec()),
60 NDDataType::Int16 => {
61 let mut v = vec![0i16; count];
62 unsafe {
63 std::ptr::copy_nonoverlapping(
64 bytes.as_ptr(),
65 v.as_mut_ptr() as *mut u8,
66 bytes.len(),
67 );
68 }
69 NDDataBuffer::I16(v)
70 }
71 NDDataType::UInt16 => {
72 let mut v = vec![0u16; count];
73 unsafe {
74 std::ptr::copy_nonoverlapping(
75 bytes.as_ptr(),
76 v.as_mut_ptr() as *mut u8,
77 bytes.len(),
78 );
79 }
80 NDDataBuffer::U16(v)
81 }
82 NDDataType::Int32 => {
83 let mut v = vec![0i32; count];
84 unsafe {
85 std::ptr::copy_nonoverlapping(
86 bytes.as_ptr(),
87 v.as_mut_ptr() as *mut u8,
88 bytes.len(),
89 );
90 }
91 NDDataBuffer::I32(v)
92 }
93 NDDataType::UInt32 => {
94 let mut v = vec![0u32; count];
95 unsafe {
96 std::ptr::copy_nonoverlapping(
97 bytes.as_ptr(),
98 v.as_mut_ptr() as *mut u8,
99 bytes.len(),
100 );
101 }
102 NDDataBuffer::U32(v)
103 }
104 NDDataType::Int64 => {
105 let mut v = vec![0i64; count];
106 unsafe {
107 std::ptr::copy_nonoverlapping(
108 bytes.as_ptr(),
109 v.as_mut_ptr() as *mut u8,
110 bytes.len(),
111 );
112 }
113 NDDataBuffer::I64(v)
114 }
115 NDDataType::UInt64 => {
116 let mut v = vec![0u64; count];
117 unsafe {
118 std::ptr::copy_nonoverlapping(
119 bytes.as_ptr(),
120 v.as_mut_ptr() as *mut u8,
121 bytes.len(),
122 );
123 }
124 NDDataBuffer::U64(v)
125 }
126 NDDataType::Float32 => {
127 let mut v = vec![0f32; count];
128 unsafe {
129 std::ptr::copy_nonoverlapping(
130 bytes.as_ptr(),
131 v.as_mut_ptr() as *mut u8,
132 bytes.len(),
133 );
134 }
135 NDDataBuffer::F32(v)
136 }
137 NDDataType::Float64 => {
138 let mut v = vec![0f64; count];
139 unsafe {
140 std::ptr::copy_nonoverlapping(
141 bytes.as_ptr(),
142 v.as_mut_ptr() as *mut u8,
143 bytes.len(),
144 );
145 }
146 NDDataBuffer::F64(v)
147 }
148 })
149}
150
151pub fn compress_lz4(src: &NDArray) -> NDArray {
157 let raw = src.data.as_u8_slice();
158 let original_data_type = src.data.data_type();
159 let original_size = raw.len();
160 let compressed = compress(raw);
162 let compressed_size = compressed.len();
163
164 let mut arr = src.clone();
165 arr.data = NDDataBuffer::U8(compressed);
166 arr.codec = Some(Codec {
167 name: CodecName::LZ4,
168 compressed_size,
169 level: 0,
170 shuffle: 0,
171 compressor: 0,
172 original_data_type,
175 });
176
177 tracing::debug!(
178 original_size,
179 compressed_size,
180 ratio = original_size as f64 / compressed_size.max(1) as f64,
181 "LZ4 compress"
182 );
183
184 arr
185}
186
187pub fn decompress_lz4(src: &NDArray) -> Option<NDArray> {
192 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::LZ4) {
193 return None;
194 }
195 let compressed = src.data.as_u8_slice();
196 let original_type = original_data_type(src);
199 let num_elements: usize = src.dims.iter().map(|d| d.size).product();
200 let uncompressed_size = num_elements * original_type.element_size();
201 let decompressed = decompress(compressed, uncompressed_size).ok()?;
202
203 let buffer = buffer_from_bytes(&decompressed, original_type)?;
204
205 let mut arr = src.clone();
206 arr.data = buffer;
207 arr.codec = None;
208
209 Some(arr)
210}
211
212const ZLIB_DEFAULT_LEVEL: u32 = 6;
223
224pub fn compress_zlib(src: &NDArray) -> NDArray {
230 let raw = src.data.as_u8_slice();
231 let original_data_type = src.data.data_type();
232 let original_size = raw.len();
233
234 let mut encoder = ZlibEncoder::new(Vec::<u8>::new(), Compression::new(ZLIB_DEFAULT_LEVEL));
235 if encoder.write_all(raw).is_err() {
237 return src.clone();
238 }
239 let compressed = match encoder.finish() {
240 Ok(buf) => buf,
241 Err(_) => return src.clone(),
242 };
243 let compressed_size = compressed.len();
244
245 let mut arr = src.clone();
246 arr.data = NDDataBuffer::U8(compressed);
247 arr.codec = Some(Codec {
248 name: CodecName::Zlib,
249 compressed_size,
250 level: ZLIB_DEFAULT_LEVEL as i32,
251 shuffle: 0,
252 compressor: 0,
253 original_data_type,
254 });
255
256 tracing::debug!(
257 original_size,
258 compressed_size,
259 ratio = original_size as f64 / compressed_size.max(1) as f64,
260 "Zlib compress"
261 );
262 arr
263}
264
265pub fn decompress_zlib(src: &NDArray) -> Option<NDArray> {
270 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::Zlib) {
271 return None;
272 }
273 let compressed = src.data.as_u8_slice();
274
275 let original_type = original_data_type(src);
276 let num_elements: usize = src.dims.iter().map(|d| d.size).product();
277 let uncompressed_size = num_elements * original_type.element_size();
278
279 let mut decoder = ZlibDecoder::new(compressed);
280 let mut decompressed = Vec::with_capacity(uncompressed_size);
281 decoder.read_to_end(&mut decompressed).ok()?;
282
283 let buffer = buffer_from_bytes(&decompressed, original_type)?;
284
285 let mut arr = src.clone();
286 arr.data = buffer;
287 arr.codec = None;
288 Some(arr)
289}
290
291const LZ4HDF5_DEFAULT_BLOCK_SIZE: usize = 1 << 20;
311
312pub fn compress_lz4hdf5(src: &NDArray) -> NDArray {
319 let raw = src.data.as_u8_slice();
320 let data_type = src.data.data_type();
321 let original_size = raw.len();
322 let block_size = LZ4HDF5_DEFAULT_BLOCK_SIZE;
323
324 let mut out: Vec<u8> = Vec::with_capacity(original_size / 2 + 12);
326 out.extend_from_slice(&(original_size as u64).to_be_bytes());
327 out.extend_from_slice(&(block_size as u32).to_be_bytes());
328
329 let mut pos = 0usize;
330 while pos < raw.len() {
331 let n = block_size.min(raw.len() - pos);
332 let block = &raw[pos..pos + n];
333 let comp = compress(block);
334 if comp.len() < n {
337 out.extend_from_slice(&(comp.len() as u32).to_be_bytes());
338 out.extend_from_slice(&comp);
339 } else {
340 out.extend_from_slice(&(n as u32).to_be_bytes());
341 out.extend_from_slice(block);
342 }
343 pos += n;
344 }
345
346 let compressed_size = out.len();
347 let mut arr = src.clone();
348 arr.data = NDDataBuffer::U8(out);
349 arr.codec = Some(Codec {
350 name: CodecName::LZ4HDF5,
351 compressed_size,
352 level: 0,
353 shuffle: 0,
354 compressor: 0,
355 original_data_type: data_type,
356 });
357
358 tracing::debug!(
359 original_size,
360 compressed_size,
361 ratio = original_size as f64 / compressed_size.max(1) as f64,
362 "LZ4HDF5 compress"
363 );
364 arr
365}
366
367pub fn decompress_lz4hdf5(src: &NDArray) -> Option<NDArray> {
371 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::LZ4HDF5) {
372 return None;
373 }
374 let buf = src.data.as_u8_slice();
375 if buf.len() < 12 {
376 return None;
377 }
378 let total_bytes = u64::from_be_bytes(buf[0..8].try_into().ok()?) as usize;
379 let block_size = u32::from_be_bytes(buf[8..12].try_into().ok()?) as usize;
380 if block_size == 0 {
381 return None;
382 }
383
384 let original_type = original_data_type(src);
385
386 let mut out: Vec<u8> = Vec::with_capacity(total_bytes);
387 let mut pos = 12usize;
388 while out.len() < total_bytes {
389 let n = block_size.min(total_bytes - out.len());
390 if pos + 4 > buf.len() {
391 return None;
392 }
393 let clen = u32::from_be_bytes(buf[pos..pos + 4].try_into().ok()?) as usize;
394 pos += 4;
395 if pos + clen > buf.len() {
396 return None;
397 }
398 let block_payload = &buf[pos..pos + clen];
399 if clen == n {
400 out.extend_from_slice(block_payload);
402 } else {
403 let block = decompress(block_payload, n).ok()?;
404 if block.len() != n {
405 return None;
406 }
407 out.extend_from_slice(&block);
408 }
409 pos += clen;
410 }
411 if out.len() != total_bytes {
412 return None;
413 }
414
415 let buffer = buffer_from_bytes(&out, original_type)?;
416 let mut arr = src.clone();
417 arr.data = buffer;
418 arr.codec = None;
419 Some(arr)
420}
421
/// Target number of bytes per bitshuffle block.
const BSHUF_TARGET_BLOCK_SIZE_B: usize = 8192;
/// Block sizes (in elements) are kept a multiple of this value.
const BSHUF_BLOCKED_MULT: usize = 8;

/// Returns the default bitshuffle block size, in elements, for `elem_size`.
///
/// The result is the number of elements that fit in
/// [`BSHUF_TARGET_BLOCK_SIZE_B`], rounded down to a multiple of
/// [`BSHUF_BLOCKED_MULT`] and never smaller than that multiple. An
/// `elem_size` of zero is treated as one.
fn bshuf_default_block_size(elem_size: usize) -> usize {
    let fitting = BSHUF_TARGET_BLOCK_SIZE_B / elem_size.max(1);
    let rounded = (fitting / BSHUF_BLOCKED_MULT) * BSHUF_BLOCKED_MULT;
    std::cmp::max(rounded, BSHUF_BLOCKED_MULT)
}
457
/// Byte-transposes `n` elements of `elem_size` bytes each.
///
/// Output byte `b * n + e` is byte `b` of input element `e`, i.e. all first
/// bytes come first, then all second bytes, and so on.
///
/// # Panics
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn trans_byte_elem(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    let mut planes = Vec::with_capacity(n * elem_size);
    for b in 0..elem_size {
        // One output row per byte position within an element.
        planes.extend((0..n).map(|e| input[e * elem_size + b]));
    }
    planes
}
470
/// Reverses [`trans_byte_elem`]: regroups byte planes back into elements.
///
/// Output byte `e * elem_size + b` is input byte `b * n + e`.
///
/// # Panics
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn untrans_byte_elem(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    let mut elems = Vec::with_capacity(n * elem_size);
    for e in 0..n {
        // Gather the bytes of element `e` from each byte plane in turn.
        elems.extend((0..elem_size).map(|b| input[b * n + e]));
    }
    elems
}
481
/// Bit-transposes a block of `n` elements of `elem_size` bytes each.
///
/// Conceptually the block is first byte-transposed (byte `b` of element `e`
/// lands at index `b * n + e`), then every bit `k` of the byte at transposed
/// index `i` is written to bit `i % 8` of output byte
/// `k * (n * elem_size / 8) + i / 8`. Both steps are done in a single pass so
/// no intermediate buffer is allocated.
///
/// `n` must be a multiple of 8 (checked in debug builds only).
///
/// NOTE(review): the reference bitshuffle library appears to apply a further
/// row transposition after this step, so for `elem_size > 1` this layout may
/// not be wire-compatible with it — confirm whether interoperability is
/// required.
///
/// # Panics
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn trans_bit_elem_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    debug_assert_eq!(n % 8, 0);
    let nbytes = n * elem_size;
    let out_row = nbytes / 8;
    let mut out = vec![0u8; nbytes];

    for plane in 0..elem_size {
        for e in 0..n {
            // Position this byte would have after the byte transpose.
            let byte_idx = plane * n + e;
            let v = input[e * elem_size + plane];
            let mask = 1u8 << (byte_idx % 8);
            let col = byte_idx / 8;
            for bit in 0..8usize {
                if (v >> bit) & 1 != 0 {
                    out[bit * out_row + col] |= mask;
                }
            }
        }
    }
    out
}
511
/// Reverses [`trans_bit_elem_block`] for a block of `n` elements of
/// `elem_size` bytes each.
///
/// For every byte-transposed index `i`, bit `k` of the reconstructed byte is
/// read from bit `i % 8` of input byte `k * (n * elem_size / 8) + i / 8`; the
/// byte is then stored straight at its element-major position, so no
/// intermediate buffer is allocated.
///
/// `n` must be a multiple of 8 (checked in debug builds only).
///
/// # Panics
/// Panics if `input` holds fewer than `n * elem_size` bytes.
fn untrans_bit_elem_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
    debug_assert_eq!(n % 8, 0);
    let nbytes = n * elem_size;
    let out_row = nbytes / 8;
    let mut out = vec![0u8; nbytes];

    for plane in 0..elem_size {
        for e in 0..n {
            // Index of this byte in the byte-transposed intermediate layout.
            let byte_idx = plane * n + e;
            let col = byte_idx / 8;
            let shift = byte_idx % 8;
            let mut value = 0u8;
            for bit in 0..8usize {
                if (input[bit * out_row + col] >> shift) & 1 != 0 {
                    value |= 1 << bit;
                }
            }
            out[e * elem_size + plane] = value;
        }
    }
    out
}
530
531fn bshuf_compress_block(input: &[u8], n: usize, elem_size: usize) -> Vec<u8> {
536 let shuffled = if n % 8 == 0 {
537 trans_bit_elem_block(input, n, elem_size)
538 } else {
539 trans_byte_elem(input, n, elem_size)
540 };
541 compress(&shuffled)
542}
543
544fn bshuf_decompress_block(compressed: &[u8], n: usize, elem_size: usize) -> Option<Vec<u8>> {
546 let raw_size = n * elem_size;
547 let shuffled = decompress(compressed, raw_size).ok()?;
548 if shuffled.len() != raw_size {
549 return None;
550 }
551 Some(if n % 8 == 0 {
552 untrans_bit_elem_block(&shuffled, n, elem_size)
553 } else {
554 untrans_byte_elem(&shuffled, n, elem_size)
555 })
556}
557
558pub fn compress_bslz4(src: &NDArray) -> NDArray {
565 let raw = src.data.as_u8_slice();
566 let data_type = src.data.data_type();
567 let elem_size = data_type.element_size();
568 let total_elems = if elem_size > 0 {
569 raw.len() / elem_size
570 } else {
571 0
572 };
573 let block_size = bshuf_default_block_size(elem_size);
574
575 let mut out: Vec<u8> = Vec::with_capacity(raw.len() / 2 + 16);
577 out.extend_from_slice(&(raw.len() as u64).to_be_bytes());
578 out.extend_from_slice(&(block_size as u32).to_be_bytes());
579
580 let mut elem = 0usize;
581 while elem < total_elems {
582 let n = block_size.min(total_elems - elem);
583 let byte_off = elem * elem_size;
584 let block = &raw[byte_off..byte_off + n * elem_size];
585 let comp = bshuf_compress_block(block, n, elem_size);
586 out.extend_from_slice(&(comp.len() as u32).to_be_bytes());
587 out.extend_from_slice(&comp);
588 elem += n;
589 }
590
591 let compressed_size = out.len();
592 let mut arr = src.clone();
593 arr.data = NDDataBuffer::U8(out);
594 arr.codec = Some(Codec {
595 name: CodecName::BSLZ4,
596 compressed_size,
597 level: 0,
598 shuffle: 0,
599 compressor: 0,
600 original_data_type: data_type,
601 });
602
603 tracing::debug!(
604 original_size = raw.len(),
605 compressed_size,
606 ratio = raw.len() as f64 / compressed_size.max(1) as f64,
607 "BSLZ4 compress"
608 );
609 arr
610}
611
612pub fn decompress_bslz4(src: &NDArray) -> Option<NDArray> {
616 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::BSLZ4) {
617 return None;
618 }
619 let buf = src.data.as_u8_slice();
620 if buf.len() < 12 {
621 return None;
622 }
623 let total_bytes = u64::from_be_bytes(buf[0..8].try_into().ok()?) as usize;
624 let block_size = u32::from_be_bytes(buf[8..12].try_into().ok()?) as usize;
625
626 let original_type = original_data_type(src);
627 let elem_size = original_type.element_size();
628 if elem_size == 0 || block_size == 0 || total_bytes % elem_size != 0 {
629 return None;
630 }
631 let total_elems = total_bytes / elem_size;
632
633 let mut out: Vec<u8> = Vec::with_capacity(total_bytes);
634 let mut pos = 12usize;
635 let mut elem = 0usize;
636 while elem < total_elems {
637 let n = block_size.min(total_elems - elem);
638 if pos + 4 > buf.len() {
639 return None;
640 }
641 let clen = u32::from_be_bytes(buf[pos..pos + 4].try_into().ok()?) as usize;
642 pos += 4;
643 if pos + clen > buf.len() {
644 return None;
645 }
646 let block = bshuf_decompress_block(&buf[pos..pos + clen], n, elem_size)?;
647 out.extend_from_slice(&block);
648 pos += clen;
649 elem += n;
650 }
651 if out.len() != total_bytes {
652 return None;
653 }
654
655 let buffer = buffer_from_bytes(&out, original_type)?;
656 let mut arr = src.clone();
657 arr.data = buffer;
658 arr.codec = None;
659 Some(arr)
660}
661
662pub fn compress_jpeg(src: &NDArray, quality: u8) -> Option<NDArray> {
670 if src.data.data_type() != NDDataType::UInt8 {
671 return None;
672 }
673
674 let raw = src.data.as_u8_slice();
675 let info = src.info();
676
677 if info.x_size > u16::MAX as usize || info.y_size > u16::MAX as usize {
679 return None;
680 }
681
682 let (width, height, color_type) = match src.dims.len() {
683 2 => {
684 (
686 info.x_size as u16,
687 info.y_size as u16,
688 jpeg_encoder::ColorType::Luma,
689 )
690 }
691 3 if src.dims[0].size == 3 => {
692 (
694 info.x_size as u16,
695 info.y_size as u16,
696 jpeg_encoder::ColorType::Rgb,
697 )
698 }
699 _ => return None,
700 };
701
702 let mut jpeg_buf = Vec::new();
703 let encoder = jpeg_encoder::Encoder::new(&mut jpeg_buf, quality);
704 if encoder.encode(raw, width, height, color_type).is_err() {
705 return None;
706 }
707
708 let compressed_size = jpeg_buf.len();
709 let original_size = raw.len();
710
711 let mut arr = src.clone();
712 arr.data = NDDataBuffer::U8(jpeg_buf);
713 arr.codec = Some(Codec {
714 name: CodecName::JPEG,
715 compressed_size,
716 level: 0,
717 shuffle: 0,
718 compressor: 0,
719 original_data_type: src.data.data_type(),
723 });
724
725 tracing::debug!(
726 original_size,
727 compressed_size,
728 ratio = original_size as f64 / compressed_size.max(1) as f64,
729 "JPEG compress (quality={})",
730 quality,
731 );
732
733 Some(arr)
734}
735
736pub fn decompress_jpeg(src: &NDArray) -> Option<NDArray> {
743 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::JPEG) {
744 return None;
745 }
746
747 let compressed = src.data.as_u8_slice();
748 let mut decoder = jpeg_decoder::Decoder::new(compressed);
749 let pixels = decoder.decode().ok()?;
750 let metadata = decoder.info()?;
751
752 let width = metadata.width as usize;
753 let height = metadata.height as usize;
754
755 let dims = match metadata.pixel_format {
756 jpeg_decoder::PixelFormat::L8 => {
757 vec![NDDimension::new(width), NDDimension::new(height)]
759 }
760 jpeg_decoder::PixelFormat::RGB24 => {
761 vec![
763 NDDimension::new(3),
764 NDDimension::new(width),
765 NDDimension::new(height),
766 ]
767 }
768 _ => return None,
769 };
770
771 let mut arr = src.clone();
772 arr.dims = dims;
773 arr.data = NDDataBuffer::U8(pixels);
774 arr.codec = None;
775
776 Some(arr)
777}
778
/// Settings forwarded to the Blosc filter by [`compress_blosc`].
///
/// The numeric meaning of each field is defined by the Blosc filter
/// implementation in use and is not interpreted in this file.
#[derive(Debug, Clone, Copy)]
pub struct BloscConfig {
    /// Compressor selector passed through to the filter.
    /// NOTE(review): presumably a Blosc compressor code — confirm the mapping.
    pub compressor: u32,
    /// Compression level passed through to the filter; the plugin clamps the
    /// value it stores here to `0..=9`.
    pub clevel: u32,
    /// Shuffle selector passed through to the filter.
    /// NOTE(review): presumably 0 = none — confirm the mapping.
    pub shuffle: u32,
}

impl Default for BloscConfig {
    /// Defaults to compressor `0`, compression level `3` and shuffle `0`.
    fn default() -> Self {
        Self {
            compressor: 0,
            clevel: 3,
            shuffle: 0,
        }
    }
}
799
800pub fn compress_blosc(src: &NDArray, config: &BloscConfig) -> NDArray {
802 let raw = src.data.as_u8_slice();
803 let element_size = src.data.data_type().element_size();
804
805 let pipeline = FilterPipeline {
806 filters: vec![Filter {
807 id: FILTER_BLOSC,
808 flags: 0,
809 cd_values: vec![
810 2, 2, element_size as u32, raw.len() as u32, config.shuffle, config.compressor, config.clevel, ],
818 }],
819 };
820
821 let compressed = match apply_filters(&pipeline, raw) {
822 Ok(data) => data,
823 Err(_) => return src.clone(),
824 };
825
826 let compressed_size = compressed.len();
827 let original_data_type = src.data.data_type();
828 let mut arr = src.clone();
829 arr.data = NDDataBuffer::U8(compressed);
830 arr.codec = Some(Codec {
831 name: CodecName::Blosc,
832 compressed_size,
833 level: 0,
834 shuffle: 0,
835 compressor: 0,
836 original_data_type,
837 });
838 arr
839}
840
841pub fn decompress_blosc(src: &NDArray) -> Option<NDArray> {
843 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::Blosc) {
844 return None;
845 }
846
847 let compressed = src.data.as_u8_slice();
848
849 let pipeline = FilterPipeline {
851 filters: vec![Filter {
852 id: FILTER_BLOSC,
853 flags: 0,
854 cd_values: vec![],
855 }],
856 };
857
858 let decompressed = reverse_filters(&pipeline, compressed).ok()?;
859
860 let original_type = original_data_type(src);
861
862 let buffer = buffer_from_bytes(&decompressed, original_type)?;
863
864 let mut arr = src.clone();
865 arr.data = buffer;
866 arr.codec = None;
867 Some(arr)
868}
869
/// Direction in which the codec plugin processes incoming arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecMode {
    /// Compress incoming arrays with `codec`. `quality` mirrors the
    /// processor's JPEG quality setting and is refreshed whenever the mode is
    /// rebuilt after a parameter change.
    Compress { codec: CodecName, quality: u8 },
    /// Decompress incoming arrays according to the codec each one carries.
    Decompress,
}
878
/// Indices of the plugin's parameters in the port driver's parameter list.
///
/// Every field is `None` until `register_params` has looked the parameter up.
#[derive(Default)]
struct CodecParamIndices {
    /// Index of `MODE`.
    mode: Option<usize>,
    /// Index of `COMPRESSOR`.
    compressor: Option<usize>,
    /// Index of `COMP_FACTOR`.
    comp_factor: Option<usize>,
    /// Index of `JPEG_QUALITY`.
    jpeg_quality: Option<usize>,
    /// Index of `BLOSC_COMPRESSOR`.
    blosc_compressor: Option<usize>,
    /// Index of `BLOSC_CLEVEL`.
    blosc_clevel: Option<usize>,
    /// Index of `BLOSC_SHUFFLE`.
    blosc_shuffle: Option<usize>,
    /// Index of `BLOSC_NUMTHREADS`.
    blosc_numthreads: Option<usize>,
    /// Index of `CODEC_STATUS`.
    codec_status: Option<usize>,
    /// Index of `CODEC_ERROR`.
    codec_error: Option<usize>,
}
895
/// Plugin processor that compresses or decompresses arrays passing through it.
pub struct CodecProcessor {
    /// Current direction and, when compressing, the selected codec.
    mode: CodecMode,
    /// Ratio computed for the most recently processed array; reset to `1.0`
    /// when an operation fails.
    compression_ratio: f64,
    /// Quality handed to the JPEG encoder.
    jpeg_quality: u8,
    /// Settings handed to the Blosc compressor.
    blosc_config: BloscConfig,
    /// Parameter indices resolved when the plugin registers its parameters.
    params: CodecParamIndices,
}
903
904impl CodecProcessor {
905 pub fn new(mode: CodecMode) -> Self {
906 let quality = match mode {
907 CodecMode::Compress { quality, .. } => quality,
908 _ => 85,
909 };
910 Self {
911 mode,
912 compression_ratio: 1.0,
913 jpeg_quality: quality,
914 blosc_config: BloscConfig::default(),
915 params: CodecParamIndices::default(),
916 }
917 }
918
919 pub fn compression_ratio(&self) -> f64 {
922 self.compression_ratio
923 }
924}
925
926impl NDPluginProcess for CodecProcessor {
927 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
928 let original_bytes = array.data.as_u8_slice().len();
929
930 let result = match self.mode {
931 CodecMode::Compress { .. } if array.codec.is_some() => {
932 Some(array.clone())
934 }
935 CodecMode::Compress {
936 codec: CodecName::LZ4,
937 ..
938 } => Some(compress_lz4(array)),
939 CodecMode::Compress {
940 codec: CodecName::JPEG,
941 ..
942 } => compress_jpeg(array, self.jpeg_quality),
943 CodecMode::Compress {
944 codec: CodecName::Zlib,
945 ..
946 } => Some(compress_zlib(array)),
947 CodecMode::Compress {
948 codec: CodecName::Blosc,
949 ..
950 } => Some(compress_blosc(array, &self.blosc_config)),
951 CodecMode::Compress {
952 codec: CodecName::LZ4HDF5,
953 ..
954 } => Some(compress_lz4hdf5(array)),
955 CodecMode::Compress {
956 codec: CodecName::BSLZ4,
957 ..
958 } => Some(compress_bslz4(array)),
959 CodecMode::Compress { .. } => None,
960 CodecMode::Decompress => match array.codec.as_ref().map(|c| c.name) {
961 Some(CodecName::LZ4) => decompress_lz4(array),
962 Some(CodecName::JPEG) => decompress_jpeg(array),
963 Some(CodecName::Zlib) => decompress_zlib(array),
964 Some(CodecName::Blosc) => decompress_blosc(array),
965 Some(CodecName::LZ4HDF5) => decompress_lz4hdf5(array),
966 Some(CodecName::BSLZ4) => decompress_bslz4(array),
967 _ => None,
968 },
969 };
970
971 let mut updates = Vec::new();
972
973 match result {
974 Some(ref out) => {
975 let output_bytes = out.data.as_u8_slice().len();
976 match self.mode {
977 CodecMode::Compress { .. } => {
978 self.compression_ratio = original_bytes as f64 / output_bytes.max(1) as f64;
979 }
980 CodecMode::Decompress => {
981 self.compression_ratio = output_bytes as f64 / original_bytes.max(1) as f64;
982 }
983 }
984 if let Some(idx) = self.params.comp_factor {
985 updates.push(ParamUpdate::float64(idx, self.compression_ratio));
986 }
987 if let Some(idx) = self.params.codec_status {
988 updates.push(ParamUpdate::int32(idx, 0)); }
990 if let Some(idx) = self.params.codec_error {
991 updates.push(ParamUpdate::Octet {
992 reason: idx,
993 addr: 0,
994 value: String::new(),
995 });
996 }
997 let mut r = ProcessResult::arrays(vec![Arc::new(out.clone())]);
998 r.param_updates = updates;
999 r
1000 }
1001 None => {
1002 self.compression_ratio = 1.0;
1004 if let Some(idx) = self.params.comp_factor {
1005 updates.push(ParamUpdate::float64(idx, 1.0));
1006 }
1007 if let Some(idx) = self.params.codec_status {
1008 updates.push(ParamUpdate::int32(idx, 1)); }
1010 if let Some(idx) = self.params.codec_error {
1011 updates.push(ParamUpdate::Octet {
1012 reason: idx,
1013 addr: 0,
1014 value: "codec operation failed or unsupported".to_string(),
1015 });
1016 }
1017 let mut r = ProcessResult::arrays(vec![Arc::new(array.clone())]);
1018 r.param_updates = updates;
1019 r
1020 }
1021 }
1022 }
1023
1024 fn plugin_type(&self) -> &str {
1025 "NDPluginCodec"
1026 }
1027
1028 fn register_params(
1029 &mut self,
1030 base: &mut asyn_rs::port::PortDriverBase,
1031 ) -> asyn_rs::error::AsynResult<()> {
1032 use asyn_rs::param::ParamType;
1033 base.create_param("MODE", ParamType::Int32)?;
1034 base.create_param("COMPRESSOR", ParamType::Int32)?;
1035 base.create_param("COMP_FACTOR", ParamType::Float64)?;
1036 base.create_param("JPEG_QUALITY", ParamType::Int32)?;
1037 base.create_param("BLOSC_COMPRESSOR", ParamType::Int32)?;
1038 base.create_param("BLOSC_CLEVEL", ParamType::Int32)?;
1039 base.create_param("BLOSC_SHUFFLE", ParamType::Int32)?;
1040 base.create_param("BLOSC_NUMTHREADS", ParamType::Int32)?;
1041 base.create_param("CODEC_STATUS", ParamType::Int32)?;
1042 base.create_param("CODEC_ERROR", ParamType::Octet)?;
1043
1044 self.params.mode = base.find_param("MODE");
1045 self.params.compressor = base.find_param("COMPRESSOR");
1046 self.params.comp_factor = base.find_param("COMP_FACTOR");
1047 self.params.jpeg_quality = base.find_param("JPEG_QUALITY");
1048 self.params.blosc_compressor = base.find_param("BLOSC_COMPRESSOR");
1049 self.params.blosc_clevel = base.find_param("BLOSC_CLEVEL");
1050 self.params.blosc_shuffle = base.find_param("BLOSC_SHUFFLE");
1051 self.params.blosc_numthreads = base.find_param("BLOSC_NUMTHREADS");
1052 self.params.codec_status = base.find_param("CODEC_STATUS");
1053 self.params.codec_error = base.find_param("CODEC_ERROR");
1054 Ok(())
1055 }
1056
1057 fn on_param_change(
1058 &mut self,
1059 reason: usize,
1060 params: &ad_core_rs::plugin::runtime::PluginParamSnapshot,
1061 ) -> ad_core_rs::plugin::runtime::ParamChangeResult {
1062 if Some(reason) == self.params.mode {
1063 let v = params.value.as_i32();
1064 if v == 0 {
1065 let codec = match self.mode {
1067 CodecMode::Compress { codec, .. } => codec,
1068 _ => CodecName::LZ4,
1069 };
1070 self.mode = CodecMode::Compress {
1071 codec,
1072 quality: self.jpeg_quality,
1073 };
1074 } else {
1075 self.mode = CodecMode::Decompress;
1076 }
1077 } else if Some(reason) == self.params.compressor {
1078 let codec = match params.value.as_i32() {
1081 0 => CodecName::None,
1082 1 => CodecName::JPEG,
1083 2 => CodecName::Zlib,
1084 3 => CodecName::Blosc,
1085 4 => CodecName::LZ4,
1086 5 => CodecName::LZ4HDF5,
1087 6 => CodecName::BSLZ4,
1088 _ => CodecName::None,
1089 };
1090 if let CodecMode::Compress { .. } = self.mode {
1091 self.mode = CodecMode::Compress {
1092 codec,
1093 quality: self.jpeg_quality,
1094 };
1095 }
1096 } else if Some(reason) == self.params.jpeg_quality {
1097 self.jpeg_quality = params.value.as_i32().clamp(1, 100) as u8;
1098 if let CodecMode::Compress { codec, .. } = self.mode {
1099 self.mode = CodecMode::Compress {
1100 codec,
1101 quality: self.jpeg_quality,
1102 };
1103 }
1104 } else if Some(reason) == self.params.blosc_compressor {
1105 self.blosc_config.compressor = params.value.as_i32().max(0) as u32;
1106 } else if Some(reason) == self.params.blosc_clevel {
1107 self.blosc_config.clevel = params.value.as_i32().clamp(0, 9) as u32;
1108 } else if Some(reason) == self.params.blosc_shuffle {
1109 self.blosc_config.shuffle = params.value.as_i32().max(0) as u32;
1110 }
1111
1112 ad_core_rs::plugin::runtime::ParamChangeResult::updates(vec![])
1113 }
1114}
1115
1116#[cfg(test)]
1117mod tests {
1118 use super::*;
1119
1120 fn make_u8_array(width: usize, height: usize) -> NDArray {
1121 let mut arr = NDArray::new(
1122 vec![NDDimension::new(width), NDDimension::new(height)],
1123 NDDataType::UInt8,
1124 );
1125 if let NDDataBuffer::U8(ref mut v) = arr.data {
1126 for i in 0..v.len() {
1127 v[i] = (i % 256) as u8;
1128 }
1129 }
1130 arr
1131 }
1132
1133 fn make_rgb_array(width: usize, height: usize) -> NDArray {
1134 use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
1135 let mut arr = NDArray::new(
1136 vec![
1137 NDDimension::new(3),
1138 NDDimension::new(width),
1139 NDDimension::new(height),
1140 ],
1141 NDDataType::UInt8,
1142 );
1143 arr.attributes.add(NDAttribute::new_static(
1145 "ColorMode",
1146 "Color Mode",
1147 NDAttrSource::Driver,
1148 NDAttrValue::Int32(2), ));
1150 if let NDDataBuffer::U8(ref mut v) = arr.data {
1151 for i in 0..v.len() {
1152 v[i] = (i % 256) as u8;
1153 }
1154 }
1155 arr
1156 }
1157
1158 #[test]
1164 fn compressors_record_type_in_codec_not_an_attribute() {
1165 let mut arr = NDArray::new(vec![NDDimension::new(8)], NDDataType::UInt16);
1166 if let NDDataBuffer::U16(ref mut v) = arr.data {
1167 for (i, x) in v.iter_mut().enumerate() {
1168 *x = (i * 7) as u16;
1169 }
1170 }
1171 for compressed in [
1172 compress_lz4(&arr),
1173 compress_zlib(&arr),
1174 compress_lz4hdf5(&arr),
1175 compress_bslz4(&arr),
1176 compress_blosc(&arr, &BloscConfig::default()),
1177 ] {
1178 assert_eq!(
1179 compressed.codec.as_ref().unwrap().original_data_type,
1180 NDDataType::UInt16,
1181 "the original element type must travel in the codec"
1182 );
1183 assert!(
1184 compressed
1185 .attributes
1186 .get("CODEC_ORIGINAL_DATA_TYPE")
1187 .is_none(),
1188 "no codec carrier attribute may be attached to a compressed frame"
1189 );
1190 }
1191 }
1192
1193 #[test]
1196 fn test_lz4_roundtrip_u8() {
1197 let arr = make_u8_array(4, 4);
1198 let original_data = arr.data.as_u8_slice().to_vec();
1199
1200 let compressed = compress_lz4(&arr);
1201 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
1202 assert_ne!(compressed.data.as_u8_slice(), original_data.as_slice());
1204
1205 let decompressed = decompress_lz4(&compressed).unwrap();
1206 assert!(decompressed.codec.is_none());
1207 assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
1208 assert_eq!(decompressed.data.as_u8_slice(), original_data.as_slice());
1209 }
1210
1211 #[test]
1212 fn test_lz4_roundtrip_u16() {
1213 let mut arr = NDArray::new(
1214 vec![NDDimension::new(8), NDDimension::new(8)],
1215 NDDataType::UInt16,
1216 );
1217 if let NDDataBuffer::U16(ref mut v) = arr.data {
1218 for i in 0..v.len() {
1219 v[i] = (i * 100) as u16;
1220 }
1221 }
1222 let original_bytes = arr.data.as_u8_slice().to_vec();
1223
1224 let compressed = compress_lz4(&arr);
1225 assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
1226 assert_eq!(
1228 compressed.codec.as_ref().unwrap().original_data_type,
1229 NDDataType::UInt16
1230 );
1231 assert!(
1233 compressed
1234 .attributes
1235 .get("CODEC_ORIGINAL_DATA_TYPE")
1236 .is_none()
1237 );
1238
1239 let decompressed = decompress_lz4(&compressed).unwrap();
1240 assert!(decompressed.codec.is_none());
1241 assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
1242 assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
1243 }
1244
1245 #[test]
1246 fn test_lz4_roundtrip_f64() {
1247 let mut arr = NDArray::new(vec![NDDimension::new(16)], NDDataType::Float64);
1248 if let NDDataBuffer::F64(ref mut v) = arr.data {
1249 for i in 0..v.len() {
1250 v[i] = i as f64 * 1.5;
1251 }
1252 }
1253 let original_bytes = arr.data.as_u8_slice().to_vec();
1254
1255 let compressed = compress_lz4(&arr);
1256 let decompressed = decompress_lz4(&compressed).unwrap();
1257 assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
1258 assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
1259 }
1260
1261 #[test]
1262 fn test_lz4_compresses_repetitive_data() {
1263 let mut arr = NDArray::new(
1265 vec![NDDimension::new(256), NDDimension::new(256)],
1266 NDDataType::UInt8,
1267 );
1268 if let NDDataBuffer::U8(ref mut v) = arr.data {
1270 for x in v.iter_mut() {
1271 *x = 0;
1272 }
1273 }
1274 let original_size = arr.data.as_u8_slice().len();
1275
1276 let compressed = compress_lz4(&arr);
1277 let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
1278 assert!(
1279 compressed_size < original_size,
1280 "compressed ({}) should be smaller than original ({})",
1281 compressed_size,
1282 original_size,
1283 );
1284 }
1285
1286 #[test]
1287 fn test_lz4_preserves_metadata() {
1288 let mut arr = make_u8_array(4, 4);
1289 arr.unique_id = 42;
1290
1291 let compressed = compress_lz4(&arr);
1292 assert_eq!(compressed.unique_id, 42);
1293 assert_eq!(compressed.dims.len(), 2);
1294 assert_eq!(compressed.dims[0].size, 4);
1295 assert_eq!(compressed.dims[1].size, 4);
1296 }
1297
1298 #[test]
1301 fn test_bitshuffle_block_transpose_roundtrip() {
1302 let elem_size = 4;
1305 let n = 16;
1306 let input: Vec<u8> = (0..n * elem_size).map(|i| (i * 7 + 3) as u8).collect();
1307 let shuffled = trans_bit_elem_block(&input, n, elem_size);
1308 assert_eq!(shuffled.len(), input.len());
1309 let restored = untrans_bit_elem_block(&shuffled, n, elem_size);
1310 assert_eq!(restored, input);
1311 }
1312
1313 #[test]
1314 fn test_bitshuffle_partial_block_byte_transpose_roundtrip() {
1315 let elem_size = 2;
1317 let n = 5;
1318 let input: Vec<u8> = (0..n * elem_size).map(|i| (i * 13 + 1) as u8).collect();
1319 let t = trans_byte_elem(&input, n, elem_size);
1320 let restored = untrans_byte_elem(&t, n, elem_size);
1321 assert_eq!(restored, input);
1322 }
1323
/// Round-trips a 64x64 `UInt8` array through BSLZ4.
///
/// The compressed array must be tagged with the BSLZ4 codec and hold bytes
/// that differ from the input; decompression must clear the codec and
/// restore both the data type and the exact bytes.
#[test]
fn test_bslz4_roundtrip_u8() {
    let shape = vec![NDDimension::new(64), NDDimension::new(64)];
    let mut source = NDArray::new(shape, NDDataType::UInt8);
    if let NDDataBuffer::U8(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx % 251) as u8);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_bslz4(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::BSLZ4);
    assert_ne!(packed.data.as_u8_slice(), &expected[..]);

    let restored = decompress_bslz4(&packed).unwrap();
    assert!(restored.codec.is_none());
    assert_eq!(restored.data.data_type(), NDDataType::UInt8);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1346
/// Round-trips a 100x20 `UInt16` array through BSLZ4, checking that the
/// codec records `UInt16` as the original data type and that decompression
/// restores the type and the exact bytes.
#[test]
fn test_bslz4_roundtrip_u16() {
    let shape = vec![NDDimension::new(100), NDDimension::new(20)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 37 % 65521) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_bslz4(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.original_data_type, NDDataType::UInt16);

    let restored = decompress_bslz4(&packed).unwrap();
    assert_eq!(restored.data.data_type(), NDDataType::UInt16);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1369
/// Round-trips 73 `Float64` samples spanning negative and positive values
/// through BSLZ4 and expects the type and bytes back unchanged.
#[test]
fn test_bslz4_roundtrip_f64_with_negatives() {
    let mut source = NDArray::new(vec![NDDimension::new(73)], NDDataType::Float64);
    if let NDDataBuffer::F64(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx as f64 - 36.0) * 2.5);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let restored = decompress_bslz4(&compress_bslz4(&source)).unwrap();

    assert_eq!(restored.data.data_type(), NDDataType::Float64);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1385
/// Round-trips an `Int32` array whose element count is two and a half
/// default bitshuffle blocks plus three elements, so the input does not end
/// on a block boundary.
#[test]
fn test_bslz4_roundtrip_multi_block() {
    let per_block = bshuf_default_block_size(4usize);
    let total = per_block * 2 + per_block / 2 + 3;

    let mut source = NDArray::new(vec![NDDimension::new(total)], NDDataType::Int32);
    if let NDDataBuffer::I32(samples) = &mut source.data {
        // Multiplicative hash of the index gives poorly compressible values.
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx as i32).wrapping_mul(2_654_435_761u32 as i32));
    }
    let expected = source.data.as_u8_slice().to_vec();

    let restored = decompress_bslz4(&compress_bslz4(&source)).unwrap();

    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1406
/// A freshly constructed 256x256 `UInt16` array (never written to by this
/// test) must come out of BSLZ4 with a `compressed_size` smaller than its
/// raw byte length.
#[test]
fn test_bslz4_compresses_repetitive_data() {
    let shape = vec![NDDimension::new(256), NDDimension::new(256)];
    let source = NDArray::new(shape, NDDataType::UInt16);
    let original_size = source.data.as_u8_slice().len();

    let packed = compress_bslz4(&source);
    let tag = packed.codec.as_ref().unwrap();
    let compressed_size = tag.compressed_size;

    assert!(
        original_size > compressed_size,
        "bslz4 compressed ({compressed_size}) should be < original ({original_size})"
    );
}
1422
/// Drives BSLZ4 end to end through `CodecProcessor`: a compress-mode
/// processor must emit an array tagged BSLZ4, and a decompress-mode
/// processor fed that array must reproduce the original bytes.
#[test]
fn test_bslz4_via_processor() {
    let shape = vec![NDDimension::new(32), NDDimension::new(32)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 11) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();
    let pool = NDArrayPool::new(10_000_000);

    let mode = CodecMode::Compress {
        codec: CodecName::BSLZ4,
        quality: 0,
    };
    let mut compressor = CodecProcessor::new(mode);
    let packed_result = compressor.process_array(&source, &pool);
    let packed = &packed_result.output_arrays[0];
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::BSLZ4);

    let mut decompressor = CodecProcessor::new(CodecMode::Decompress);
    let restored_result = decompressor.process_array(packed, &pool);
    let restored = &restored_result.output_arrays[0];
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1456
/// JPEG-compressing a 16x16 array from `make_u8_array` at quality 90 must
/// succeed, tag the result as JPEG, and produce output whose first two
/// bytes are `FF D8`.
#[test]
fn test_jpeg_compress_mono() {
    let source = make_u8_array(16, 16);

    let packed = compress_jpeg(&source, 90).unwrap();

    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::JPEG);
    let encoded = packed.data.as_u8_slice();
    assert_eq!(&encoded[..2], &[0xFF, 0xD8]);
}
1468
/// JPEG-compressing a 16x16 array from `make_rgb_array` at quality 90 must
/// succeed, tag the result as JPEG, and produce output whose first two
/// bytes are `FF D8`.
#[test]
fn test_jpeg_compress_rgb() {
    let source = make_rgb_array(16, 16);

    let packed = compress_jpeg(&source, 90).unwrap();

    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::JPEG);
    let encoded = packed.data.as_u8_slice();
    assert_eq!(&encoded[..2], &[0xFF, 0xD8]);
}
1477
/// Compresses a 16x16 array from `make_u8_array` to JPEG at quality 100 and
/// decompresses it again.
///
/// Only the shape is checked, not the pixel values: no codec, two
/// dimensions of size 16, `UInt8` data, and 256 elements.
#[test]
fn test_jpeg_roundtrip_mono() {
    let source = make_u8_array(16, 16);
    let packed = compress_jpeg(&source, 100).unwrap();

    let restored = decompress_jpeg(&packed).unwrap();

    assert!(restored.codec.is_none());
    assert_eq!(restored.dims.len(), 2);
    // Exactly two dimensions exist at this point, so this checks dims[0] and dims[1].
    for dim in &restored.dims {
        assert_eq!(dim.size, 16);
    }
    assert_eq!(restored.data.data_type(), NDDataType::UInt8);
    assert_eq!(restored.data.len(), 16 * 16);
}
1491
/// Compresses a 16x16 array from `make_rgb_array` to JPEG at quality 100
/// and decompresses it again.
///
/// Only the shape is checked, not the pixel values: no codec, dimensions
/// `[3, 16, 16]`, and `3 * 16 * 16` elements.
#[test]
fn test_jpeg_roundtrip_rgb() {
    let source = make_rgb_array(16, 16);
    let packed = compress_jpeg(&source, 100).unwrap();

    let restored = decompress_jpeg(&packed).unwrap();

    assert!(restored.codec.is_none());
    assert_eq!(restored.dims.len(), 3);
    assert_eq!(restored.dims[0].size, 3);
    for axis in [1, 2] {
        assert_eq!(restored.dims[axis].size, 16);
    }
    assert_eq!(restored.data.len(), 3 * 16 * 16);
}
1504
/// `compress_jpeg` must return `None` for an 8x8 `UInt16` array.
#[test]
fn test_jpeg_rejects_non_u8() {
    let shape = vec![NDDimension::new(8), NDDimension::new(8)];
    let wide = NDArray::new(shape, NDDataType::UInt16);

    let outcome = compress_jpeg(&wide, 90);

    assert!(outcome.is_none());
}
1513
/// `compress_jpeg` must return `None` for a one-dimensional `UInt8` array.
#[test]
fn test_jpeg_rejects_1d() {
    let shape = vec![NDDimension::new(64)];
    let flat = NDArray::new(shape, NDDataType::UInt8);

    let outcome = compress_jpeg(&flat, 90);

    assert!(outcome.is_none());
}
1519
/// Encoding the same 64x64 array at quality 95 must report a larger
/// `compressed_size` than encoding it at quality 10.
#[test]
fn test_jpeg_quality_affects_size() {
    let source = make_u8_array(64, 64);

    let fine = compress_jpeg(&source, 95).unwrap();
    let coarse = compress_jpeg(&source, 10).unwrap();

    let fine_bytes = fine.codec.as_ref().unwrap().compressed_size;
    let coarse_bytes = coarse.codec.as_ref().unwrap().compressed_size;
    assert!(
        coarse_bytes < fine_bytes,
        "high quality ({}) should produce larger output than low quality ({})",
        fine_bytes,
        coarse_bytes,
    );
}
1534
/// Round-trips an 8x8 array from `make_u8_array` through zlib.
///
/// The compressed array must be tagged Zlib with bytes that differ from the
/// input; decompression must clear the codec and restore `UInt8` data with
/// identical bytes.
#[test]
fn test_zlib_roundtrip_u8() {
    let source = make_u8_array(8, 8);
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_zlib(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::Zlib);
    assert_ne!(packed.data.as_u8_slice(), &expected[..]);

    let restored = decompress_zlib(&packed).unwrap();
    assert!(restored.codec.is_none());
    assert_eq!(restored.data.data_type(), NDDataType::UInt8);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1551
/// Round-trips a 16x16 `UInt16` array through zlib, checking that the codec
/// records `UInt16` as the original data type and that decompression
/// restores the type and the exact bytes.
#[test]
fn test_zlib_roundtrip_u16() {
    let shape = vec![NDDimension::new(16), NDDimension::new(16)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 257 % 65521) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_zlib(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.original_data_type, NDDataType::UInt16);

    let restored = decompress_zlib(&packed).unwrap();
    assert_eq!(restored.data.data_type(), NDDataType::UInt16);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1575
/// Round-trips 64 `Float64` samples spanning negative and positive values
/// through zlib and expects the type and bytes back unchanged.
#[test]
fn test_zlib_roundtrip_f64_with_negatives() {
    let mut source = NDArray::new(vec![NDDimension::new(64)], NDDataType::Float64);
    if let NDDataBuffer::F64(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx as f64 - 32.0) * 3.25);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let restored = decompress_zlib(&compress_zlib(&source)).unwrap();

    assert_eq!(restored.data.data_type(), NDDataType::Float64);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1591
/// A freshly constructed 256x256 `UInt8` array (never written to by this
/// test) must come out of zlib with a `compressed_size` smaller than its
/// raw byte length.
#[test]
fn test_zlib_compresses_repetitive_data() {
    let shape = vec![NDDimension::new(256), NDDimension::new(256)];
    let source = NDArray::new(shape, NDDataType::UInt8);
    let original_size = source.data.as_u8_slice().len();

    let packed = compress_zlib(&source);
    let tag = packed.codec.as_ref().unwrap();
    let compressed_size = tag.compressed_size;

    assert!(
        original_size > compressed_size,
        "zlib compressed ({compressed_size}) should be < original ({original_size})"
    );
}
1606
/// Drives zlib end to end through `CodecProcessor`: a compress-mode
/// processor must emit an array tagged Zlib, and a decompress-mode
/// processor fed that array must reproduce the original bytes.
#[test]
fn test_zlib_via_processor() {
    let shape = vec![NDDimension::new(32), NDDimension::new(32)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 13) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();
    let pool = NDArrayPool::new(10_000_000);

    let mode = CodecMode::Compress {
        codec: CodecName::Zlib,
        quality: 0,
    };
    let mut compressor = CodecProcessor::new(mode);
    let packed_result = compressor.process_array(&source, &pool);
    let packed = &packed_result.output_arrays[0];
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::Zlib);

    let mut decompressor = CodecProcessor::new(CodecMode::Decompress);
    let restored_result = decompressor.process_array(packed, &pool);
    let restored = &restored_result.output_arrays[0];
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1636
/// Round-trips a 64x64 `UInt8` array through LZ4HDF5.
///
/// The compressed array must be tagged LZ4HDF5 with bytes that differ from
/// the input; decompression must clear the codec and restore both the data
/// type and the exact bytes.
#[test]
fn test_lz4hdf5_roundtrip_u8() {
    let shape = vec![NDDimension::new(64), NDDimension::new(64)];
    let mut source = NDArray::new(shape, NDDataType::UInt8);
    if let NDDataBuffer::U8(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx % 251) as u8);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_lz4hdf5(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::LZ4HDF5);
    assert_ne!(packed.data.as_u8_slice(), &expected[..]);

    let restored = decompress_lz4hdf5(&packed).unwrap();
    assert!(restored.codec.is_none());
    assert_eq!(restored.data.data_type(), NDDataType::UInt8);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1661
/// Round-trips an 80x40 `UInt16` array through LZ4HDF5, checking that the
/// codec records `UInt16` as the original data type and that decompression
/// restores the type and the exact bytes.
#[test]
fn test_lz4hdf5_roundtrip_u16() {
    let shape = vec![NDDimension::new(80), NDDimension::new(40)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 37 % 65521) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let packed = compress_lz4hdf5(&source);
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.original_data_type, NDDataType::UInt16);

    let restored = decompress_lz4hdf5(&packed).unwrap();
    assert_eq!(restored.data.data_type(), NDDataType::UInt16);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1685
/// Round-trips 97 `Float64` samples spanning negative and positive values
/// through LZ4HDF5 and expects the type and bytes back unchanged.
#[test]
fn test_lz4hdf5_roundtrip_f64_with_negatives() {
    let mut source = NDArray::new(vec![NDDimension::new(97)], NDDataType::Float64);
    if let NDDataBuffer::F64(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx as f64 - 48.0) * 1.75);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let restored = decompress_lz4hdf5(&compress_lz4hdf5(&source)).unwrap();

    assert_eq!(restored.data.data_type(), NDDataType::Float64);
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1701
/// Round-trips a `UInt8` array whose length is two and a third default
/// LZ4HDF5 blocks plus seven bytes, so the input does not end on a block
/// boundary.
#[test]
fn test_lz4hdf5_multi_block_roundtrip() {
    let per_block = LZ4HDF5_DEFAULT_BLOCK_SIZE;
    let total = per_block * 2 + per_block / 3 + 7;

    let mut source = NDArray::new(vec![NDDimension::new(total)], NDDataType::UInt8);
    if let NDDataBuffer::U8(samples) = &mut source.data {
        // Multiplicative hash of the index, reduced modulo a prime.
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx.wrapping_mul(2_654_435_761) % 251) as u8);
    }
    let expected = source.data.as_u8_slice().to_vec();

    let restored = decompress_lz4hdf5(&compress_lz4hdf5(&source)).unwrap();

    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1720
/// A freshly constructed 256x256 `UInt16` array (never written to by this
/// test) must come out of LZ4HDF5 with a `compressed_size` smaller than its
/// raw byte length.
#[test]
fn test_lz4hdf5_compresses_repetitive_data() {
    let shape = vec![NDDimension::new(256), NDDimension::new(256)];
    let source = NDArray::new(shape, NDDataType::UInt16);
    let original_size = source.data.as_u8_slice().len();

    let packed = compress_lz4hdf5(&source);
    let tag = packed.codec.as_ref().unwrap();
    let compressed_size = tag.compressed_size;

    assert!(
        original_size > compressed_size,
        "lz4hdf5 compressed ({compressed_size}) should be < original ({original_size})"
    );
}
1735
/// Drives LZ4HDF5 end to end through `CodecProcessor`: a compress-mode
/// processor must emit an array tagged LZ4HDF5, and a decompress-mode
/// processor fed that array must reproduce the original bytes.
#[test]
fn test_lz4hdf5_via_processor() {
    let shape = vec![NDDimension::new(48), NDDimension::new(48)];
    let mut source = NDArray::new(shape, NDDataType::UInt16);
    if let NDDataBuffer::U16(samples) = &mut source.data {
        samples
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = (idx * 7) as u16);
    }
    let expected = source.data.as_u8_slice().to_vec();
    let pool = NDArrayPool::new(10_000_000);

    let mode = CodecMode::Compress {
        codec: CodecName::LZ4HDF5,
        quality: 0,
    };
    let mut compressor = CodecProcessor::new(mode);
    let packed_result = compressor.process_array(&source, &pool);
    let packed = &packed_result.output_arrays[0];
    let tag = packed.codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::LZ4HDF5);

    let mut decompressor = CodecProcessor::new(CodecMode::Decompress);
    let restored_result = decompressor.process_array(packed, &pool);
    let restored = &restored_result.output_arrays[0];
    assert_eq!(restored.data.as_u8_slice(), &expected[..]);
}
1768
/// Checks the integer-to-codec mapping applied by `on_param_change`.
///
/// For each ordinal 0 through 6, a fresh compress-mode processor (starting
/// as LZ4, quality 85) has `params.compressor` set to `Some(0)` and
/// receives an `Int32` change at `reason` 0. The processor must then be in
/// compress mode with the codec paired with that ordinal in the table.
#[test]
fn test_compressor_ordinal_mapping() {
    use ad_core_rs::plugin::runtime::{ParamChangeValue, PluginParamSnapshot};

    let table = [
        (0i32, CodecName::None),
        (1, CodecName::JPEG),
        (2, CodecName::Zlib),
        (3, CodecName::Blosc),
        (4, CodecName::LZ4),
        (5, CodecName::LZ4HDF5),
        (6, CodecName::BSLZ4),
    ];

    for (ordinal, expected) in table {
        let initial_mode = CodecMode::Compress {
            codec: CodecName::LZ4,
            quality: 85,
        };
        let mut processor = CodecProcessor::new(initial_mode);
        processor.params.compressor = Some(0);

        let change = PluginParamSnapshot {
            enable_callbacks: true,
            reason: 0,
            addr: 0,
            value: ParamChangeValue::Int32(ordinal),
        };
        processor.on_param_change(0, &change);

        match processor.mode {
            CodecMode::Compress { codec, .. } => assert_eq!(
                codec, expected,
                "ordinal {ordinal} should select {expected:?}"
            ),
            other => panic!("expected Compress mode, got {other:?}"),
        }
    }
}
1812
/// An array taken straight from `make_u8_array`, never passed through a
/// compressor, must be rejected with `None` by the LZ4, JPEG, zlib and
/// LZ4HDF5 decompressors.
#[test]
fn test_decompress_wrong_codec() {
    let plain = make_u8_array(4, 4);

    let lz4 = decompress_lz4(&plain);
    assert!(lz4.is_none());

    let jpeg = decompress_jpeg(&plain);
    assert!(jpeg.is_none());

    let zlib = decompress_zlib(&plain);
    assert!(zlib.is_none());

    let lz4hdf5 = decompress_lz4hdf5(&plain);
    assert!(lz4hdf5.is_none());
}
1823
/// A compress-mode processor configured for LZ4 must emit exactly one
/// output array tagged LZ4 and report a compression ratio of at least 1.0.
#[test]
fn test_processor_lz4_compress() {
    let pool = NDArrayPool::new(1_000_000);
    let mode = CodecMode::Compress {
        codec: CodecName::LZ4,
        quality: 0,
    };
    let mut processor = CodecProcessor::new(mode);
    let source = make_u8_array(32, 32);

    let outcome = processor.process_array(&source, &pool);

    assert_eq!(outcome.output_arrays.len(), 1);
    let tag = outcome.output_arrays[0].codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::LZ4);
    assert!(processor.compression_ratio() >= 1.0);
}
1842
/// A compress-mode processor configured for JPEG at quality 80 must emit
/// exactly one output array tagged JPEG.
#[test]
fn test_processor_jpeg_compress() {
    let pool = NDArrayPool::new(1_000_000);
    let mode = CodecMode::Compress {
        codec: CodecName::JPEG,
        quality: 80,
    };
    let mut processor = CodecProcessor::new(mode);
    let source = make_u8_array(16, 16);

    let outcome = processor.process_array(&source, &pool);

    assert_eq!(outcome.output_arrays.len(), 1);
    let tag = outcome.output_arrays[0].codec.as_ref().unwrap();
    assert_eq!(tag.name, CodecName::JPEG);
}
1858
/// A decompress-mode processor given an LZ4-compressed array must emit one
/// codec-free array holding the original bytes and report a positive
/// compression ratio.
#[test]
fn test_processor_decompress_auto_lz4() {
    let pool = NDArrayPool::new(1_000_000);
    let source = make_u8_array(16, 16);
    let packed = compress_lz4(&source);
    let mut processor = CodecProcessor::new(CodecMode::Decompress);

    let outcome = processor.process_array(&packed, &pool);

    assert_eq!(outcome.output_arrays.len(), 1);
    let restored = &outcome.output_arrays[0];
    assert!(restored.codec.is_none());
    assert_eq!(restored.data.as_u8_slice(), source.data.as_u8_slice());
    assert!(processor.compression_ratio() > 0.0);
}
1875
/// A decompress-mode processor given a JPEG-compressed array must emit
/// exactly one output array with no codec attached.
#[test]
fn test_processor_decompress_auto_jpeg() {
    let pool = NDArrayPool::new(1_000_000);
    let source = make_u8_array(16, 16);
    let packed = compress_jpeg(&source, 90).unwrap();
    let mut processor = CodecProcessor::new(CodecMode::Decompress);

    let outcome = processor.process_array(&packed, &pool);

    assert_eq!(outcome.output_arrays.len(), 1);
    let restored = &outcome.output_arrays[0];
    assert!(restored.codec.is_none());
}
1887
/// A decompress-mode processor given an array straight from `make_u8_array`
/// (never compressed) must still emit one output array and report a
/// compression ratio of exactly 1.0.
#[test]
fn test_processor_decompress_no_codec() {
    let pool = NDArrayPool::new(1_000_000);
    let plain = make_u8_array(8, 8);
    let mut processor = CodecProcessor::new(CodecMode::Decompress);

    let outcome = processor.process_array(&plain, &pool);

    assert_eq!(outcome.output_arrays.len(), 1);
    assert_eq!(processor.compression_ratio(), 1.0);
}
1898
/// After LZ4-compressing a 128x128 `UInt8` array explicitly filled with
/// zeros, the processor's compression ratio must exceed 2.0.
#[test]
fn test_processor_compression_ratio() {
    let pool = NDArrayPool::new(1_000_000);
    let shape = vec![NDDimension::new(128), NDDimension::new(128)];
    let mut zeros = NDArray::new(shape, NDDataType::UInt8);
    if let NDDataBuffer::U8(pixels) = &mut zeros.data {
        // Write the zeros explicitly rather than relying on the constructor's initial contents.
        pixels.fill(0);
    }

    let mode = CodecMode::Compress {
        codec: CodecName::LZ4,
        quality: 0,
    };
    let mut processor = CodecProcessor::new(mode);
    let _ = processor.process_array(&zeros, &pool);

    let ratio = processor.compression_ratio();
    assert!(
        ratio > 2.0,
        "all-zeros 128x128 should compress at least 2x, got {}",
        ratio,
    );
}
1925
/// The processor must identify itself with the plugin type string
/// `"NDPluginCodec"`.
#[test]
fn test_processor_plugin_type() {
    let processor = CodecProcessor::new(CodecMode::Decompress);
    let reported = processor.plugin_type();
    assert_eq!(reported, "NDPluginCodec");
}
1931
/// Four raw bytes interpreted as `UInt8` must yield a four-element `UInt8`
/// buffer whose byte view equals the input.
#[test]
fn test_buffer_from_bytes_u8() {
    let raw = vec![1u8, 2, 3, 4];

    let buffer = buffer_from_bytes(&raw, NDDataType::UInt8).unwrap();

    assert_eq!(buffer.data_type(), NDDataType::UInt8);
    assert_eq!(buffer.len(), 4);
    assert_eq!(buffer.as_u8_slice(), raw.as_slice());
}
1942
/// The native-endian bytes of three `u16` values interpreted as `UInt16`
/// must yield a three-element `U16` buffer holding the original values.
#[test]
fn test_buffer_from_bytes_u16() {
    let words = vec![1000u16, 2000, 3000];
    let mut raw: Vec<u8> = Vec::new();
    for word in &words {
        raw.extend_from_slice(&word.to_ne_bytes());
    }

    let buffer = buffer_from_bytes(&raw, NDDataType::UInt16).unwrap();

    assert_eq!(buffer.data_type(), NDDataType::UInt16);
    assert_eq!(buffer.len(), 3);
    match buffer {
        NDDataBuffer::U16(decoded) => assert_eq!(decoded, words),
        _ => panic!("wrong buffer type"),
    }
}
1956
/// Three bytes cannot form a whole number of two-byte elements, so
/// interpreting them as `UInt16` must return `None`.
#[test]
fn test_buffer_from_bytes_bad_alignment() {
    let odd_length = vec![0u8; 3];

    let outcome = buffer_from_bytes(&odd_length, NDDataType::UInt16);

    assert!(outcome.is_none());
}
1963
/// The native-endian bytes of three `f64` values (positive, negative, and
/// an irrational constant) interpreted as `Float64` must yield an `F64`
/// buffer holding bit-identical values.
#[test]
fn test_buffer_from_bytes_f64_roundtrip() {
    // `std::f64::consts::PI` replaces a hand-typed `3.14159`, which Clippy's
    // deny-by-default `approx_constant` lint rejects. The round-trip check
    // does not depend on the particular value.
    let original = vec![1.5f64, -2.7, std::f64::consts::PI];
    let bytes: Vec<u8> = original.iter().flat_map(|v| v.to_ne_bytes()).collect();
    let buf = buffer_from_bytes(&bytes, NDDataType::Float64).unwrap();
    if let NDDataBuffer::F64(v) = buf {
        assert_eq!(v, original);
    } else {
        panic!("wrong buffer type");
    }
}
1975}