1use std::sync::Arc;
2
3use ad_core::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
4use ad_core::codec::{Codec, CodecName};
5use ad_core::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
6use ad_core::ndarray_pool::NDArrayPool;
7use ad_core::plugin::runtime::{NDPluginProcess, ProcessResult};
8
9use lz4_flex::{compress_prepend_size, decompress_size_prepended};
10
/// Attribute name under which [`compress_lz4`] records the source buffer's `NDDataType`
/// ordinal (as an `NDAttrValue::UInt8`), so that [`decompress_lz4`] can rebuild a buffer of
/// the original element type. [`decompress_lz4`] removes the attribute again.
const ATTR_ORIGINAL_DATA_TYPE: &str = "CODEC_ORIGINAL_DATA_TYPE";
13
14fn buffer_from_bytes(bytes: &[u8], data_type: NDDataType) -> Option<NDDataBuffer> {
19 let elem_size = data_type.element_size();
20 if bytes.len() % elem_size != 0 {
21 return None;
22 }
23 let count = bytes.len() / elem_size;
24
25 Some(match data_type {
26 NDDataType::Int8 => {
27 let mut v = vec![0i8; count];
28 unsafe {
30 std::ptr::copy_nonoverlapping(
31 bytes.as_ptr(),
32 v.as_mut_ptr() as *mut u8,
33 bytes.len(),
34 );
35 }
36 NDDataBuffer::I8(v)
37 }
38 NDDataType::UInt8 => NDDataBuffer::U8(bytes.to_vec()),
39 NDDataType::Int16 => {
40 let mut v = vec![0i16; count];
41 unsafe {
42 std::ptr::copy_nonoverlapping(
43 bytes.as_ptr(),
44 v.as_mut_ptr() as *mut u8,
45 bytes.len(),
46 );
47 }
48 NDDataBuffer::I16(v)
49 }
50 NDDataType::UInt16 => {
51 let mut v = vec![0u16; count];
52 unsafe {
53 std::ptr::copy_nonoverlapping(
54 bytes.as_ptr(),
55 v.as_mut_ptr() as *mut u8,
56 bytes.len(),
57 );
58 }
59 NDDataBuffer::U16(v)
60 }
61 NDDataType::Int32 => {
62 let mut v = vec![0i32; count];
63 unsafe {
64 std::ptr::copy_nonoverlapping(
65 bytes.as_ptr(),
66 v.as_mut_ptr() as *mut u8,
67 bytes.len(),
68 );
69 }
70 NDDataBuffer::I32(v)
71 }
72 NDDataType::UInt32 => {
73 let mut v = vec![0u32; count];
74 unsafe {
75 std::ptr::copy_nonoverlapping(
76 bytes.as_ptr(),
77 v.as_mut_ptr() as *mut u8,
78 bytes.len(),
79 );
80 }
81 NDDataBuffer::U32(v)
82 }
83 NDDataType::Int64 => {
84 let mut v = vec![0i64; count];
85 unsafe {
86 std::ptr::copy_nonoverlapping(
87 bytes.as_ptr(),
88 v.as_mut_ptr() as *mut u8,
89 bytes.len(),
90 );
91 }
92 NDDataBuffer::I64(v)
93 }
94 NDDataType::UInt64 => {
95 let mut v = vec![0u64; count];
96 unsafe {
97 std::ptr::copy_nonoverlapping(
98 bytes.as_ptr(),
99 v.as_mut_ptr() as *mut u8,
100 bytes.len(),
101 );
102 }
103 NDDataBuffer::U64(v)
104 }
105 NDDataType::Float32 => {
106 let mut v = vec![0f32; count];
107 unsafe {
108 std::ptr::copy_nonoverlapping(
109 bytes.as_ptr(),
110 v.as_mut_ptr() as *mut u8,
111 bytes.len(),
112 );
113 }
114 NDDataBuffer::F32(v)
115 }
116 NDDataType::Float64 => {
117 let mut v = vec![0f64; count];
118 unsafe {
119 std::ptr::copy_nonoverlapping(
120 bytes.as_ptr(),
121 v.as_mut_ptr() as *mut u8,
122 bytes.len(),
123 );
124 }
125 NDDataBuffer::F64(v)
126 }
127 })
128}
129
130pub fn compress_lz4(src: &NDArray) -> NDArray {
136 let raw = src.data.as_u8_slice();
137 let original_data_type = src.data.data_type();
138 let original_size = raw.len();
139 let compressed = compress_prepend_size(raw);
140 let compressed_size = compressed.len();
141
142 let mut arr = src.clone();
143 arr.data = NDDataBuffer::U8(compressed);
144 arr.codec = Some(Codec {
145 name: CodecName::LZ4,
146 compressed_size,
147 });
148
149 arr.attributes.add(NDAttribute {
151 name: ATTR_ORIGINAL_DATA_TYPE.into(),
152 description: "Original NDDataType ordinal before codec compression".into(),
153 source: NDAttrSource::Driver,
154 value: NDAttrValue::UInt8(original_data_type as u8),
155 });
156
157 tracing::debug!(
158 original_size,
159 compressed_size,
160 ratio = original_size as f64 / compressed_size.max(1) as f64,
161 "LZ4 compress"
162 );
163
164 arr
165}
166
167pub fn decompress_lz4(src: &NDArray) -> Option<NDArray> {
172 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::LZ4) {
173 return None;
174 }
175 let compressed = src.data.as_u8_slice();
176 let decompressed = decompress_size_prepended(compressed).ok()?;
177
178 let original_type = src
180 .attributes
181 .get(ATTR_ORIGINAL_DATA_TYPE)
182 .and_then(|a| a.value.as_i64())
183 .and_then(|ord| NDDataType::from_ordinal(ord as u8))
184 .unwrap_or(NDDataType::UInt8);
185
186 let buffer = buffer_from_bytes(&decompressed, original_type)?;
187
188 let mut arr = src.clone();
189 arr.data = buffer;
190 arr.codec = None;
191 arr.attributes.remove(ATTR_ORIGINAL_DATA_TYPE);
192
193 Some(arr)
194}
195
196pub fn compress_jpeg(src: &NDArray, quality: u8) -> Option<NDArray> {
204 if src.data.data_type() != NDDataType::UInt8 {
205 return None;
206 }
207
208 let raw = src.data.as_u8_slice();
209 let info = src.info();
210
211 let (width, height, color_type) = match src.dims.len() {
212 2 => {
213 (
215 info.x_size as u16,
216 info.y_size as u16,
217 jpeg_encoder::ColorType::Luma,
218 )
219 }
220 3 if src.dims[0].size == 3 => {
221 (
223 info.x_size as u16,
224 info.y_size as u16,
225 jpeg_encoder::ColorType::Rgb,
226 )
227 }
228 _ => return None,
229 };
230
231 let mut jpeg_buf = Vec::new();
232 let encoder = jpeg_encoder::Encoder::new(&mut jpeg_buf, quality);
233 if encoder.encode(raw, width, height, color_type).is_err() {
234 return None;
235 }
236
237 let compressed_size = jpeg_buf.len();
238 let original_size = raw.len();
239
240 let mut arr = src.clone();
241 arr.data = NDDataBuffer::U8(jpeg_buf);
242 arr.codec = Some(Codec {
243 name: CodecName::JPEG,
244 compressed_size,
245 });
246
247 tracing::debug!(
248 original_size,
249 compressed_size,
250 ratio = original_size as f64 / compressed_size.max(1) as f64,
251 "JPEG compress (quality={})",
252 quality,
253 );
254
255 Some(arr)
256}
257
258pub fn decompress_jpeg(src: &NDArray) -> Option<NDArray> {
265 if src.codec.as_ref().map(|c| c.name) != Some(CodecName::JPEG) {
266 return None;
267 }
268
269 let compressed = src.data.as_u8_slice();
270 let mut decoder = jpeg_decoder::Decoder::new(compressed);
271 let pixels = decoder.decode().ok()?;
272 let metadata = decoder.info()?;
273
274 let width = metadata.width as usize;
275 let height = metadata.height as usize;
276
277 let dims = match metadata.pixel_format {
278 jpeg_decoder::PixelFormat::L8 => {
279 vec![NDDimension::new(width), NDDimension::new(height)]
281 }
282 jpeg_decoder::PixelFormat::RGB24 => {
283 vec![
285 NDDimension::new(3),
286 NDDimension::new(width),
287 NDDimension::new(height),
288 ]
289 }
290 _ => return None,
291 };
292
293 let mut arr = src.clone();
294 arr.dims = dims;
295 arr.data = NDDataBuffer::U8(pixels);
296 arr.codec = None;
297
298 Some(arr)
299}
300
/// Operating mode of a [`CodecProcessor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecMode {
    /// Encode each incoming array with `codec`. Only `CodecName::LZ4` and
    /// `CodecName::JPEG` are handled; other codec names produce no output. `quality` is
    /// used by JPEG only and is ignored for LZ4.
    Compress { codec: CodecName, quality: u8 },
    /// Decode each incoming array with whichever codec its own `codec` field names
    /// (LZ4 or JPEG). Arrays without a supported codec produce no output.
    Decompress,
}
309
/// `NDPluginProcess` implementation that compresses or decompresses each array it
/// receives, according to its [`CodecMode`].
pub struct CodecProcessor {
    /// Whether to compress (and with which codec) or to decompress.
    mode: CodecMode,
    /// Uncompressed-to-compressed byte ratio of the last processed array. It is 1.0 before
    /// any processing and after a call that produced no output.
    compression_ratio: f64,
}
317
318impl CodecProcessor {
319 pub fn new(mode: CodecMode) -> Self {
320 Self {
321 mode,
322 compression_ratio: 1.0,
323 }
324 }
325
326 pub fn compression_ratio(&self) -> f64 {
329 self.compression_ratio
330 }
331}
332
333impl NDPluginProcess for CodecProcessor {
334 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
335 let original_bytes = array.data.as_u8_slice().len();
336
337 let result = match self.mode {
338 CodecMode::Compress { codec: CodecName::LZ4, .. } => Some(compress_lz4(array)),
339 CodecMode::Compress { codec: CodecName::JPEG, quality } => {
340 compress_jpeg(array, quality)
341 }
342 CodecMode::Compress { .. } => None,
343 CodecMode::Decompress => {
344 match array.codec.as_ref().map(|c| c.name) {
346 Some(CodecName::LZ4) => decompress_lz4(array),
347 Some(CodecName::JPEG) => decompress_jpeg(array),
348 _ => None,
349 }
350 }
351 };
352
353 match result {
354 Some(ref out) => {
355 let output_bytes = out.data.as_u8_slice().len();
356 match self.mode {
357 CodecMode::Compress { .. } => {
358 self.compression_ratio =
360 original_bytes as f64 / output_bytes.max(1) as f64;
361 }
362 CodecMode::Decompress => {
363 self.compression_ratio =
365 output_bytes as f64 / original_bytes.max(1) as f64;
366 }
367 }
368 ProcessResult::arrays(vec![Arc::new(out.clone())])
369 }
370 None => {
371 self.compression_ratio = 1.0;
372 ProcessResult::empty()
373 }
374 }
375 }
376
377 fn plugin_type(&self) -> &str {
378 "NDPluginCodec"
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Mutably borrows the `UInt8` payload of `arr`.
    ///
    /// Panics if the buffer has another element type. A fixture built with the wrong type
    /// then fails at once instead of silently skipping its fill step, which would let later
    /// assertions pass vacuously.
    fn u8_data_mut(arr: &mut NDArray) -> &mut [u8] {
        match &mut arr.data {
            NDDataBuffer::U8(v) => v.as_mut_slice(),
            _ => panic!("expected a UInt8 buffer"),
        }
    }

    /// Writes the repeating ramp 0, 1, ..., 255, 0, 1, ... into `data`.
    fn fill_ramp(data: &mut [u8]) {
        for (i, x) in data.iter_mut().enumerate() {
            *x = (i % 256) as u8;
        }
    }

    /// 2-D `UInt8` array `[width, height]` filled with a byte ramp.
    fn make_u8_array(width: usize, height: usize) -> NDArray {
        let mut arr = NDArray::new(
            vec![NDDimension::new(width), NDDimension::new(height)],
            NDDataType::UInt8,
        );
        fill_ramp(u8_data_mut(&mut arr));
        arr
    }

    /// 3-D pixel-interleaved RGB `UInt8` array `[3, width, height]` filled with a byte ramp.
    fn make_rgb_array(width: usize, height: usize) -> NDArray {
        let mut arr = NDArray::new(
            vec![
                NDDimension::new(3),
                NDDimension::new(width),
                NDDimension::new(height),
            ],
            NDDataType::UInt8,
        );
        fill_ramp(u8_data_mut(&mut arr));
        arr
    }

    /// 2-D all-zero `UInt8` array, i.e. maximally compressible input.
    fn make_zero_u8_array(width: usize, height: usize) -> NDArray {
        let mut arr = NDArray::new(
            vec![NDDimension::new(width), NDDimension::new(height)],
            NDDataType::UInt8,
        );
        // Fill explicitly so the test does not depend on NDArray::new's initial contents.
        u8_data_mut(&mut arr).fill(0);
        arr
    }

    // ---- LZ4 ----

    #[test]
    fn test_lz4_roundtrip_u8() {
        let arr = make_u8_array(4, 4);
        let original_data = arr.data.as_u8_slice().to_vec();

        let compressed = compress_lz4(&arr);
        assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
        assert_ne!(compressed.data.as_u8_slice(), original_data.as_slice());

        let decompressed = decompress_lz4(&compressed).unwrap();
        assert!(decompressed.codec.is_none());
        assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
        assert_eq!(decompressed.data.as_u8_slice(), original_data.as_slice());
    }

    #[test]
    fn test_lz4_roundtrip_u16() {
        let mut arr = NDArray::new(
            vec![NDDimension::new(8), NDDimension::new(8)],
            NDDataType::UInt16,
        );
        match &mut arr.data {
            NDDataBuffer::U16(v) => {
                for (i, x) in v.iter_mut().enumerate() {
                    *x = (i * 100) as u16;
                }
            }
            _ => panic!("expected a UInt16 buffer"),
        }
        let original_bytes = arr.data.as_u8_slice().to_vec();

        let compressed = compress_lz4(&arr);
        assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::LZ4);
        let dt_attr = compressed.attributes.get(ATTR_ORIGINAL_DATA_TYPE).unwrap();
        assert_eq!(dt_attr.value, NDAttrValue::UInt8(NDDataType::UInt16 as u8));

        let decompressed = decompress_lz4(&compressed).unwrap();
        assert!(decompressed.codec.is_none());
        assert_eq!(decompressed.data.data_type(), NDDataType::UInt16);
        assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
        assert!(decompressed.attributes.get(ATTR_ORIGINAL_DATA_TYPE).is_none());
    }

    #[test]
    fn test_lz4_roundtrip_f64() {
        let mut arr = NDArray::new(vec![NDDimension::new(16)], NDDataType::Float64);
        match &mut arr.data {
            NDDataBuffer::F64(v) => {
                for (i, x) in v.iter_mut().enumerate() {
                    *x = i as f64 * 1.5;
                }
            }
            _ => panic!("expected a Float64 buffer"),
        }
        let original_bytes = arr.data.as_u8_slice().to_vec();

        let compressed = compress_lz4(&arr);
        let decompressed = decompress_lz4(&compressed).unwrap();
        assert_eq!(decompressed.data.data_type(), NDDataType::Float64);
        assert_eq!(decompressed.data.as_u8_slice(), original_bytes.as_slice());
    }

    #[test]
    fn test_lz4_compresses_repetitive_data() {
        let arr = make_zero_u8_array(256, 256);
        let original_size = arr.data.as_u8_slice().len();

        let compressed = compress_lz4(&arr);
        let compressed_size = compressed.codec.as_ref().unwrap().compressed_size;
        assert!(
            compressed_size < original_size,
            "compressed ({}) should be smaller than original ({})",
            compressed_size,
            original_size,
        );
    }

    #[test]
    fn test_lz4_preserves_metadata() {
        let mut arr = make_u8_array(4, 4);
        arr.unique_id = 42;

        let compressed = compress_lz4(&arr);
        assert_eq!(compressed.unique_id, 42);
        assert_eq!(compressed.dims.len(), 2);
        assert_eq!(compressed.dims[0].size, 4);
        assert_eq!(compressed.dims[1].size, 4);
    }

    // ---- JPEG ----

    #[test]
    fn test_jpeg_compress_mono() {
        let arr = make_u8_array(16, 16);
        let compressed = compress_jpeg(&arr, 90).unwrap();
        assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::JPEG);
        // JPEG streams start with the SOI marker.
        let data = compressed.data.as_u8_slice();
        assert_eq!(&data[0..2], &[0xFF, 0xD8]);
    }

    #[test]
    fn test_jpeg_compress_rgb() {
        let arr = make_rgb_array(16, 16);
        let compressed = compress_jpeg(&arr, 90).unwrap();
        assert_eq!(compressed.codec.as_ref().unwrap().name, CodecName::JPEG);
        let data = compressed.data.as_u8_slice();
        assert_eq!(&data[0..2], &[0xFF, 0xD8]);
    }

    #[test]
    fn test_jpeg_roundtrip_mono() {
        let arr = make_u8_array(16, 16);
        let compressed = compress_jpeg(&arr, 100).unwrap();
        let decompressed = decompress_jpeg(&compressed).unwrap();
        assert!(decompressed.codec.is_none());
        assert_eq!(decompressed.dims.len(), 2);
        assert_eq!(decompressed.dims[0].size, 16);
        assert_eq!(decompressed.dims[1].size, 16);
        assert_eq!(decompressed.data.data_type(), NDDataType::UInt8);
        assert_eq!(decompressed.data.len(), 16 * 16);
    }

    #[test]
    fn test_jpeg_roundtrip_rgb() {
        let arr = make_rgb_array(16, 16);
        let compressed = compress_jpeg(&arr, 100).unwrap();
        let decompressed = decompress_jpeg(&compressed).unwrap();
        assert!(decompressed.codec.is_none());
        assert_eq!(decompressed.dims.len(), 3);
        assert_eq!(decompressed.dims[0].size, 3);
        assert_eq!(decompressed.dims[1].size, 16);
        assert_eq!(decompressed.dims[2].size, 16);
        assert_eq!(decompressed.data.len(), 3 * 16 * 16);
    }

    #[test]
    fn test_jpeg_rejects_non_u8() {
        let arr = NDArray::new(
            vec![NDDimension::new(8), NDDimension::new(8)],
            NDDataType::UInt16,
        );
        assert!(compress_jpeg(&arr, 90).is_none());
    }

    #[test]
    fn test_jpeg_rejects_1d() {
        let arr = NDArray::new(vec![NDDimension::new(64)], NDDataType::UInt8);
        assert!(compress_jpeg(&arr, 90).is_none());
    }

    #[test]
    fn test_jpeg_quality_affects_size() {
        let arr = make_u8_array(64, 64);
        let high = compress_jpeg(&arr, 95).unwrap();
        let low = compress_jpeg(&arr, 10).unwrap();
        let high_size = high.codec.as_ref().unwrap().compressed_size;
        let low_size = low.codec.as_ref().unwrap().compressed_size;
        assert!(
            high_size > low_size,
            "high quality ({}) should produce larger output than low quality ({})",
            high_size,
            low_size,
        );
    }

    // ---- Codec dispatch ----

    #[test]
    fn test_decompress_wrong_codec() {
        let arr = make_u8_array(4, 4);
        assert!(decompress_lz4(&arr).is_none());
        assert!(decompress_jpeg(&arr).is_none());
    }

    // ---- CodecProcessor ----

    #[test]
    fn test_processor_lz4_compress() {
        let pool = NDArrayPool::new(1_000_000);
        let mut proc = CodecProcessor::new(CodecMode::Compress {
            codec: CodecName::LZ4,
            quality: 0,
        });
        let arr = make_u8_array(32, 32);
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);
        assert_eq!(
            result.output_arrays[0].codec.as_ref().unwrap().name,
            CodecName::LZ4
        );
        assert!(proc.compression_ratio() >= 1.0);
    }

    #[test]
    fn test_processor_jpeg_compress() {
        let pool = NDArrayPool::new(1_000_000);
        let mut proc = CodecProcessor::new(CodecMode::Compress {
            codec: CodecName::JPEG,
            quality: 80,
        });
        let arr = make_u8_array(16, 16);
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);
        assert_eq!(
            result.output_arrays[0].codec.as_ref().unwrap().name,
            CodecName::JPEG
        );
    }

    #[test]
    fn test_processor_decompress_auto_lz4() {
        let pool = NDArrayPool::new(1_000_000);
        let arr = make_u8_array(16, 16);
        let compressed = compress_lz4(&arr);

        let mut proc = CodecProcessor::new(CodecMode::Decompress);
        let result = proc.process_array(&compressed, &pool);
        assert_eq!(result.output_arrays.len(), 1);
        assert!(result.output_arrays[0].codec.is_none());
        assert_eq!(
            result.output_arrays[0].data.as_u8_slice(),
            arr.data.as_u8_slice()
        );
        assert!(proc.compression_ratio() > 0.0);
    }

    #[test]
    fn test_processor_decompress_auto_jpeg() {
        let pool = NDArrayPool::new(1_000_000);
        let arr = make_u8_array(16, 16);
        let compressed = compress_jpeg(&arr, 90).unwrap();

        let mut proc = CodecProcessor::new(CodecMode::Decompress);
        let result = proc.process_array(&compressed, &pool);
        assert_eq!(result.output_arrays.len(), 1);
        assert!(result.output_arrays[0].codec.is_none());
    }

    #[test]
    fn test_processor_decompress_no_codec() {
        let pool = NDArrayPool::new(1_000_000);
        let arr = make_u8_array(8, 8);
        let mut proc = CodecProcessor::new(CodecMode::Decompress);
        let result = proc.process_array(&arr, &pool);
        assert!(result.output_arrays.is_empty());
        assert_eq!(proc.compression_ratio(), 1.0);
    }

    #[test]
    fn test_processor_compression_ratio() {
        let pool = NDArrayPool::new(1_000_000);
        let arr = make_zero_u8_array(128, 128);

        let mut proc = CodecProcessor::new(CodecMode::Compress {
            codec: CodecName::LZ4,
            quality: 0,
        });
        let _ = proc.process_array(&arr, &pool);
        let ratio = proc.compression_ratio();
        assert!(
            ratio > 2.0,
            "all-zeros 128x128 should compress at least 2x, got {}",
            ratio,
        );
    }

    #[test]
    fn test_processor_plugin_type() {
        let proc = CodecProcessor::new(CodecMode::Decompress);
        assert_eq!(proc.plugin_type(), "NDPluginCodec");
    }

    // ---- buffer_from_bytes ----

    #[test]
    fn test_buffer_from_bytes_u8() {
        let data = vec![1u8, 2, 3, 4];
        let buf = buffer_from_bytes(&data, NDDataType::UInt8).unwrap();
        assert_eq!(buf.data_type(), NDDataType::UInt8);
        assert_eq!(buf.len(), 4);
        assert_eq!(buf.as_u8_slice(), &[1, 2, 3, 4]);
    }

    #[test]
    fn test_buffer_from_bytes_u16() {
        let original = vec![1000u16, 2000, 3000];
        let bytes: Vec<u8> = original.iter().flat_map(|v| v.to_ne_bytes()).collect();
        let buf = buffer_from_bytes(&bytes, NDDataType::UInt16).unwrap();
        assert_eq!(buf.data_type(), NDDataType::UInt16);
        assert_eq!(buf.len(), 3);
        if let NDDataBuffer::U16(v) = buf {
            assert_eq!(v, original);
        } else {
            panic!("wrong buffer type");
        }
    }

    #[test]
    fn test_buffer_from_bytes_bad_alignment() {
        // 3 bytes cannot hold a whole number of u16 elements.
        let data = vec![0u8; 3];
        assert!(buffer_from_bytes(&data, NDDataType::UInt16).is_none());
    }

    #[test]
    fn test_buffer_from_bytes_f64_roundtrip() {
        // Avoid literals close to well-known constants (e.g. PI); clippy's deny-by-default
        // `approx_constant` lint rejects them.
        let original = vec![1.5f64, -2.7, 1234.5678];
        let bytes: Vec<u8> = original.iter().flat_map(|v| v.to_ne_bytes()).collect();
        let buf = buffer_from_bytes(&bytes, NDDataType::Float64).unwrap();
        if let NDDataBuffer::F64(v) = buf {
            assert_eq!(v, original);
        } else {
            panic!("wrong buffer type");
        }
    }
}
760}