Skip to main content

ad_plugins_rs/
file_hdf5.rs

1use std::path::{Path, PathBuf};
2
3use ad_core_rs::error::{ADError, ADResult};
4use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
5use ad_core_rs::ndarray_pool::NDArrayPool;
6use ad_core_rs::plugin::file_base::{NDFileMode, NDFileWriter};
7use ad_core_rs::plugin::file_controller::FilePluginController;
8use ad_core_rs::plugin::runtime::{
9    NDPluginProcess, ParamChangeResult, ParamUpdate, PluginParamSnapshot, ProcessResult,
10};
11
12use rust_hdf5::H5File;
13use rust_hdf5::format::messages::filter::{
14    FILTER_BLOSC, FILTER_JPEG, FILTER_NBIT, FILTER_SZIP, Filter, FilterPipeline,
15};
16use rust_hdf5::swmr::SwmrFileWriter;
17
18/// C ADCore compression type enum values (matching NDFileHDF5.h).
19const COMPRESS_NONE: i32 = 0;
20const COMPRESS_NBIT: i32 = 1;
21const COMPRESS_SZIP: i32 = 2;
22const COMPRESS_ZLIB: i32 = 3;
23const COMPRESS_BLOSC: i32 = 4;
24#[allow(dead_code)]
25const COMPRESS_BSHUF: i32 = 5;
26const COMPRESS_LZ4: i32 = 6;
27const COMPRESS_JPEG: i32 = 7;
28
29/// C ADCore BLOSC compressor sub-types.
30const BLOSC_LZ: i32 = 0;
31const BLOSC_LZ4: i32 = 1;
32const BLOSC_LZ4HC: i32 = 2;
33const BLOSC_SNAPPY: i32 = 3;
34const BLOSC_ZLIB: i32 = 4;
35const BLOSC_ZSTD: i32 = 5;
36
37/// Internal handle: either a standard H5File or a SWMR streaming writer.
38enum Hdf5Handle {
39    Standard(H5File),
40    Swmr {
41        writer: SwmrFileWriter,
42        ds_index: usize,
43    },
44}
45
46/// HDF5 file writer using the hdf5 crate.
47pub struct Hdf5Writer {
48    current_path: Option<PathBuf>,
49    handle: Option<Hdf5Handle>,
50    frame_count: usize,
51    dataset_name: String,
52    // compression
53    compression_type: i32,
54    z_compress_level: u32,
55    szip_num_pixels: u32,
56    nbit_precision: u32,
57    nbit_offset: u32,
58    jpeg_quality: u32,
59    blosc_shuffle_type: i32,
60    blosc_compressor: i32,
61    blosc_compress_level: u32,
62    // SWMR
63    swmr_mode: bool,
64    flush_nth_frame: usize,
65    pub swmr_cb_counter: u32,
66    // options
67    pub store_attributes: bool,
68    pub store_performance: bool,
69    pub total_runtime: f64,
70    pub total_bytes: u64,
71}
72
73impl Hdf5Writer {
74    pub fn new() -> Self {
75        Self {
76            current_path: None,
77            handle: None,
78            frame_count: 0,
79            dataset_name: "data".to_string(),
80            compression_type: 0,
81            z_compress_level: 6,
82            szip_num_pixels: 16,
83            nbit_precision: 0,
84            nbit_offset: 0,
85            jpeg_quality: 90,
86            blosc_shuffle_type: 0,
87            blosc_compressor: 0,
88            blosc_compress_level: 5,
89            swmr_mode: false,
90            flush_nth_frame: 0,
91            swmr_cb_counter: 0,
92            store_attributes: true,
93            store_performance: false,
94            total_runtime: 0.0,
95            total_bytes: 0,
96        }
97    }
98
99    pub fn set_dataset_name(&mut self, name: &str) {
100        self.dataset_name = name.to_string();
101    }
102
103    pub fn set_compression_type(&mut self, v: i32) {
104        self.compression_type = v;
105    }
106
107    pub fn set_z_compress_level(&mut self, v: u32) {
108        self.z_compress_level = v;
109    }
110
111    pub fn set_szip_num_pixels(&mut self, v: u32) {
112        self.szip_num_pixels = v;
113    }
114
115    pub fn set_blosc_shuffle_type(&mut self, v: i32) {
116        self.blosc_shuffle_type = v;
117    }
118
119    pub fn set_blosc_compressor(&mut self, v: i32) {
120        self.blosc_compressor = v;
121    }
122
123    pub fn set_blosc_compress_level(&mut self, v: u32) {
124        self.blosc_compress_level = v;
125    }
126
127    pub fn set_nbit_precision(&mut self, v: u32) {
128        self.nbit_precision = v;
129    }
130
131    pub fn set_nbit_offset(&mut self, v: u32) {
132        self.nbit_offset = v;
133    }
134
135    pub fn set_jpeg_quality(&mut self, v: u32) {
136        self.jpeg_quality = v;
137    }
138
139    pub fn set_store_attributes(&mut self, v: bool) {
140        self.store_attributes = v;
141    }
142
143    pub fn set_store_performance(&mut self, v: bool) {
144        self.store_performance = v;
145    }
146
147    pub fn set_swmr_mode(&mut self, v: bool) {
148        self.swmr_mode = v;
149    }
150
151    pub fn set_flush_nth_frame(&mut self, v: usize) {
152        self.flush_nth_frame = v;
153    }
154
155    pub fn frame_count(&self) -> usize {
156        self.frame_count
157    }
158
159    /// Trigger a SWMR flush. No-op if not in SWMR mode.
160    pub fn flush_swmr(&mut self) {
161        if let Some(Hdf5Handle::Swmr { ref mut writer, .. }) = self.handle {
162            if writer.flush().is_ok() {
163                self.swmr_cb_counter += 1;
164            }
165        }
166    }
167
168    /// Returns true if SWMR is currently active.
169    pub fn is_swmr_active(&self) -> bool {
170        matches!(self.handle, Some(Hdf5Handle::Swmr { .. }))
171    }
172
173    /// Build a FilterPipeline from the current compression settings.
174    fn build_pipeline(&self, element_size: usize) -> Option<FilterPipeline> {
175        match self.compression_type {
176            COMPRESS_NONE => None,
177            COMPRESS_ZLIB => Some(FilterPipeline::deflate(self.z_compress_level)),
178            COMPRESS_SZIP => Some(FilterPipeline {
179                filters: vec![Filter {
180                    id: FILTER_SZIP,
181                    flags: 0,
182                    cd_values: vec![4, self.szip_num_pixels],
183                }],
184            }),
185            COMPRESS_LZ4 => Some(FilterPipeline::lz4()),
186            COMPRESS_BLOSC => {
187                let compressor_code = match self.blosc_compressor {
188                    BLOSC_LZ => 0,
189                    BLOSC_LZ4 => 1,
190                    BLOSC_LZ4HC => 2,
191                    BLOSC_SNAPPY => 3,
192                    BLOSC_ZLIB => 4,
193                    BLOSC_ZSTD => 5,
194                    _ => 0,
195                };
196                Some(FilterPipeline {
197                    filters: vec![Filter {
198                        id: FILTER_BLOSC,
199                        flags: 0,
200                        cd_values: vec![
201                            2,
202                            2,
203                            element_size as u32,
204                            0,
205                            self.blosc_shuffle_type as u32,
206                            compressor_code,
207                            self.blosc_compress_level,
208                        ],
209                    }],
210                })
211            }
212            COMPRESS_NBIT => {
213                if self.nbit_precision > 0 {
214                    Some(FilterPipeline {
215                        filters: vec![Filter {
216                            id: FILTER_NBIT,
217                            flags: 0,
218                            cd_values: vec![self.nbit_precision, self.nbit_offset],
219                        }],
220                    })
221                } else {
222                    None
223                }
224            }
225            COMPRESS_JPEG => Some(FilterPipeline {
226                filters: vec![Filter {
227                    id: FILTER_JPEG,
228                    flags: 0,
229                    cd_values: vec![self.jpeg_quality],
230                }],
231            }),
232            _ => None,
233        }
234    }
235
236    /// Open file in SWMR streaming mode.
237    fn open_swmr(&mut self, path: &Path, array: &NDArray) -> ADResult<()> {
238        let mut swmr = SwmrFileWriter::create(path)
239            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR create error: {}", e)))?;
240
241        let frame_dims: Vec<u64> = array.dims.iter().rev().map(|d| d.size as u64).collect();
242
243        macro_rules! create_ds {
244            ($t:ty) => {
245                swmr.create_streaming_dataset::<$t>(&self.dataset_name, &frame_dims)
246                    .map_err(|e| {
247                        ADError::UnsupportedConversion(format!("SWMR create dataset error: {}", e))
248                    })
249            };
250        }
251
252        let ds_index = match array.data.data_type() {
253            NDDataType::Int8 => create_ds!(i8)?,
254            NDDataType::UInt8 => create_ds!(u8)?,
255            NDDataType::Int16 => create_ds!(i16)?,
256            NDDataType::UInt16 => create_ds!(u16)?,
257            NDDataType::Int32 => create_ds!(i32)?,
258            NDDataType::UInt32 => create_ds!(u32)?,
259            NDDataType::Int64 => create_ds!(i64)?,
260            NDDataType::UInt64 => create_ds!(u64)?,
261            NDDataType::Float32 => create_ds!(f32)?,
262            NDDataType::Float64 => create_ds!(f64)?,
263        };
264
265        swmr.start_swmr()
266            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR start error: {}", e)))?;
267
268        self.handle = Some(Hdf5Handle::Swmr {
269            writer: swmr,
270            ds_index,
271        });
272        Ok(())
273    }
274
275    /// Write a frame in standard (non-SWMR) mode.
276    fn write_standard(&mut self, array: &NDArray) -> ADResult<()> {
277        let h5file = match self.handle {
278            Some(Hdf5Handle::Standard(ref f)) => f,
279            _ => return Err(ADError::UnsupportedConversion("no HDF5 file open".into())),
280        };
281
282        let dataset_name = if self.frame_count == 0 {
283            self.dataset_name.clone()
284        } else {
285            format!("{}_{}", self.dataset_name, self.frame_count)
286        };
287
288        let shape = array.dims.iter().rev().map(|d| d.size).collect::<Vec<_>>();
289        let element_size = array.data.data_type().element_size();
290        let pipeline = self.build_pipeline(element_size);
291
292        macro_rules! write_typed {
293            ($t:ty, $v:expr) => {{
294                let ds = if let Some(ref pl) = pipeline {
295                    h5file
296                        .new_dataset::<$t>()
297                        .shape(&shape[..])
298                        .chunk(&shape[..])
299                        .filter_pipeline(pl.clone())
300                        .create(dataset_name.as_str())
301                        .map_err(|e| {
302                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
303                        })?
304                } else {
305                    h5file
306                        .new_dataset::<$t>()
307                        .shape(&shape[..])
308                        .create(dataset_name.as_str())
309                        .map_err(|e| {
310                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
311                        })?
312                };
313                if pipeline.is_some() {
314                    ds.write_chunk(0, array.data.as_u8_slice()).map_err(|e| {
315                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
316                    })?;
317                } else {
318                    ds.write_raw($v).map_err(|e| {
319                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
320                    })?;
321                }
322                if self.store_attributes {
323                    for attr in array.attributes.iter() {
324                        let val_str = attr.value.as_string();
325                        let _ = ds
326                            .new_attr::<rust_hdf5::types::VarLenUnicode>()
327                            .shape(())
328                            .create(attr.name.as_str())
329                            .and_then(|a| {
330                                let s: rust_hdf5::types::VarLenUnicode =
331                                    val_str.parse().unwrap_or_default();
332                                a.write_scalar(&s)
333                            });
334                    }
335                }
336            }};
337        }
338
339        match &array.data {
340            NDDataBuffer::I8(v) => write_typed!(i8, v),
341            NDDataBuffer::U8(v) => write_typed!(u8, v),
342            NDDataBuffer::I16(v) => write_typed!(i16, v),
343            NDDataBuffer::U16(v) => write_typed!(u16, v),
344            NDDataBuffer::I32(v) => write_typed!(i32, v),
345            NDDataBuffer::U32(v) => write_typed!(u32, v),
346            NDDataBuffer::I64(v) => write_typed!(i64, v),
347            NDDataBuffer::U64(v) => write_typed!(u64, v),
348            NDDataBuffer::F32(v) => write_typed!(f32, v),
349            NDDataBuffer::F64(v) => write_typed!(f64, v),
350        }
351        Ok(())
352    }
353
354    /// Write a frame in SWMR mode.
355    fn write_swmr(&mut self, array: &NDArray) -> ADResult<()> {
356        let (writer, ds_index) = match self.handle {
357            Some(Hdf5Handle::Swmr {
358                ref mut writer,
359                ds_index,
360            }) => (writer, ds_index),
361            _ => return Err(ADError::UnsupportedConversion("no SWMR writer open".into())),
362        };
363
364        writer
365            .append_frame(ds_index, array.data.as_u8_slice())
366            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR append error: {}", e)))?;
367
368        // Periodic flush
369        let count = self.frame_count + 1; // will be incremented after return
370        if self.flush_nth_frame > 0 && count % self.flush_nth_frame == 0 {
371            writer
372                .flush()
373                .map_err(|e| ADError::UnsupportedConversion(format!("SWMR flush error: {}", e)))?;
374        }
375
376        Ok(())
377    }
378}
379
380impl Default for Hdf5Writer {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
386impl NDFileWriter for Hdf5Writer {
387    fn open_file(&mut self, path: &Path, mode: NDFileMode, array: &NDArray) -> ADResult<()> {
388        self.current_path = Some(path.to_path_buf());
389        self.frame_count = 0;
390        self.total_runtime = 0.0;
391        self.total_bytes = 0;
392        self.swmr_cb_counter = 0;
393
394        if self.swmr_mode && mode == NDFileMode::Stream {
395            self.open_swmr(path, array)
396        } else {
397            let h5file = H5File::create(path)
398                .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 create error: {}", e)))?;
399            self.handle = Some(Hdf5Handle::Standard(h5file));
400            Ok(())
401        }
402    }
403
404    fn write_file(&mut self, array: &NDArray) -> ADResult<()> {
405        let start = if self.store_performance {
406            Some(std::time::Instant::now())
407        } else {
408            None
409        };
410
411        let is_swmr = matches!(self.handle, Some(Hdf5Handle::Swmr { .. }));
412        if is_swmr {
413            self.write_swmr(array)?;
414        } else {
415            self.write_standard(array)?;
416        }
417        self.frame_count += 1;
418
419        if let Some(start) = start {
420            self.total_runtime += start.elapsed().as_secs_f64();
421            self.total_bytes += array.data.as_u8_slice().len() as u64;
422        }
423        Ok(())
424    }
425
426    fn read_file(&mut self) -> ADResult<NDArray> {
427        let path = self
428            .current_path
429            .as_ref()
430            .ok_or_else(|| ADError::UnsupportedConversion("no file open".into()))?;
431
432        let h5file = H5File::open(path)
433            .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 open error: {}", e)))?;
434
435        let ds = h5file
436            .dataset(&self.dataset_name)
437            .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e)))?;
438
439        let shape = ds.shape();
440        let dims: Vec<NDDimension> = shape.iter().rev().map(|&s| NDDimension::new(s)).collect();
441
442        if let Ok(data) = ds.read_raw::<u8>() {
443            let mut arr = NDArray::new(dims, NDDataType::UInt8);
444            arr.data = NDDataBuffer::U8(data);
445            return Ok(arr);
446        }
447        if let Ok(data) = ds.read_raw::<u16>() {
448            let mut arr = NDArray::new(dims, NDDataType::UInt16);
449            arr.data = NDDataBuffer::U16(data);
450            return Ok(arr);
451        }
452        if let Ok(data) = ds.read_raw::<f64>() {
453            let mut arr = NDArray::new(dims, NDDataType::Float64);
454            arr.data = NDDataBuffer::F64(data);
455            return Ok(arr);
456        }
457
458        Err(ADError::UnsupportedConversion(
459            "unsupported HDF5 data type".into(),
460        ))
461    }
462
463    fn close_file(&mut self) -> ADResult<()> {
464        if let Some(Hdf5Handle::Swmr { writer, .. }) = self.handle.take() {
465            writer
466                .close()
467                .map_err(|e| ADError::UnsupportedConversion(format!("SWMR close error: {}", e)))?;
468        }
469        self.current_path = None;
470        Ok(())
471    }
472
473    fn supports_multiple_arrays(&self) -> bool {
474        true
475    }
476}
477
478// ============================================================
479// Processor
480// ============================================================
481
482/// Param indices for HDF5-specific params.
483#[derive(Default)]
484struct Hdf5ParamIndices {
485    compression_type: Option<usize>,
486    z_compress_level: Option<usize>,
487    szip_num_pixels: Option<usize>,
488    nbit_precision: Option<usize>,
489    nbit_offset: Option<usize>,
490    jpeg_quality: Option<usize>,
491    blosc_shuffle_type: Option<usize>,
492    blosc_compressor: Option<usize>,
493    blosc_compress_level: Option<usize>,
494    store_attributes: Option<usize>,
495    store_performance: Option<usize>,
496    total_runtime: Option<usize>,
497    total_io_speed: Option<usize>,
498    swmr_mode: Option<usize>,
499    swmr_flush_now: Option<usize>,
500    swmr_running: Option<usize>,
501    swmr_cb_counter: Option<usize>,
502    swmr_supported: Option<usize>,
503    flush_nth_frame: Option<usize>,
504}
505
506/// HDF5 file processor wrapping FilePluginController<Hdf5Writer>.
507pub struct Hdf5FileProcessor {
508    ctrl: FilePluginController<Hdf5Writer>,
509    hdf5_params: Hdf5ParamIndices,
510}
511
512impl Hdf5FileProcessor {
513    pub fn new() -> Self {
514        Self {
515            ctrl: FilePluginController::new(Hdf5Writer::new()),
516            hdf5_params: Hdf5ParamIndices::default(),
517        }
518    }
519
520    pub fn set_dataset_name(&mut self, name: &str) {
521        self.ctrl.writer.set_dataset_name(name);
522    }
523}
524
525/// Register all HDF5-specific params.
526fn register_hdf5_params(
527    base: &mut asyn_rs::port::PortDriverBase,
528) -> asyn_rs::error::AsynResult<()> {
529    use asyn_rs::param::ParamType;
530    base.create_param("HDF5_SWMRFlushNow", ParamType::Int32)?;
531    base.create_param("HDF5_chunkSizeAuto", ParamType::Int32)?;
532    base.create_param("HDF5_nRowChunks", ParamType::Int32)?;
533    base.create_param("HDF5_nColChunks", ParamType::Int32)?;
534    base.create_param("HDF5_chunkSize2", ParamType::Int32)?;
535    base.create_param("HDF5_chunkSize3", ParamType::Int32)?;
536    base.create_param("HDF5_chunkSize4", ParamType::Int32)?;
537    base.create_param("HDF5_chunkSize5", ParamType::Int32)?;
538    base.create_param("HDF5_chunkSize6", ParamType::Int32)?;
539    base.create_param("HDF5_chunkSize7", ParamType::Int32)?;
540    base.create_param("HDF5_chunkSize8", ParamType::Int32)?;
541    base.create_param("HDF5_chunkSize9", ParamType::Int32)?;
542    base.create_param("HDF5_nFramesChunks", ParamType::Int32)?;
543    base.create_param("HDF5_NDAttributeChunk", ParamType::Int32)?;
544    base.create_param("HDF5_chunkBoundaryAlign", ParamType::Int32)?;
545    base.create_param("HDF5_chunkBoundaryThreshold", ParamType::Int32)?;
546    base.create_param("HDF5_nExtraDims", ParamType::Int32)?;
547    base.create_param("HDF5_extraDimSizeN", ParamType::Int32)?;
548    base.create_param("HDF5_extraDimNameN", ParamType::Octet)?;
549    base.create_param("HDF5_extraDimSizeX", ParamType::Int32)?;
550    base.create_param("HDF5_extraDimNameX", ParamType::Octet)?;
551    base.create_param("HDF5_extraDimSizeY", ParamType::Int32)?;
552    base.create_param("HDF5_extraDimNameY", ParamType::Octet)?;
553    base.create_param("HDF5_extraDimSize3", ParamType::Int32)?;
554    base.create_param("HDF5_extraDimName3", ParamType::Octet)?;
555    base.create_param("HDF5_extraDimSize4", ParamType::Int32)?;
556    base.create_param("HDF5_extraDimName4", ParamType::Octet)?;
557    base.create_param("HDF5_extraDimSize5", ParamType::Int32)?;
558    base.create_param("HDF5_extraDimName5", ParamType::Octet)?;
559    base.create_param("HDF5_extraDimSize6", ParamType::Int32)?;
560    base.create_param("HDF5_extraDimName6", ParamType::Octet)?;
561    base.create_param("HDF5_extraDimSize7", ParamType::Int32)?;
562    base.create_param("HDF5_extraDimName7", ParamType::Octet)?;
563    base.create_param("HDF5_extraDimSize8", ParamType::Int32)?;
564    base.create_param("HDF5_extraDimName8", ParamType::Octet)?;
565    base.create_param("HDF5_extraDimSize9", ParamType::Int32)?;
566    base.create_param("HDF5_extraDimName9", ParamType::Octet)?;
567    base.create_param("HDF5_storeAttributes", ParamType::Int32)?;
568    base.create_param("HDF5_storePerformance", ParamType::Int32)?;
569    base.create_param("HDF5_totalRuntime", ParamType::Float64)?;
570    base.create_param("HDF5_totalIoSpeed", ParamType::Float64)?;
571    base.create_param("HDF5_flushNthFrame", ParamType::Int32)?;
572    base.create_param("HDF5_compressionType", ParamType::Int32)?;
573    base.create_param("HDF5_nbitsPrecision", ParamType::Int32)?;
574    base.create_param("HDF5_nbitsOffset", ParamType::Int32)?;
575    base.create_param("HDF5_szipNumPixels", ParamType::Int32)?;
576    base.create_param("HDF5_zCompressLevel", ParamType::Int32)?;
577    base.create_param("HDF5_bloscShuffleType", ParamType::Int32)?;
578    base.create_param("HDF5_bloscCompressor", ParamType::Int32)?;
579    base.create_param("HDF5_bloscCompressLevel", ParamType::Int32)?;
580    base.create_param("HDF5_jpegQuality", ParamType::Int32)?;
581    base.create_param("HDF5_dimAttDatasets", ParamType::Int32)?;
582    base.create_param("HDF5_layoutErrorMsg", ParamType::Octet)?;
583    base.create_param("HDF5_layoutValid", ParamType::Int32)?;
584    base.create_param("HDF5_layoutFilename", ParamType::Octet)?;
585    base.create_param("HDF5_SWMRSupported", ParamType::Int32)?;
586    base.create_param("HDF5_SWMRMode", ParamType::Int32)?;
587    base.create_param("HDF5_SWMRRunning", ParamType::Int32)?;
588    base.create_param("HDF5_SWMRCbCounter", ParamType::Int32)?;
589    base.create_param("HDF5_posRunning", ParamType::Int32)?;
590    base.create_param("HDF5_posNameDimN", ParamType::Octet)?;
591    base.create_param("HDF5_posNameDimX", ParamType::Octet)?;
592    base.create_param("HDF5_posNameDimY", ParamType::Octet)?;
593    base.create_param("HDF5_posNameDim3", ParamType::Octet)?;
594    base.create_param("HDF5_posNameDim4", ParamType::Octet)?;
595    base.create_param("HDF5_posNameDim5", ParamType::Octet)?;
596    base.create_param("HDF5_posNameDim6", ParamType::Octet)?;
597    base.create_param("HDF5_posNameDim7", ParamType::Octet)?;
598    base.create_param("HDF5_posNameDim8", ParamType::Octet)?;
599    base.create_param("HDF5_posNameDim9", ParamType::Octet)?;
600    base.create_param("HDF5_posIndexDimN", ParamType::Octet)?;
601    base.create_param("HDF5_posIndexDimX", ParamType::Octet)?;
602    base.create_param("HDF5_posIndexDimY", ParamType::Octet)?;
603    base.create_param("HDF5_posIndexDim3", ParamType::Octet)?;
604    base.create_param("HDF5_posIndexDim4", ParamType::Octet)?;
605    base.create_param("HDF5_posIndexDim5", ParamType::Octet)?;
606    base.create_param("HDF5_posIndexDim6", ParamType::Octet)?;
607    base.create_param("HDF5_posIndexDim7", ParamType::Octet)?;
608    base.create_param("HDF5_posIndexDim8", ParamType::Octet)?;
609    base.create_param("HDF5_posIndexDim9", ParamType::Octet)?;
610    base.create_param("HDF5_fillValue", ParamType::Float64)?;
611    base.create_param("HDF5_extraDimChunkX", ParamType::Int32)?;
612    base.create_param("HDF5_extraDimChunkY", ParamType::Int32)?;
613    base.create_param("HDF5_extraDimChunk3", ParamType::Int32)?;
614    base.create_param("HDF5_extraDimChunk4", ParamType::Int32)?;
615    base.create_param("HDF5_extraDimChunk5", ParamType::Int32)?;
616    base.create_param("HDF5_extraDimChunk6", ParamType::Int32)?;
617    base.create_param("HDF5_extraDimChunk7", ParamType::Int32)?;
618    base.create_param("HDF5_extraDimChunk8", ParamType::Int32)?;
619    base.create_param("HDF5_extraDimChunk9", ParamType::Int32)?;
620    Ok(())
621}
622
623impl Default for Hdf5FileProcessor {
624    fn default() -> Self {
625        Self::new()
626    }
627}
628
629impl NDPluginProcess for Hdf5FileProcessor {
630    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
631        let was_swmr = self.ctrl.writer.is_swmr_active();
632        let mut result = self.ctrl.process_array(array);
633        let is_swmr = self.ctrl.writer.is_swmr_active();
634
635        // SWMR running status changed
636        if was_swmr != is_swmr {
637            if let Some(idx) = self.hdf5_params.swmr_running {
638                result
639                    .param_updates
640                    .push(ParamUpdate::int32(idx, if is_swmr { 1 } else { 0 }));
641            }
642        }
643
644        // SWMR callback counter
645        if is_swmr {
646            if let Some(idx) = self.hdf5_params.swmr_cb_counter {
647                result.param_updates.push(ParamUpdate::int32(
648                    idx,
649                    self.ctrl.writer.swmr_cb_counter as i32,
650                ));
651            }
652        }
653
654        // Performance stats
655        if self.ctrl.writer.store_performance {
656            if let Some(idx) = self.hdf5_params.total_runtime {
657                result
658                    .param_updates
659                    .push(ParamUpdate::float64(idx, self.ctrl.writer.total_runtime));
660            }
661            if let Some(idx) = self.hdf5_params.total_io_speed {
662                let speed = if self.ctrl.writer.total_runtime > 0.0 {
663                    self.ctrl.writer.total_bytes as f64
664                        / self.ctrl.writer.total_runtime
665                        / 1_000_000.0
666                } else {
667                    0.0
668                };
669                result.param_updates.push(ParamUpdate::float64(idx, speed));
670            }
671        }
672
673        result
674    }
675
676    fn plugin_type(&self) -> &str {
677        "NDFileHDF5"
678    }
679
680    fn register_params(
681        &mut self,
682        base: &mut asyn_rs::port::PortDriverBase,
683    ) -> asyn_rs::error::AsynResult<()> {
684        self.ctrl.register_params(base)?;
685        register_hdf5_params(base)?;
686        self.hdf5_params.compression_type = base.find_param("HDF5_compressionType");
687        self.hdf5_params.z_compress_level = base.find_param("HDF5_zCompressLevel");
688        self.hdf5_params.szip_num_pixels = base.find_param("HDF5_szipNumPixels");
689        self.hdf5_params.nbit_precision = base.find_param("HDF5_nbitsPrecision");
690        self.hdf5_params.nbit_offset = base.find_param("HDF5_nbitsOffset");
691        self.hdf5_params.jpeg_quality = base.find_param("HDF5_jpegQuality");
692        self.hdf5_params.blosc_shuffle_type = base.find_param("HDF5_bloscShuffleType");
693        self.hdf5_params.blosc_compressor = base.find_param("HDF5_bloscCompressor");
694        self.hdf5_params.blosc_compress_level = base.find_param("HDF5_bloscCompressLevel");
695        self.hdf5_params.store_attributes = base.find_param("HDF5_storeAttributes");
696        self.hdf5_params.store_performance = base.find_param("HDF5_storePerformance");
697        self.hdf5_params.total_runtime = base.find_param("HDF5_totalRuntime");
698        self.hdf5_params.total_io_speed = base.find_param("HDF5_totalIoSpeed");
699        self.hdf5_params.swmr_mode = base.find_param("HDF5_SWMRMode");
700        self.hdf5_params.swmr_flush_now = base.find_param("HDF5_SWMRFlushNow");
701        self.hdf5_params.swmr_running = base.find_param("HDF5_SWMRRunning");
702        self.hdf5_params.swmr_cb_counter = base.find_param("HDF5_SWMRCbCounter");
703        self.hdf5_params.swmr_supported = base.find_param("HDF5_SWMRSupported");
704        self.hdf5_params.flush_nth_frame = base.find_param("HDF5_flushNthFrame");
705
706        // Report SWMR as always supported
707        if let Some(idx) = self.hdf5_params.swmr_supported {
708            base.set_int32_param(idx, 0, 1)?;
709        }
710        Ok(())
711    }
712
713    fn on_param_change(
714        &mut self,
715        reason: usize,
716        params: &PluginParamSnapshot,
717    ) -> ParamChangeResult {
718        // -- compression params --
719        if Some(reason) == self.hdf5_params.compression_type {
720            self.ctrl.writer.set_compression_type(params.value.as_i32());
721            return ParamChangeResult::updates(vec![]);
722        }
723        if Some(reason) == self.hdf5_params.z_compress_level {
724            self.ctrl
725                .writer
726                .set_z_compress_level(params.value.as_i32() as u32);
727            return ParamChangeResult::updates(vec![]);
728        }
729        if Some(reason) == self.hdf5_params.szip_num_pixels {
730            self.ctrl
731                .writer
732                .set_szip_num_pixels(params.value.as_i32() as u32);
733            return ParamChangeResult::updates(vec![]);
734        }
735        if Some(reason) == self.hdf5_params.blosc_shuffle_type {
736            self.ctrl
737                .writer
738                .set_blosc_shuffle_type(params.value.as_i32());
739            return ParamChangeResult::updates(vec![]);
740        }
741        if Some(reason) == self.hdf5_params.blosc_compressor {
742            self.ctrl.writer.set_blosc_compressor(params.value.as_i32());
743            return ParamChangeResult::updates(vec![]);
744        }
745        if Some(reason) == self.hdf5_params.blosc_compress_level {
746            self.ctrl
747                .writer
748                .set_blosc_compress_level(params.value.as_i32() as u32);
749            return ParamChangeResult::updates(vec![]);
750        }
751        if Some(reason) == self.hdf5_params.nbit_precision {
752            self.ctrl
753                .writer
754                .set_nbit_precision(params.value.as_i32() as u32);
755            return ParamChangeResult::updates(vec![]);
756        }
757        if Some(reason) == self.hdf5_params.nbit_offset {
758            self.ctrl
759                .writer
760                .set_nbit_offset(params.value.as_i32() as u32);
761            return ParamChangeResult::updates(vec![]);
762        }
763        if Some(reason) == self.hdf5_params.jpeg_quality {
764            self.ctrl
765                .writer
766                .set_jpeg_quality(params.value.as_i32() as u32);
767            return ParamChangeResult::updates(vec![]);
768        }
769        if Some(reason) == self.hdf5_params.store_attributes {
770            self.ctrl
771                .writer
772                .set_store_attributes(params.value.as_i32() != 0);
773            return ParamChangeResult::updates(vec![]);
774        }
775        if Some(reason) == self.hdf5_params.store_performance {
776            self.ctrl
777                .writer
778                .set_store_performance(params.value.as_i32() != 0);
779            return ParamChangeResult::updates(vec![]);
780        }
781        // -- SWMR params --
782        if Some(reason) == self.hdf5_params.swmr_mode {
783            self.ctrl.writer.set_swmr_mode(params.value.as_i32() != 0);
784            return ParamChangeResult::updates(vec![]);
785        }
786        if Some(reason) == self.hdf5_params.swmr_flush_now {
787            if params.value.as_i32() != 0 {
788                self.ctrl.writer.flush_swmr();
789                let mut updates = vec![];
790                if let Some(idx) = self.hdf5_params.swmr_cb_counter {
791                    updates.push(ParamUpdate::int32(
792                        idx,
793                        self.ctrl.writer.swmr_cb_counter as i32,
794                    ));
795                }
796                return ParamChangeResult::updates(updates);
797            }
798            return ParamChangeResult::updates(vec![]);
799        }
800        if Some(reason) == self.hdf5_params.flush_nth_frame {
801            self.ctrl
802                .writer
803                .set_flush_nth_frame(params.value.as_i32().max(0) as usize);
804            return ParamChangeResult::updates(vec![]);
805        }
806        self.ctrl.on_param_change(reason, params)
807    }
808}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
814    use std::sync::atomic::{AtomicU32, Ordering};
815
816    static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
817
818    fn temp_path(prefix: &str) -> PathBuf {
819        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
820        std::env::temp_dir().join(format!("adcore_test_{}_{}.h5", prefix, n))
821    }
822
823    #[test]
824    fn test_write_single_frame() {
825        let path = temp_path("hdf5_single");
826        let mut writer = Hdf5Writer::new();
827
828        let mut arr = NDArray::new(
829            vec![NDDimension::new(4), NDDimension::new(4)],
830            NDDataType::UInt8,
831        );
832        if let NDDataBuffer::U8(ref mut v) = arr.data {
833            for i in 0..16 {
834                v[i] = i as u8;
835            }
836        }
837
838        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
839        writer.write_file(&arr).unwrap();
840        writer.close_file().unwrap();
841
842        let mut reader = Hdf5Writer::new();
843        reader.current_path = Some(path.clone());
844        let read_arr = reader.read_file().unwrap();
845        assert_eq!(read_arr.dims.len(), 2);
846        assert_eq!(read_arr.dims[0].size, 4);
847
848        std::fs::remove_file(&path).ok();
849    }
850
851    #[test]
852    fn test_write_multiple_frames() {
853        let path = temp_path("hdf5_multi");
854        let mut writer = Hdf5Writer::new();
855
856        let arr = NDArray::new(
857            vec![NDDimension::new(4), NDDimension::new(4)],
858            NDDataType::UInt8,
859        );
860
861        writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
862        writer.write_file(&arr).unwrap();
863        writer.write_file(&arr).unwrap();
864        writer.write_file(&arr).unwrap();
865        writer.close_file().unwrap();
866
867        assert!(writer.supports_multiple_arrays());
868        assert_eq!(writer.frame_count(), 3);
869
870        let data = std::fs::read(&path).unwrap();
871        assert_eq!(&data[0..8], b"\x89HDF\r\n\x1a\n");
872
873        std::fs::remove_file(&path).ok();
874    }
875
876    #[test]
877    fn test_attributes_stored() {
878        let path = temp_path("hdf5_attrs");
879        let mut writer = Hdf5Writer::new();
880
881        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
882        arr.attributes.add(NDAttribute {
883            name: "exposure".into(),
884            description: "".into(),
885            source: NDAttrSource::Driver,
886            value: NDAttrValue::Float64(0.5),
887        });
888
889        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
890        writer.write_file(&arr).unwrap();
891        writer.close_file().unwrap();
892
893        let h5file = H5File::open(&path).unwrap();
894        let ds = h5file.dataset("data").unwrap();
895        let attr = ds.attr("exposure").unwrap();
896        let val = attr.read_string().unwrap();
897        assert_eq!(val, "0.5");
898
899        std::fs::remove_file(&path).ok();
900    }
901
902    #[test]
903    fn test_roundtrip_u16() {
904        let path = temp_path("hdf5_u16");
905        let mut writer = Hdf5Writer::new();
906
907        let mut arr = NDArray::new(
908            vec![NDDimension::new(4), NDDimension::new(4)],
909            NDDataType::UInt16,
910        );
911        if let NDDataBuffer::U16(ref mut v) = arr.data {
912            for i in 0..16 {
913                v[i] = (i * 100) as u16;
914            }
915        }
916
917        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
918        writer.write_file(&arr).unwrap();
919        writer.close_file().unwrap();
920
921        let h5file = H5File::open(&path).unwrap();
922        let ds = h5file.dataset("data").unwrap();
923        let data: Vec<u16> = ds.read_raw().unwrap();
924        assert_eq!(data[0], 0);
925        assert_eq!(data[1], 100);
926        assert_eq!(data[15], 1500);
927
928        std::fs::remove_file(&path).ok();
929    }
930
931    #[test]
932    fn test_roundtrip_f64() {
933        let path = temp_path("hdf5_f64");
934        let mut writer = Hdf5Writer::new();
935
936        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::Float64);
937        if let NDDataBuffer::F64(ref mut v) = arr.data {
938            v[0] = 1.5;
939            v[1] = 2.5;
940            v[2] = 3.5;
941            v[3] = 4.5;
942        }
943
944        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
945        writer.write_file(&arr).unwrap();
946        writer.close_file().unwrap();
947
948        let h5file = H5File::open(&path).unwrap();
949        let ds = h5file.dataset("data").unwrap();
950        let data: Vec<f64> = ds.read_raw().unwrap();
951        assert!((data[0] - 1.5).abs() < 1e-10);
952        assert!((data[3] - 4.5).abs() < 1e-10);
953
954        std::fs::remove_file(&path).ok();
955    }
956
957    #[test]
958    fn test_deflate_compressed_write() {
959        let path = temp_path("hdf5_deflate");
960        let mut writer = Hdf5Writer::new();
961        writer.set_compression_type(COMPRESS_ZLIB);
962        writer.set_z_compress_level(6);
963
964        let mut arr = NDArray::new(
965            vec![NDDimension::new(64), NDDimension::new(64)],
966            NDDataType::UInt16,
967        );
968        if let NDDataBuffer::U16(ref mut v) = arr.data {
969            for i in 0..v.len() {
970                v[i] = (i % 256) as u16;
971            }
972        }
973
974        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
975        writer.write_file(&arr).unwrap();
976        writer.close_file().unwrap();
977
978        let file_size = std::fs::metadata(&path).unwrap().len();
979        assert!(
980            file_size < 8192,
981            "compressed file should be smaller than raw data"
982        );
983
984        let h5file = H5File::open(&path).unwrap();
985        let ds = h5file.dataset("data").unwrap();
986        let data: Vec<u16> = ds.read_raw().unwrap();
987        assert_eq!(data.len(), 64 * 64);
988        assert_eq!(data[0], 0);
989        assert_eq!(data[255], 255);
990        assert_eq!(data[256], 0);
991
992        std::fs::remove_file(&path).ok();
993    }
994
995    #[test]
996    fn test_lz4_compressed_write() {
997        let path = temp_path("hdf5_lz4");
998        let mut writer = Hdf5Writer::new();
999        writer.set_compression_type(COMPRESS_LZ4);
1000
1001        let mut arr = NDArray::new(
1002            vec![NDDimension::new(32), NDDimension::new(32)],
1003            NDDataType::UInt8,
1004        );
1005        if let NDDataBuffer::U8(ref mut v) = arr.data {
1006            for i in 0..v.len() {
1007                v[i] = (i % 4) as u8;
1008            }
1009        }
1010
1011        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1012        writer.write_file(&arr).unwrap();
1013        writer.close_file().unwrap();
1014
1015        let h5file = H5File::open(&path).unwrap();
1016        let ds = h5file.dataset("data").unwrap();
1017        let data: Vec<u8> = ds.read_raw().unwrap();
1018        assert_eq!(data.len(), 32 * 32);
1019        assert_eq!(data[0], 0);
1020        assert_eq!(data[3], 3);
1021
1022        std::fs::remove_file(&path).ok();
1023    }
1024
1025    #[test]
1026    fn test_swmr_streaming() {
1027        let path = temp_path("hdf5_swmr");
1028        let mut writer = Hdf5Writer::new();
1029        writer.set_swmr_mode(true);
1030        writer.set_flush_nth_frame(2);
1031
1032        let arr = NDArray::new(
1033            vec![NDDimension::new(8), NDDimension::new(8)],
1034            NDDataType::Float32,
1035        );
1036
1037        writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
1038        writer.write_file(&arr).unwrap();
1039        writer.write_file(&arr).unwrap(); // should trigger flush
1040        writer.write_file(&arr).unwrap();
1041        writer.close_file().unwrap();
1042
1043        assert_eq!(writer.frame_count(), 3);
1044
1045        // Read back via SwmrFileReader
1046        let mut reader = rust_hdf5::swmr::SwmrFileReader::open(&path).unwrap();
1047        let shape = reader.dataset_shape("data").unwrap();
1048        assert_eq!(shape[0], 3); // 3 frames
1049        assert_eq!(shape[1], 8);
1050        assert_eq!(shape[2], 8);
1051
1052        let data: Vec<f32> = reader.read_dataset("data").unwrap();
1053        assert_eq!(data.len(), 3 * 8 * 8);
1054
1055        std::fs::remove_file(&path).ok();
1056    }
1057}