Skip to main content

ad_plugins_rs/
file_hdf5.rs

1use std::path::{Path, PathBuf};
2
3use ad_core_rs::error::{ADError, ADResult};
4use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
5use ad_core_rs::ndarray_pool::NDArrayPool;
6use ad_core_rs::plugin::file_base::{NDFileMode, NDFileWriter};
7use ad_core_rs::plugin::file_controller::FilePluginController;
8use ad_core_rs::plugin::runtime::{
9    NDPluginProcess, ParamChangeResult, ParamUpdate, PluginParamSnapshot, ProcessResult,
10};
11
12use rust_hdf5::H5File;
13use rust_hdf5::format::messages::filter::{
14    FILTER_BLOSC, FILTER_JPEG, FILTER_NBIT, FILTER_SZIP, Filter, FilterPipeline,
15};
16use rust_hdf5::swmr::SwmrFileWriter;
17
// Selector values that `Hdf5Writer::build_pipeline` matches against the
// writer's `compression_type`; any other value yields no filter pipeline.
// NOTE(review): upstream ADCore's `HDF5Compression_t` is believed to carry a
// bitshuffle entry at 5 and JPEG at 7 — confirm these numbers against the
// records that drive `HDF5_compressionType`.
/// C ADCore compression type enum values.
/// No compression: no filter pipeline is built.
const COMPRESS_NONE: i32 = 0;
/// N-bit filter (only installed when the configured precision is non-zero).
const COMPRESS_NBIT: i32 = 1;
/// SZIP filter.
const COMPRESS_SZIP: i32 = 2;
/// zlib/deflate filter.
const COMPRESS_ZLIB: i32 = 3;
/// BLOSC filter; the inner compressor is chosen by the `BLOSC_*` constants.
const COMPRESS_BLOSC: i32 = 4;
/// JPEG filter.
const COMPRESS_JPEG: i32 = 5;
/// LZ4 filter.
const COMPRESS_LZ4: i32 = 6;
26
// Values stored in `Hdf5Writer::blosc_compressor`. `build_pipeline` forwards
// the matching code as one of the BLOSC filter's cd_values and falls back to
// 0 for any selector not listed here.
/// C ADCore BLOSC compressor sub-types.
/// BloscLZ compressor.
const BLOSC_LZ: i32 = 0;
/// LZ4 compressor.
const BLOSC_LZ4: i32 = 1;
/// LZ4HC compressor.
const BLOSC_LZ4HC: i32 = 2;
/// Snappy compressor.
const BLOSC_SNAPPY: i32 = 3;
/// zlib compressor.
const BLOSC_ZLIB: i32 = 4;
/// Zstandard compressor.
const BLOSC_ZSTD: i32 = 5;
34
/// Internal handle: either a standard H5File or a SWMR streaming writer.
enum Hdf5Handle {
    /// File created with `H5File::create`; in this mode every frame is written
    /// as its own dataset.
    Standard(H5File),
    /// Streaming writer set up by `Hdf5Writer::open_swmr`.
    Swmr {
        /// The SWMR writer that owns the open file.
        writer: SwmrFileWriter,
        /// Index returned by `create_streaming_dataset`; passed to
        /// `append_frame` for every frame.
        ds_index: usize,
    },
}
43
/// HDF5 file writer built on the `rust_hdf5` crate.
pub struct Hdf5Writer {
    /// Path passed to the last `open_file`; `read_file` re-opens it and
    /// `close_file` clears it.
    current_path: Option<PathBuf>,
    /// Open file handle (standard or SWMR); `None` while no file is open.
    handle: Option<Hdf5Handle>,
    /// Frames written since the last `open_file`. In standard mode it also
    /// provides the numeric suffix of per-frame dataset names.
    frame_count: usize,
    /// Base dataset name (defaults to "data").
    dataset_name: String,
    // compression
    /// Selected compression, compared against the `COMPRESS_*` constants.
    compression_type: i32,
    /// Level handed to `FilterPipeline::deflate` for zlib compression.
    z_compress_level: u32,
    /// Second cd_value of the SZIP filter.
    szip_num_pixels: u32,
    /// First cd_value of the N-bit filter; 0 disables the N-bit filter.
    nbit_precision: u32,
    /// Second cd_value of the N-bit filter.
    nbit_offset: u32,
    /// Sole cd_value of the JPEG filter.
    jpeg_quality: u32,
    /// Shuffle selector forwarded to the BLOSC filter.
    blosc_shuffle_type: i32,
    /// BLOSC inner compressor, compared against the `BLOSC_*` constants.
    blosc_compressor: i32,
    /// Compression level forwarded to the BLOSC filter.
    blosc_compress_level: u32,
    // SWMR
    /// When set, `open_file` in stream mode opens a SWMR writer instead of a
    /// standard file.
    swmr_mode: bool,
    /// In SWMR mode, flush after every N-th frame; 0 disables periodic flushes.
    flush_nth_frame: usize,
    /// Incremented by `flush_swmr` on each successful flush; reset to zero by
    /// `open_file`.
    pub swmr_cb_counter: u32,
    // options
    /// Whether NDArray attributes are written as dataset attributes in
    /// standard mode.
    pub store_attributes: bool,
    /// Whether `write_file` accumulates `total_runtime` and `total_bytes`.
    pub store_performance: bool,
    /// Accumulated time spent in `write_file`, in seconds.
    pub total_runtime: f64,
    /// Accumulated number of payload bytes passed to `write_file`.
    pub total_bytes: u64,
}
70
71impl Hdf5Writer {
72    pub fn new() -> Self {
73        Self {
74            current_path: None,
75            handle: None,
76            frame_count: 0,
77            dataset_name: "data".to_string(),
78            compression_type: 0,
79            z_compress_level: 6,
80            szip_num_pixels: 16,
81            nbit_precision: 0,
82            nbit_offset: 0,
83            jpeg_quality: 90,
84            blosc_shuffle_type: 0,
85            blosc_compressor: 0,
86            blosc_compress_level: 5,
87            swmr_mode: false,
88            flush_nth_frame: 0,
89            swmr_cb_counter: 0,
90            store_attributes: true,
91            store_performance: false,
92            total_runtime: 0.0,
93            total_bytes: 0,
94        }
95    }
96
    /// Sets the base dataset name used for subsequently written frames.
    pub fn set_dataset_name(&mut self, name: &str) {
        self.dataset_name = name.to_string();
    }

    /// Selects the compression filter; `v` is compared against the
    /// `COMPRESS_*` constants when a pipeline is built.
    pub fn set_compression_type(&mut self, v: i32) {
        self.compression_type = v;
    }

    /// Sets the level passed to the zlib/deflate filter.
    pub fn set_z_compress_level(&mut self, v: u32) {
        self.z_compress_level = v;
    }

    /// Sets the pixel count passed to the SZIP filter.
    pub fn set_szip_num_pixels(&mut self, v: u32) {
        self.szip_num_pixels = v;
    }

    /// Sets the shuffle selector forwarded to the BLOSC filter.
    pub fn set_blosc_shuffle_type(&mut self, v: i32) {
        self.blosc_shuffle_type = v;
    }

    /// Selects the BLOSC inner compressor; `v` is compared against the
    /// `BLOSC_*` constants when a pipeline is built.
    pub fn set_blosc_compressor(&mut self, v: i32) {
        self.blosc_compressor = v;
    }

    /// Sets the compression level forwarded to the BLOSC filter.
    pub fn set_blosc_compress_level(&mut self, v: u32) {
        self.blosc_compress_level = v;
    }

    /// Sets the N-bit precision; 0 disables the N-bit filter.
    pub fn set_nbit_precision(&mut self, v: u32) {
        self.nbit_precision = v;
    }

    /// Sets the N-bit offset.
    pub fn set_nbit_offset(&mut self, v: u32) {
        self.nbit_offset = v;
    }

    /// Sets the quality value passed to the JPEG filter.
    pub fn set_jpeg_quality(&mut self, v: u32) {
        self.jpeg_quality = v;
    }

    /// Enables or disables writing NDArray attributes as dataset attributes.
    pub fn set_store_attributes(&mut self, v: bool) {
        self.store_attributes = v;
    }

    /// Enables or disables accumulation of write timing and byte counts.
    pub fn set_store_performance(&mut self, v: bool) {
        self.store_performance = v;
    }

    /// Requests SWMR mode; it takes effect at the next `open_file` in stream
    /// mode.
    pub fn set_swmr_mode(&mut self, v: bool) {
        self.swmr_mode = v;
    }

    /// Sets the periodic SWMR flush interval in frames; 0 disables it.
    pub fn set_flush_nth_frame(&mut self, v: usize) {
        self.flush_nth_frame = v;
    }

    /// Returns the number of frames written since the file was opened.
    pub fn frame_count(&self) -> usize {
        self.frame_count
    }
156
157    /// Trigger a SWMR flush. No-op if not in SWMR mode.
158    pub fn flush_swmr(&mut self) {
159        if let Some(Hdf5Handle::Swmr { ref mut writer, .. }) = self.handle {
160            if writer.flush().is_ok() {
161                self.swmr_cb_counter += 1;
162            }
163        }
164    }
165
    /// Returns true if SWMR is currently active, i.e. the open handle is the
    /// SWMR variant. Returns false for a standard handle or when no file is
    /// open.
    pub fn is_swmr_active(&self) -> bool {
        matches!(self.handle, Some(Hdf5Handle::Swmr { .. }))
    }
170
171    /// Build a FilterPipeline from the current compression settings.
172    fn build_pipeline(&self, element_size: usize) -> Option<FilterPipeline> {
173        match self.compression_type {
174            COMPRESS_NONE => None,
175            COMPRESS_ZLIB => Some(FilterPipeline::deflate(self.z_compress_level)),
176            COMPRESS_SZIP => Some(FilterPipeline {
177                filters: vec![Filter {
178                    id: FILTER_SZIP,
179                    flags: 0,
180                    cd_values: vec![4, self.szip_num_pixels],
181                }],
182            }),
183            COMPRESS_LZ4 => Some(FilterPipeline::lz4()),
184            COMPRESS_BLOSC => {
185                let compressor_code = match self.blosc_compressor {
186                    BLOSC_LZ => 0,
187                    BLOSC_LZ4 => 1,
188                    BLOSC_LZ4HC => 2,
189                    BLOSC_SNAPPY => 3,
190                    BLOSC_ZLIB => 4,
191                    BLOSC_ZSTD => 5,
192                    _ => 0,
193                };
194                Some(FilterPipeline {
195                    filters: vec![Filter {
196                        id: FILTER_BLOSC,
197                        flags: 0,
198                        cd_values: vec![
199                            2,
200                            2,
201                            element_size as u32,
202                            0,
203                            self.blosc_shuffle_type as u32,
204                            compressor_code,
205                            self.blosc_compress_level,
206                        ],
207                    }],
208                })
209            }
210            COMPRESS_NBIT => {
211                if self.nbit_precision > 0 {
212                    Some(FilterPipeline {
213                        filters: vec![Filter {
214                            id: FILTER_NBIT,
215                            flags: 0,
216                            cd_values: vec![self.nbit_precision, self.nbit_offset],
217                        }],
218                    })
219                } else {
220                    None
221                }
222            }
223            COMPRESS_JPEG => Some(FilterPipeline {
224                filters: vec![Filter {
225                    id: FILTER_JPEG,
226                    flags: 0,
227                    cd_values: vec![self.jpeg_quality],
228                }],
229            }),
230            _ => None,
231        }
232    }
233
234    /// Open file in SWMR streaming mode.
235    fn open_swmr(&mut self, path: &Path, array: &NDArray) -> ADResult<()> {
236        let mut swmr = SwmrFileWriter::create(path)
237            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR create error: {}", e)))?;
238
239        let frame_dims: Vec<u64> = array.dims.iter().rev().map(|d| d.size as u64).collect();
240
241        macro_rules! create_ds {
242            ($t:ty) => {
243                swmr.create_streaming_dataset::<$t>(&self.dataset_name, &frame_dims)
244                    .map_err(|e| {
245                        ADError::UnsupportedConversion(format!("SWMR create dataset error: {}", e))
246                    })
247            };
248        }
249
250        let ds_index = match array.data.data_type() {
251            NDDataType::Int8 => create_ds!(i8)?,
252            NDDataType::UInt8 => create_ds!(u8)?,
253            NDDataType::Int16 => create_ds!(i16)?,
254            NDDataType::UInt16 => create_ds!(u16)?,
255            NDDataType::Int32 => create_ds!(i32)?,
256            NDDataType::UInt32 => create_ds!(u32)?,
257            NDDataType::Int64 => create_ds!(i64)?,
258            NDDataType::UInt64 => create_ds!(u64)?,
259            NDDataType::Float32 => create_ds!(f32)?,
260            NDDataType::Float64 => create_ds!(f64)?,
261        };
262
263        swmr.start_swmr()
264            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR start error: {}", e)))?;
265
266        self.handle = Some(Hdf5Handle::Swmr {
267            writer: swmr,
268            ds_index,
269        });
270        Ok(())
271    }
272
    /// Write a frame in standard (non-SWMR) mode.
    ///
    /// Each frame becomes its own dataset: the first one is named
    /// `dataset_name`, later ones `dataset_name_<frame_count>`. When a filter
    /// pipeline is configured the dataset is created with a single chunk
    /// covering the whole frame and the payload is written via `write_chunk`;
    /// otherwise it is written with `write_raw`. When `store_attributes` is
    /// set, the array's attributes are written as string attributes on the
    /// dataset.
    ///
    /// # Errors
    /// Returns `ADError::UnsupportedConversion` when no standard file is open
    /// or when dataset creation or the data write fails. Failures while
    /// writing attributes are ignored.
    fn write_standard(&mut self, array: &NDArray) -> ADResult<()> {
        let h5file = match self.handle {
            Some(Hdf5Handle::Standard(ref f)) => f,
            _ => return Err(ADError::UnsupportedConversion("no HDF5 file open".into())),
        };

        // First frame uses the bare name; later frames get a numeric suffix.
        let dataset_name = if self.frame_count == 0 {
            self.dataset_name.clone()
        } else {
            format!("{}_{}", self.dataset_name, self.frame_count)
        };

        // Dataset shape is the NDArray dimension list in reverse order.
        let shape = array.dims.iter().rev().map(|d| d.size).collect::<Vec<_>>();
        let element_size = array.data.data_type().element_size();
        let pipeline = self.build_pipeline(element_size);

        // Creates a dataset of element type `$t`, writes the frame (`$v` is the
        // typed buffer used on the uncompressed path) and stores attributes.
        macro_rules! write_typed {
            ($t:ty, $v:expr) => {{
                // With a pipeline the dataset is chunked, one chunk per frame.
                let ds = if let Some(ref pl) = pipeline {
                    h5file
                        .new_dataset::<$t>()
                        .shape(&shape[..])
                        .chunk(&shape[..])
                        .filter_pipeline(pl.clone())
                        .create(dataset_name.as_str())
                        .map_err(|e| {
                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
                        })?
                } else {
                    h5file
                        .new_dataset::<$t>()
                        .shape(&shape[..])
                        .create(dataset_name.as_str())
                        .map_err(|e| {
                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
                        })?
                };
                // The chunked path takes the frame as raw bytes at chunk index 0.
                if pipeline.is_some() {
                    ds.write_chunk(0, array.data.as_u8_slice()).map_err(|e| {
                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
                    })?;
                } else {
                    ds.write_raw($v).map_err(|e| {
                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
                    })?;
                }
                // Best effort: an attribute that cannot be created or written
                // is skipped without failing the frame.
                if self.store_attributes {
                    for attr in array.attributes.iter() {
                        let val_str = attr.value.as_string();
                        let _ = ds
                            .new_attr::<rust_hdf5::types::VarLenUnicode>()
                            .shape(())
                            .create(attr.name.as_str())
                            .and_then(|a| {
                                let s: rust_hdf5::types::VarLenUnicode =
                                    val_str.parse().unwrap_or_default();
                                a.write_scalar(&s)
                            });
                    }
                }
            }};
        }

        match &array.data {
            NDDataBuffer::U8(v) => write_typed!(u8, v),
            NDDataBuffer::U16(v) => write_typed!(u16, v),
            NDDataBuffer::I32(v) => write_typed!(i32, v),
            NDDataBuffer::F32(v) => write_typed!(f32, v),
            NDDataBuffer::F64(v) => write_typed!(f64, v),
            // Every other buffer type is stored as a flat one-dimensional u8
            // dataset of the raw bytes.
            // NOTE(review): this drops the element type and the frame shape
            // for those buffers — confirm that is intended.
            _ => {
                let raw = array.data.as_u8_slice();
                let ds = if let Some(ref pl) = pipeline {
                    h5file
                        .new_dataset::<u8>()
                        .shape([raw.len()])
                        .chunk(&[raw.len()])
                        .filter_pipeline(pl.clone())
                        .create(dataset_name.as_str())
                        .map_err(|e| {
                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
                        })?
                } else {
                    h5file
                        .new_dataset::<u8>()
                        .shape([raw.len()])
                        .create(dataset_name.as_str())
                        .map_err(|e| {
                            ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
                        })?
                };
                if pipeline.is_some() {
                    ds.write_chunk(0, raw).map_err(|e| {
                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
                    })?;
                } else {
                    ds.write_raw(raw).map_err(|e| {
                        ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
                    })?;
                }
                // Same best-effort attribute handling as in `write_typed!`.
                if self.store_attributes {
                    for attr in array.attributes.iter() {
                        let val_str = attr.value.as_string();
                        let _ = ds
                            .new_attr::<rust_hdf5::types::VarLenUnicode>()
                            .shape(())
                            .create(attr.name.as_str())
                            .and_then(|a| {
                                let s: rust_hdf5::types::VarLenUnicode =
                                    val_str.parse().unwrap_or_default();
                                a.write_scalar(&s)
                            });
                    }
                }
            }
        }
        Ok(())
    }
391
392    /// Write a frame in SWMR mode.
393    fn write_swmr(&mut self, array: &NDArray) -> ADResult<()> {
394        let (writer, ds_index) = match self.handle {
395            Some(Hdf5Handle::Swmr {
396                ref mut writer,
397                ds_index,
398            }) => (writer, ds_index),
399            _ => return Err(ADError::UnsupportedConversion("no SWMR writer open".into())),
400        };
401
402        writer
403            .append_frame(ds_index, array.data.as_u8_slice())
404            .map_err(|e| ADError::UnsupportedConversion(format!("SWMR append error: {}", e)))?;
405
406        // Periodic flush
407        let count = self.frame_count + 1; // will be incremented after return
408        if self.flush_nth_frame > 0 && count % self.flush_nth_frame == 0 {
409            writer
410                .flush()
411                .map_err(|e| ADError::UnsupportedConversion(format!("SWMR flush error: {}", e)))?;
412        }
413
414        Ok(())
415    }
416}
417
/// `Default` delegates to [`Hdf5Writer::new`], so both produce the same
/// initial settings.
impl Default for Hdf5Writer {
    fn default() -> Self {
        Self::new()
    }
}
423
424impl NDFileWriter for Hdf5Writer {
425    fn open_file(&mut self, path: &Path, mode: NDFileMode, array: &NDArray) -> ADResult<()> {
426        self.current_path = Some(path.to_path_buf());
427        self.frame_count = 0;
428        self.total_runtime = 0.0;
429        self.total_bytes = 0;
430        self.swmr_cb_counter = 0;
431
432        if self.swmr_mode && mode == NDFileMode::Stream {
433            self.open_swmr(path, array)
434        } else {
435            let h5file = H5File::create(path)
436                .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 create error: {}", e)))?;
437            self.handle = Some(Hdf5Handle::Standard(h5file));
438            Ok(())
439        }
440    }
441
442    fn write_file(&mut self, array: &NDArray) -> ADResult<()> {
443        let start = if self.store_performance {
444            Some(std::time::Instant::now())
445        } else {
446            None
447        };
448
449        let is_swmr = matches!(self.handle, Some(Hdf5Handle::Swmr { .. }));
450        if is_swmr {
451            self.write_swmr(array)?;
452        } else {
453            self.write_standard(array)?;
454        }
455        self.frame_count += 1;
456
457        if let Some(start) = start {
458            self.total_runtime += start.elapsed().as_secs_f64();
459            self.total_bytes += array.data.as_u8_slice().len() as u64;
460        }
461        Ok(())
462    }
463
464    fn read_file(&mut self) -> ADResult<NDArray> {
465        let path = self
466            .current_path
467            .as_ref()
468            .ok_or_else(|| ADError::UnsupportedConversion("no file open".into()))?;
469
470        let h5file = H5File::open(path)
471            .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 open error: {}", e)))?;
472
473        let ds = h5file
474            .dataset(&self.dataset_name)
475            .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e)))?;
476
477        let shape = ds.shape();
478        let dims: Vec<NDDimension> = shape.iter().rev().map(|&s| NDDimension::new(s)).collect();
479
480        if let Ok(data) = ds.read_raw::<u8>() {
481            let mut arr = NDArray::new(dims, NDDataType::UInt8);
482            arr.data = NDDataBuffer::U8(data);
483            return Ok(arr);
484        }
485        if let Ok(data) = ds.read_raw::<u16>() {
486            let mut arr = NDArray::new(dims, NDDataType::UInt16);
487            arr.data = NDDataBuffer::U16(data);
488            return Ok(arr);
489        }
490        if let Ok(data) = ds.read_raw::<f64>() {
491            let mut arr = NDArray::new(dims, NDDataType::Float64);
492            arr.data = NDDataBuffer::F64(data);
493            return Ok(arr);
494        }
495
496        Err(ADError::UnsupportedConversion(
497            "unsupported HDF5 data type".into(),
498        ))
499    }
500
501    fn close_file(&mut self) -> ADResult<()> {
502        if let Some(Hdf5Handle::Swmr { writer, .. }) = self.handle.take() {
503            writer
504                .close()
505                .map_err(|e| ADError::UnsupportedConversion(format!("SWMR close error: {}", e)))?;
506        }
507        self.current_path = None;
508        Ok(())
509    }
510
    /// Always reports `true`.
    ///
    /// NOTE(review): presumably tells the file controller that several frames
    /// may be written into one open file — confirm against `NDFileWriter`.
    fn supports_multiple_arrays(&self) -> bool {
        true
    }
514}
515
516// ============================================================
517// Processor
518// ============================================================
519
/// Param indices for HDF5-specific params.
///
/// Each field holds the index returned by `find_param` for the named
/// parameter. All fields are `None` until `register_params` has run.
#[derive(Default)]
struct Hdf5ParamIndices {
    /// `HDF5_compressionType`
    compression_type: Option<usize>,
    /// `HDF5_zCompressLevel`
    z_compress_level: Option<usize>,
    /// `HDF5_szipNumPixels`
    szip_num_pixels: Option<usize>,
    /// `HDF5_nbitsPrecision`
    nbit_precision: Option<usize>,
    /// `HDF5_nbitsOffset`
    nbit_offset: Option<usize>,
    /// `HDF5_jpegQuality`
    jpeg_quality: Option<usize>,
    /// `HDF5_bloscShuffleType`
    blosc_shuffle_type: Option<usize>,
    /// `HDF5_bloscCompressor`
    blosc_compressor: Option<usize>,
    /// `HDF5_bloscCompressLevel`
    blosc_compress_level: Option<usize>,
    /// `HDF5_storeAttributes`
    store_attributes: Option<usize>,
    /// `HDF5_storePerformance`
    store_performance: Option<usize>,
    /// `HDF5_totalRuntime`
    total_runtime: Option<usize>,
    /// `HDF5_totalIoSpeed`
    total_io_speed: Option<usize>,
    /// `HDF5_SWMRMode`
    swmr_mode: Option<usize>,
    /// `HDF5_SWMRFlushNow`
    swmr_flush_now: Option<usize>,
    /// `HDF5_SWMRRunning`
    swmr_running: Option<usize>,
    /// `HDF5_SWMRCbCounter`
    swmr_cb_counter: Option<usize>,
    /// `HDF5_SWMRSupported`
    swmr_supported: Option<usize>,
    /// `HDF5_flushNthFrame`
    flush_nth_frame: Option<usize>,
}
543
/// HDF5 file processor wrapping FilePluginController<Hdf5Writer>.
pub struct Hdf5FileProcessor {
    /// Generic file-plugin controller that owns the `Hdf5Writer`.
    ctrl: FilePluginController<Hdf5Writer>,
    /// Indices of the HDF5-specific parameters, filled in by `register_params`.
    hdf5_params: Hdf5ParamIndices,
}
549
550impl Hdf5FileProcessor {
551    pub fn new() -> Self {
552        Self {
553            ctrl: FilePluginController::new(Hdf5Writer::new()),
554            hdf5_params: Hdf5ParamIndices::default(),
555        }
556    }
557
558    pub fn set_dataset_name(&mut self, name: &str) {
559        self.ctrl.writer.set_dataset_name(name);
560    }
561}
562
563/// Register all HDF5-specific params.
564fn register_hdf5_params(
565    base: &mut asyn_rs::port::PortDriverBase,
566) -> asyn_rs::error::AsynResult<()> {
567    use asyn_rs::param::ParamType;
568    base.create_param("HDF5_SWMRFlushNow", ParamType::Int32)?;
569    base.create_param("HDF5_chunkSizeAuto", ParamType::Int32)?;
570    base.create_param("HDF5_nRowChunks", ParamType::Int32)?;
571    base.create_param("HDF5_nColChunks", ParamType::Int32)?;
572    base.create_param("HDF5_chunkSize2", ParamType::Int32)?;
573    base.create_param("HDF5_chunkSize3", ParamType::Int32)?;
574    base.create_param("HDF5_chunkSize4", ParamType::Int32)?;
575    base.create_param("HDF5_chunkSize5", ParamType::Int32)?;
576    base.create_param("HDF5_chunkSize6", ParamType::Int32)?;
577    base.create_param("HDF5_chunkSize7", ParamType::Int32)?;
578    base.create_param("HDF5_chunkSize8", ParamType::Int32)?;
579    base.create_param("HDF5_chunkSize9", ParamType::Int32)?;
580    base.create_param("HDF5_nFramesChunks", ParamType::Int32)?;
581    base.create_param("HDF5_NDAttributeChunk", ParamType::Int32)?;
582    base.create_param("HDF5_chunkBoundaryAlign", ParamType::Int32)?;
583    base.create_param("HDF5_chunkBoundaryThreshold", ParamType::Int32)?;
584    base.create_param("HDF5_nExtraDims", ParamType::Int32)?;
585    base.create_param("HDF5_extraDimSizeN", ParamType::Int32)?;
586    base.create_param("HDF5_extraDimNameN", ParamType::Octet)?;
587    base.create_param("HDF5_extraDimSizeX", ParamType::Int32)?;
588    base.create_param("HDF5_extraDimNameX", ParamType::Octet)?;
589    base.create_param("HDF5_extraDimSizeY", ParamType::Int32)?;
590    base.create_param("HDF5_extraDimNameY", ParamType::Octet)?;
591    base.create_param("HDF5_extraDimSize3", ParamType::Int32)?;
592    base.create_param("HDF5_extraDimName3", ParamType::Octet)?;
593    base.create_param("HDF5_extraDimSize4", ParamType::Int32)?;
594    base.create_param("HDF5_extraDimName4", ParamType::Octet)?;
595    base.create_param("HDF5_extraDimSize5", ParamType::Int32)?;
596    base.create_param("HDF5_extraDimName5", ParamType::Octet)?;
597    base.create_param("HDF5_extraDimSize6", ParamType::Int32)?;
598    base.create_param("HDF5_extraDimName6", ParamType::Octet)?;
599    base.create_param("HDF5_extraDimSize7", ParamType::Int32)?;
600    base.create_param("HDF5_extraDimName7", ParamType::Octet)?;
601    base.create_param("HDF5_extraDimSize8", ParamType::Int32)?;
602    base.create_param("HDF5_extraDimName8", ParamType::Octet)?;
603    base.create_param("HDF5_extraDimSize9", ParamType::Int32)?;
604    base.create_param("HDF5_extraDimName9", ParamType::Octet)?;
605    base.create_param("HDF5_storeAttributes", ParamType::Int32)?;
606    base.create_param("HDF5_storePerformance", ParamType::Int32)?;
607    base.create_param("HDF5_totalRuntime", ParamType::Float64)?;
608    base.create_param("HDF5_totalIoSpeed", ParamType::Float64)?;
609    base.create_param("HDF5_flushNthFrame", ParamType::Int32)?;
610    base.create_param("HDF5_compressionType", ParamType::Int32)?;
611    base.create_param("HDF5_nbitsPrecision", ParamType::Int32)?;
612    base.create_param("HDF5_nbitsOffset", ParamType::Int32)?;
613    base.create_param("HDF5_szipNumPixels", ParamType::Int32)?;
614    base.create_param("HDF5_zCompressLevel", ParamType::Int32)?;
615    base.create_param("HDF5_bloscShuffleType", ParamType::Int32)?;
616    base.create_param("HDF5_bloscCompressor", ParamType::Int32)?;
617    base.create_param("HDF5_bloscCompressLevel", ParamType::Int32)?;
618    base.create_param("HDF5_jpegQuality", ParamType::Int32)?;
619    base.create_param("HDF5_dimAttDatasets", ParamType::Int32)?;
620    base.create_param("HDF5_layoutErrorMsg", ParamType::Octet)?;
621    base.create_param("HDF5_layoutValid", ParamType::Int32)?;
622    base.create_param("HDF5_layoutFilename", ParamType::Octet)?;
623    base.create_param("HDF5_SWMRSupported", ParamType::Int32)?;
624    base.create_param("HDF5_SWMRMode", ParamType::Int32)?;
625    base.create_param("HDF5_SWMRRunning", ParamType::Int32)?;
626    base.create_param("HDF5_SWMRCbCounter", ParamType::Int32)?;
627    base.create_param("HDF5_posRunning", ParamType::Int32)?;
628    base.create_param("HDF5_posNameDimN", ParamType::Octet)?;
629    base.create_param("HDF5_posNameDimX", ParamType::Octet)?;
630    base.create_param("HDF5_posNameDimY", ParamType::Octet)?;
631    base.create_param("HDF5_posNameDim3", ParamType::Octet)?;
632    base.create_param("HDF5_posNameDim4", ParamType::Octet)?;
633    base.create_param("HDF5_posNameDim5", ParamType::Octet)?;
634    base.create_param("HDF5_posNameDim6", ParamType::Octet)?;
635    base.create_param("HDF5_posNameDim7", ParamType::Octet)?;
636    base.create_param("HDF5_posNameDim8", ParamType::Octet)?;
637    base.create_param("HDF5_posNameDim9", ParamType::Octet)?;
638    base.create_param("HDF5_posIndexDimN", ParamType::Octet)?;
639    base.create_param("HDF5_posIndexDimX", ParamType::Octet)?;
640    base.create_param("HDF5_posIndexDimY", ParamType::Octet)?;
641    base.create_param("HDF5_posIndexDim3", ParamType::Octet)?;
642    base.create_param("HDF5_posIndexDim4", ParamType::Octet)?;
643    base.create_param("HDF5_posIndexDim5", ParamType::Octet)?;
644    base.create_param("HDF5_posIndexDim6", ParamType::Octet)?;
645    base.create_param("HDF5_posIndexDim7", ParamType::Octet)?;
646    base.create_param("HDF5_posIndexDim8", ParamType::Octet)?;
647    base.create_param("HDF5_posIndexDim9", ParamType::Octet)?;
648    base.create_param("HDF5_fillValue", ParamType::Float64)?;
649    base.create_param("HDF5_extraDimChunkX", ParamType::Int32)?;
650    base.create_param("HDF5_extraDimChunkY", ParamType::Int32)?;
651    base.create_param("HDF5_extraDimChunk3", ParamType::Int32)?;
652    base.create_param("HDF5_extraDimChunk4", ParamType::Int32)?;
653    base.create_param("HDF5_extraDimChunk5", ParamType::Int32)?;
654    base.create_param("HDF5_extraDimChunk6", ParamType::Int32)?;
655    base.create_param("HDF5_extraDimChunk7", ParamType::Int32)?;
656    base.create_param("HDF5_extraDimChunk8", ParamType::Int32)?;
657    base.create_param("HDF5_extraDimChunk9", ParamType::Int32)?;
658    Ok(())
659}
660
/// `Default` delegates to [`Hdf5FileProcessor::new`].
impl Default for Hdf5FileProcessor {
    fn default() -> Self {
        Self::new()
    }
}
666
667impl NDPluginProcess for Hdf5FileProcessor {
668    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
669        let was_swmr = self.ctrl.writer.is_swmr_active();
670        let mut result = self.ctrl.process_array(array);
671        let is_swmr = self.ctrl.writer.is_swmr_active();
672
673        // SWMR running status changed
674        if was_swmr != is_swmr {
675            if let Some(idx) = self.hdf5_params.swmr_running {
676                result
677                    .param_updates
678                    .push(ParamUpdate::int32(idx, if is_swmr { 1 } else { 0 }));
679            }
680        }
681
682        // SWMR callback counter
683        if is_swmr {
684            if let Some(idx) = self.hdf5_params.swmr_cb_counter {
685                result.param_updates.push(ParamUpdate::int32(
686                    idx,
687                    self.ctrl.writer.swmr_cb_counter as i32,
688                ));
689            }
690        }
691
692        // Performance stats
693        if self.ctrl.writer.store_performance {
694            if let Some(idx) = self.hdf5_params.total_runtime {
695                result
696                    .param_updates
697                    .push(ParamUpdate::float64(idx, self.ctrl.writer.total_runtime));
698            }
699            if let Some(idx) = self.hdf5_params.total_io_speed {
700                let speed = if self.ctrl.writer.total_runtime > 0.0 {
701                    self.ctrl.writer.total_bytes as f64
702                        / self.ctrl.writer.total_runtime
703                        / 1_000_000.0
704                } else {
705                    0.0
706                };
707                result.param_updates.push(ParamUpdate::float64(idx, speed));
708            }
709        }
710
711        result
712    }
713
    /// Returns the plugin type string, `"NDFileHDF5"`.
    fn plugin_type(&self) -> &str {
        "NDFileHDF5"
    }
717
718    fn register_params(
719        &mut self,
720        base: &mut asyn_rs::port::PortDriverBase,
721    ) -> asyn_rs::error::AsynResult<()> {
722        self.ctrl.register_params(base)?;
723        register_hdf5_params(base)?;
724        self.hdf5_params.compression_type = base.find_param("HDF5_compressionType");
725        self.hdf5_params.z_compress_level = base.find_param("HDF5_zCompressLevel");
726        self.hdf5_params.szip_num_pixels = base.find_param("HDF5_szipNumPixels");
727        self.hdf5_params.nbit_precision = base.find_param("HDF5_nbitsPrecision");
728        self.hdf5_params.nbit_offset = base.find_param("HDF5_nbitsOffset");
729        self.hdf5_params.jpeg_quality = base.find_param("HDF5_jpegQuality");
730        self.hdf5_params.blosc_shuffle_type = base.find_param("HDF5_bloscShuffleType");
731        self.hdf5_params.blosc_compressor = base.find_param("HDF5_bloscCompressor");
732        self.hdf5_params.blosc_compress_level = base.find_param("HDF5_bloscCompressLevel");
733        self.hdf5_params.store_attributes = base.find_param("HDF5_storeAttributes");
734        self.hdf5_params.store_performance = base.find_param("HDF5_storePerformance");
735        self.hdf5_params.total_runtime = base.find_param("HDF5_totalRuntime");
736        self.hdf5_params.total_io_speed = base.find_param("HDF5_totalIoSpeed");
737        self.hdf5_params.swmr_mode = base.find_param("HDF5_SWMRMode");
738        self.hdf5_params.swmr_flush_now = base.find_param("HDF5_SWMRFlushNow");
739        self.hdf5_params.swmr_running = base.find_param("HDF5_SWMRRunning");
740        self.hdf5_params.swmr_cb_counter = base.find_param("HDF5_SWMRCbCounter");
741        self.hdf5_params.swmr_supported = base.find_param("HDF5_SWMRSupported");
742        self.hdf5_params.flush_nth_frame = base.find_param("HDF5_flushNthFrame");
743
744        // Report SWMR as always supported
745        if let Some(idx) = self.hdf5_params.swmr_supported {
746            base.set_int32_param(idx, 0, 1)?;
747        }
748        Ok(())
749    }
750
751    fn on_param_change(
752        &mut self,
753        reason: usize,
754        params: &PluginParamSnapshot,
755    ) -> ParamChangeResult {
756        // -- compression params --
757        if Some(reason) == self.hdf5_params.compression_type {
758            self.ctrl.writer.set_compression_type(params.value.as_i32());
759            return ParamChangeResult::updates(vec![]);
760        }
761        if Some(reason) == self.hdf5_params.z_compress_level {
762            self.ctrl
763                .writer
764                .set_z_compress_level(params.value.as_i32() as u32);
765            return ParamChangeResult::updates(vec![]);
766        }
767        if Some(reason) == self.hdf5_params.szip_num_pixels {
768            self.ctrl
769                .writer
770                .set_szip_num_pixels(params.value.as_i32() as u32);
771            return ParamChangeResult::updates(vec![]);
772        }
773        if Some(reason) == self.hdf5_params.blosc_shuffle_type {
774            self.ctrl
775                .writer
776                .set_blosc_shuffle_type(params.value.as_i32());
777            return ParamChangeResult::updates(vec![]);
778        }
779        if Some(reason) == self.hdf5_params.blosc_compressor {
780            self.ctrl.writer.set_blosc_compressor(params.value.as_i32());
781            return ParamChangeResult::updates(vec![]);
782        }
783        if Some(reason) == self.hdf5_params.blosc_compress_level {
784            self.ctrl
785                .writer
786                .set_blosc_compress_level(params.value.as_i32() as u32);
787            return ParamChangeResult::updates(vec![]);
788        }
789        if Some(reason) == self.hdf5_params.nbit_precision {
790            self.ctrl
791                .writer
792                .set_nbit_precision(params.value.as_i32() as u32);
793            return ParamChangeResult::updates(vec![]);
794        }
795        if Some(reason) == self.hdf5_params.nbit_offset {
796            self.ctrl
797                .writer
798                .set_nbit_offset(params.value.as_i32() as u32);
799            return ParamChangeResult::updates(vec![]);
800        }
801        if Some(reason) == self.hdf5_params.jpeg_quality {
802            self.ctrl
803                .writer
804                .set_jpeg_quality(params.value.as_i32() as u32);
805            return ParamChangeResult::updates(vec![]);
806        }
807        if Some(reason) == self.hdf5_params.store_attributes {
808            self.ctrl
809                .writer
810                .set_store_attributes(params.value.as_i32() != 0);
811            return ParamChangeResult::updates(vec![]);
812        }
813        if Some(reason) == self.hdf5_params.store_performance {
814            self.ctrl
815                .writer
816                .set_store_performance(params.value.as_i32() != 0);
817            return ParamChangeResult::updates(vec![]);
818        }
819        // -- SWMR params --
820        if Some(reason) == self.hdf5_params.swmr_mode {
821            self.ctrl.writer.set_swmr_mode(params.value.as_i32() != 0);
822            return ParamChangeResult::updates(vec![]);
823        }
824        if Some(reason) == self.hdf5_params.swmr_flush_now {
825            if params.value.as_i32() != 0 {
826                self.ctrl.writer.flush_swmr();
827                let mut updates = vec![];
828                if let Some(idx) = self.hdf5_params.swmr_cb_counter {
829                    updates.push(ParamUpdate::int32(
830                        idx,
831                        self.ctrl.writer.swmr_cb_counter as i32,
832                    ));
833                }
834                return ParamChangeResult::updates(updates);
835            }
836            return ParamChangeResult::updates(vec![]);
837        }
838        if Some(reason) == self.hdf5_params.flush_nth_frame {
839            self.ctrl
840                .writer
841                .set_flush_nth_frame(params.value.as_i32().max(0) as usize);
842            return ParamChangeResult::updates(vec![]);
843        }
844        self.ctrl.on_param_change(reason, params)
845    }
846}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
852    use std::sync::atomic::{AtomicU32, Ordering};
853
854    static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
855
856    fn temp_path(prefix: &str) -> PathBuf {
857        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
858        std::env::temp_dir().join(format!("adcore_test_{}_{}.h5", prefix, n))
859    }
860
861    #[test]
862    fn test_write_single_frame() {
863        let path = temp_path("hdf5_single");
864        let mut writer = Hdf5Writer::new();
865
866        let mut arr = NDArray::new(
867            vec![NDDimension::new(4), NDDimension::new(4)],
868            NDDataType::UInt8,
869        );
870        if let NDDataBuffer::U8(ref mut v) = arr.data {
871            for i in 0..16 {
872                v[i] = i as u8;
873            }
874        }
875
876        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
877        writer.write_file(&arr).unwrap();
878        writer.close_file().unwrap();
879
880        let mut reader = Hdf5Writer::new();
881        reader.current_path = Some(path.clone());
882        let read_arr = reader.read_file().unwrap();
883        assert_eq!(read_arr.dims.len(), 2);
884        assert_eq!(read_arr.dims[0].size, 4);
885
886        std::fs::remove_file(&path).ok();
887    }
888
889    #[test]
890    fn test_write_multiple_frames() {
891        let path = temp_path("hdf5_multi");
892        let mut writer = Hdf5Writer::new();
893
894        let arr = NDArray::new(
895            vec![NDDimension::new(4), NDDimension::new(4)],
896            NDDataType::UInt8,
897        );
898
899        writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
900        writer.write_file(&arr).unwrap();
901        writer.write_file(&arr).unwrap();
902        writer.write_file(&arr).unwrap();
903        writer.close_file().unwrap();
904
905        assert!(writer.supports_multiple_arrays());
906        assert_eq!(writer.frame_count(), 3);
907
908        let data = std::fs::read(&path).unwrap();
909        assert_eq!(&data[0..8], b"\x89HDF\r\n\x1a\n");
910
911        std::fs::remove_file(&path).ok();
912    }
913
914    #[test]
915    fn test_attributes_stored() {
916        let path = temp_path("hdf5_attrs");
917        let mut writer = Hdf5Writer::new();
918
919        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
920        arr.attributes.add(NDAttribute {
921            name: "exposure".into(),
922            description: "".into(),
923            source: NDAttrSource::Driver,
924            value: NDAttrValue::Float64(0.5),
925        });
926
927        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
928        writer.write_file(&arr).unwrap();
929        writer.close_file().unwrap();
930
931        let h5file = H5File::open(&path).unwrap();
932        let ds = h5file.dataset("data").unwrap();
933        let attr = ds.attr("exposure").unwrap();
934        let val = attr.read_string().unwrap();
935        assert_eq!(val, "0.5");
936
937        std::fs::remove_file(&path).ok();
938    }
939
940    #[test]
941    fn test_roundtrip_u16() {
942        let path = temp_path("hdf5_u16");
943        let mut writer = Hdf5Writer::new();
944
945        let mut arr = NDArray::new(
946            vec![NDDimension::new(4), NDDimension::new(4)],
947            NDDataType::UInt16,
948        );
949        if let NDDataBuffer::U16(ref mut v) = arr.data {
950            for i in 0..16 {
951                v[i] = (i * 100) as u16;
952            }
953        }
954
955        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
956        writer.write_file(&arr).unwrap();
957        writer.close_file().unwrap();
958
959        let h5file = H5File::open(&path).unwrap();
960        let ds = h5file.dataset("data").unwrap();
961        let data: Vec<u16> = ds.read_raw().unwrap();
962        assert_eq!(data[0], 0);
963        assert_eq!(data[1], 100);
964        assert_eq!(data[15], 1500);
965
966        std::fs::remove_file(&path).ok();
967    }
968
969    #[test]
970    fn test_roundtrip_f64() {
971        let path = temp_path("hdf5_f64");
972        let mut writer = Hdf5Writer::new();
973
974        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::Float64);
975        if let NDDataBuffer::F64(ref mut v) = arr.data {
976            v[0] = 1.5;
977            v[1] = 2.5;
978            v[2] = 3.5;
979            v[3] = 4.5;
980        }
981
982        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
983        writer.write_file(&arr).unwrap();
984        writer.close_file().unwrap();
985
986        let h5file = H5File::open(&path).unwrap();
987        let ds = h5file.dataset("data").unwrap();
988        let data: Vec<f64> = ds.read_raw().unwrap();
989        assert!((data[0] - 1.5).abs() < 1e-10);
990        assert!((data[3] - 4.5).abs() < 1e-10);
991
992        std::fs::remove_file(&path).ok();
993    }
994
995    #[test]
996    fn test_deflate_compressed_write() {
997        let path = temp_path("hdf5_deflate");
998        let mut writer = Hdf5Writer::new();
999        writer.set_compression_type(COMPRESS_ZLIB);
1000        writer.set_z_compress_level(6);
1001
1002        let mut arr = NDArray::new(
1003            vec![NDDimension::new(64), NDDimension::new(64)],
1004            NDDataType::UInt16,
1005        );
1006        if let NDDataBuffer::U16(ref mut v) = arr.data {
1007            for i in 0..v.len() {
1008                v[i] = (i % 256) as u16;
1009            }
1010        }
1011
1012        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1013        writer.write_file(&arr).unwrap();
1014        writer.close_file().unwrap();
1015
1016        let file_size = std::fs::metadata(&path).unwrap().len();
1017        assert!(
1018            file_size < 8192,
1019            "compressed file should be smaller than raw data"
1020        );
1021
1022        let h5file = H5File::open(&path).unwrap();
1023        let ds = h5file.dataset("data").unwrap();
1024        let data: Vec<u16> = ds.read_raw().unwrap();
1025        assert_eq!(data.len(), 64 * 64);
1026        assert_eq!(data[0], 0);
1027        assert_eq!(data[255], 255);
1028        assert_eq!(data[256], 0);
1029
1030        std::fs::remove_file(&path).ok();
1031    }
1032
1033    #[test]
1034    fn test_lz4_compressed_write() {
1035        let path = temp_path("hdf5_lz4");
1036        let mut writer = Hdf5Writer::new();
1037        writer.set_compression_type(COMPRESS_LZ4);
1038
1039        let mut arr = NDArray::new(
1040            vec![NDDimension::new(32), NDDimension::new(32)],
1041            NDDataType::UInt8,
1042        );
1043        if let NDDataBuffer::U8(ref mut v) = arr.data {
1044            for i in 0..v.len() {
1045                v[i] = (i % 4) as u8;
1046            }
1047        }
1048
1049        writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1050        writer.write_file(&arr).unwrap();
1051        writer.close_file().unwrap();
1052
1053        let h5file = H5File::open(&path).unwrap();
1054        let ds = h5file.dataset("data").unwrap();
1055        let data: Vec<u8> = ds.read_raw().unwrap();
1056        assert_eq!(data.len(), 32 * 32);
1057        assert_eq!(data[0], 0);
1058        assert_eq!(data[3], 3);
1059
1060        std::fs::remove_file(&path).ok();
1061    }
1062
1063    #[test]
1064    fn test_swmr_streaming() {
1065        let path = temp_path("hdf5_swmr");
1066        let mut writer = Hdf5Writer::new();
1067        writer.set_swmr_mode(true);
1068        writer.set_flush_nth_frame(2);
1069
1070        let arr = NDArray::new(
1071            vec![NDDimension::new(8), NDDimension::new(8)],
1072            NDDataType::Float32,
1073        );
1074
1075        writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
1076        writer.write_file(&arr).unwrap();
1077        writer.write_file(&arr).unwrap(); // should trigger flush
1078        writer.write_file(&arr).unwrap();
1079        writer.close_file().unwrap();
1080
1081        assert_eq!(writer.frame_count(), 3);
1082
1083        // Read back via SwmrFileReader
1084        let mut reader = rust_hdf5::swmr::SwmrFileReader::open(&path).unwrap();
1085        let shape = reader.dataset_shape("data").unwrap();
1086        assert_eq!(shape[0], 3); // 3 frames
1087        assert_eq!(shape[1], 8);
1088        assert_eq!(shape[2], 8);
1089
1090        let data: Vec<f32> = reader.read_dataset("data").unwrap();
1091        assert_eq!(data.len(), 3 * 8 * 8);
1092
1093        std::fs::remove_file(&path).ok();
1094    }
1095}