1use std::path::{Path, PathBuf};
2
3use ad_core_rs::error::{ADError, ADResult};
4use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
5use ad_core_rs::ndarray_pool::NDArrayPool;
6use ad_core_rs::plugin::file_base::{NDFileMode, NDFileWriter};
7use ad_core_rs::plugin::file_controller::FilePluginController;
8use ad_core_rs::plugin::runtime::{
9 NDPluginProcess, ParamChangeResult, ParamUpdate, PluginParamSnapshot, ProcessResult,
10};
11
12use rust_hdf5::H5File;
13use rust_hdf5::format::messages::filter::{
14 FILTER_BLOSC, FILTER_JPEG, FILTER_NBIT, FILTER_SZIP, Filter, FilterPipeline,
15};
16use rust_hdf5::swmr::SwmrFileWriter;
17
18const COMPRESS_NONE: i32 = 0;
20const COMPRESS_NBIT: i32 = 1;
21const COMPRESS_SZIP: i32 = 2;
22const COMPRESS_ZLIB: i32 = 3;
23const COMPRESS_BLOSC: i32 = 4;
24#[allow(dead_code)]
25const COMPRESS_BSHUF: i32 = 5;
26const COMPRESS_LZ4: i32 = 6;
27const COMPRESS_JPEG: i32 = 7;
28
29const BLOSC_LZ: i32 = 0;
31const BLOSC_LZ4: i32 = 1;
32const BLOSC_LZ4HC: i32 = 2;
33const BLOSC_SNAPPY: i32 = 3;
34const BLOSC_ZLIB: i32 = 4;
35const BLOSC_ZSTD: i32 = 5;
36
37enum Hdf5Handle {
39 Standard(H5File),
40 Swmr {
41 writer: SwmrFileWriter,
42 ds_index: usize,
43 },
44}
45
46pub struct Hdf5Writer {
48 current_path: Option<PathBuf>,
49 handle: Option<Hdf5Handle>,
50 frame_count: usize,
51 dataset_name: String,
52 compression_type: i32,
54 z_compress_level: u32,
55 szip_num_pixels: u32,
56 nbit_precision: u32,
57 nbit_offset: u32,
58 jpeg_quality: u32,
59 blosc_shuffle_type: i32,
60 blosc_compressor: i32,
61 blosc_compress_level: u32,
62 swmr_mode: bool,
64 flush_nth_frame: usize,
65 pub swmr_cb_counter: u32,
66 pub store_attributes: bool,
68 pub store_performance: bool,
69 pub total_runtime: f64,
70 pub total_bytes: u64,
71}
72
73impl Hdf5Writer {
74 pub fn new() -> Self {
75 Self {
76 current_path: None,
77 handle: None,
78 frame_count: 0,
79 dataset_name: "data".to_string(),
80 compression_type: 0,
81 z_compress_level: 6,
82 szip_num_pixels: 16,
83 nbit_precision: 0,
84 nbit_offset: 0,
85 jpeg_quality: 90,
86 blosc_shuffle_type: 0,
87 blosc_compressor: 0,
88 blosc_compress_level: 5,
89 swmr_mode: false,
90 flush_nth_frame: 0,
91 swmr_cb_counter: 0,
92 store_attributes: true,
93 store_performance: false,
94 total_runtime: 0.0,
95 total_bytes: 0,
96 }
97 }
98
99 pub fn set_dataset_name(&mut self, name: &str) {
100 self.dataset_name = name.to_string();
101 }
102
103 pub fn set_compression_type(&mut self, v: i32) {
104 self.compression_type = v;
105 }
106
107 pub fn set_z_compress_level(&mut self, v: u32) {
108 self.z_compress_level = v;
109 }
110
111 pub fn set_szip_num_pixels(&mut self, v: u32) {
112 self.szip_num_pixels = v;
113 }
114
115 pub fn set_blosc_shuffle_type(&mut self, v: i32) {
116 self.blosc_shuffle_type = v;
117 }
118
119 pub fn set_blosc_compressor(&mut self, v: i32) {
120 self.blosc_compressor = v;
121 }
122
123 pub fn set_blosc_compress_level(&mut self, v: u32) {
124 self.blosc_compress_level = v;
125 }
126
127 pub fn set_nbit_precision(&mut self, v: u32) {
128 self.nbit_precision = v;
129 }
130
131 pub fn set_nbit_offset(&mut self, v: u32) {
132 self.nbit_offset = v;
133 }
134
135 pub fn set_jpeg_quality(&mut self, v: u32) {
136 self.jpeg_quality = v;
137 }
138
139 pub fn set_store_attributes(&mut self, v: bool) {
140 self.store_attributes = v;
141 }
142
143 pub fn set_store_performance(&mut self, v: bool) {
144 self.store_performance = v;
145 }
146
147 pub fn set_swmr_mode(&mut self, v: bool) {
148 self.swmr_mode = v;
149 }
150
151 pub fn set_flush_nth_frame(&mut self, v: usize) {
152 self.flush_nth_frame = v;
153 }
154
155 pub fn frame_count(&self) -> usize {
156 self.frame_count
157 }
158
159 pub fn flush_swmr(&mut self) {
161 if let Some(Hdf5Handle::Swmr { ref mut writer, .. }) = self.handle {
162 if writer.flush().is_ok() {
163 self.swmr_cb_counter += 1;
164 }
165 }
166 }
167
168 pub fn is_swmr_active(&self) -> bool {
170 matches!(self.handle, Some(Hdf5Handle::Swmr { .. }))
171 }
172
173 fn build_pipeline(&self, element_size: usize) -> Option<FilterPipeline> {
175 match self.compression_type {
176 COMPRESS_NONE => None,
177 COMPRESS_ZLIB => Some(FilterPipeline::deflate(self.z_compress_level)),
178 COMPRESS_SZIP => Some(FilterPipeline {
179 filters: vec![Filter {
180 id: FILTER_SZIP,
181 flags: 0,
182 cd_values: vec![4, self.szip_num_pixels],
183 }],
184 }),
185 COMPRESS_LZ4 => Some(FilterPipeline::lz4()),
186 COMPRESS_BLOSC => {
187 let compressor_code = match self.blosc_compressor {
188 BLOSC_LZ => 0,
189 BLOSC_LZ4 => 1,
190 BLOSC_LZ4HC => 2,
191 BLOSC_SNAPPY => 3,
192 BLOSC_ZLIB => 4,
193 BLOSC_ZSTD => 5,
194 _ => 0,
195 };
196 Some(FilterPipeline {
197 filters: vec![Filter {
198 id: FILTER_BLOSC,
199 flags: 0,
200 cd_values: vec![
201 2,
202 2,
203 element_size as u32,
204 0,
205 self.blosc_shuffle_type as u32,
206 compressor_code,
207 self.blosc_compress_level,
208 ],
209 }],
210 })
211 }
212 COMPRESS_NBIT => {
213 if self.nbit_precision > 0 {
214 Some(FilterPipeline {
215 filters: vec![Filter {
216 id: FILTER_NBIT,
217 flags: 0,
218 cd_values: vec![self.nbit_precision, self.nbit_offset],
219 }],
220 })
221 } else {
222 None
223 }
224 }
225 COMPRESS_JPEG => Some(FilterPipeline {
226 filters: vec![Filter {
227 id: FILTER_JPEG,
228 flags: 0,
229 cd_values: vec![self.jpeg_quality],
230 }],
231 }),
232 _ => None,
233 }
234 }
235
236 fn open_swmr(&mut self, path: &Path, array: &NDArray) -> ADResult<()> {
238 let mut swmr = SwmrFileWriter::create(path)
239 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR create error: {}", e)))?;
240
241 let frame_dims: Vec<u64> = array.dims.iter().rev().map(|d| d.size as u64).collect();
242
243 macro_rules! create_ds {
244 ($t:ty) => {
245 swmr.create_streaming_dataset::<$t>(&self.dataset_name, &frame_dims)
246 .map_err(|e| {
247 ADError::UnsupportedConversion(format!("SWMR create dataset error: {}", e))
248 })
249 };
250 }
251
252 let ds_index = match array.data.data_type() {
253 NDDataType::Int8 => create_ds!(i8)?,
254 NDDataType::UInt8 => create_ds!(u8)?,
255 NDDataType::Int16 => create_ds!(i16)?,
256 NDDataType::UInt16 => create_ds!(u16)?,
257 NDDataType::Int32 => create_ds!(i32)?,
258 NDDataType::UInt32 => create_ds!(u32)?,
259 NDDataType::Int64 => create_ds!(i64)?,
260 NDDataType::UInt64 => create_ds!(u64)?,
261 NDDataType::Float32 => create_ds!(f32)?,
262 NDDataType::Float64 => create_ds!(f64)?,
263 };
264
265 swmr.start_swmr()
266 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR start error: {}", e)))?;
267
268 self.handle = Some(Hdf5Handle::Swmr {
269 writer: swmr,
270 ds_index,
271 });
272 Ok(())
273 }
274
275 fn write_standard(&mut self, array: &NDArray) -> ADResult<()> {
277 let h5file = match self.handle {
278 Some(Hdf5Handle::Standard(ref f)) => f,
279 _ => return Err(ADError::UnsupportedConversion("no HDF5 file open".into())),
280 };
281
282 let dataset_name = if self.frame_count == 0 {
283 self.dataset_name.clone()
284 } else {
285 format!("{}_{}", self.dataset_name, self.frame_count)
286 };
287
288 let shape = array.dims.iter().rev().map(|d| d.size).collect::<Vec<_>>();
289 let element_size = array.data.data_type().element_size();
290 let pipeline = self.build_pipeline(element_size);
291
292 macro_rules! write_typed {
293 ($t:ty, $v:expr) => {{
294 let ds = if let Some(ref pl) = pipeline {
295 h5file
296 .new_dataset::<$t>()
297 .shape(&shape[..])
298 .chunk(&shape[..])
299 .filter_pipeline(pl.clone())
300 .create(dataset_name.as_str())
301 .map_err(|e| {
302 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
303 })?
304 } else {
305 h5file
306 .new_dataset::<$t>()
307 .shape(&shape[..])
308 .create(dataset_name.as_str())
309 .map_err(|e| {
310 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
311 })?
312 };
313 if pipeline.is_some() {
314 ds.write_chunk(0, array.data.as_u8_slice()).map_err(|e| {
315 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
316 })?;
317 } else {
318 ds.write_raw($v).map_err(|e| {
319 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
320 })?;
321 }
322 if self.store_attributes {
323 for attr in array.attributes.iter() {
324 let val_str = attr.value.as_string();
325 let _ = ds
326 .new_attr::<rust_hdf5::types::VarLenUnicode>()
327 .shape(())
328 .create(attr.name.as_str())
329 .and_then(|a| {
330 let s: rust_hdf5::types::VarLenUnicode =
331 val_str.parse().unwrap_or_default();
332 a.write_scalar(&s)
333 });
334 }
335 }
336 }};
337 }
338
339 match &array.data {
340 NDDataBuffer::I8(v) => write_typed!(i8, v),
341 NDDataBuffer::U8(v) => write_typed!(u8, v),
342 NDDataBuffer::I16(v) => write_typed!(i16, v),
343 NDDataBuffer::U16(v) => write_typed!(u16, v),
344 NDDataBuffer::I32(v) => write_typed!(i32, v),
345 NDDataBuffer::U32(v) => write_typed!(u32, v),
346 NDDataBuffer::I64(v) => write_typed!(i64, v),
347 NDDataBuffer::U64(v) => write_typed!(u64, v),
348 NDDataBuffer::F32(v) => write_typed!(f32, v),
349 NDDataBuffer::F64(v) => write_typed!(f64, v),
350 }
351 Ok(())
352 }
353
354 fn write_swmr(&mut self, array: &NDArray) -> ADResult<()> {
356 let (writer, ds_index) = match self.handle {
357 Some(Hdf5Handle::Swmr {
358 ref mut writer,
359 ds_index,
360 }) => (writer, ds_index),
361 _ => return Err(ADError::UnsupportedConversion("no SWMR writer open".into())),
362 };
363
364 writer
365 .append_frame(ds_index, array.data.as_u8_slice())
366 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR append error: {}", e)))?;
367
368 let count = self.frame_count + 1; if self.flush_nth_frame > 0 && count % self.flush_nth_frame == 0 {
371 writer
372 .flush()
373 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR flush error: {}", e)))?;
374 }
375
376 Ok(())
377 }
378}
379
380impl Default for Hdf5Writer {
381 fn default() -> Self {
382 Self::new()
383 }
384}
385
386impl NDFileWriter for Hdf5Writer {
387 fn open_file(&mut self, path: &Path, mode: NDFileMode, array: &NDArray) -> ADResult<()> {
388 self.current_path = Some(path.to_path_buf());
389 self.frame_count = 0;
390 self.total_runtime = 0.0;
391 self.total_bytes = 0;
392 self.swmr_cb_counter = 0;
393
394 if self.swmr_mode && mode == NDFileMode::Stream {
395 self.open_swmr(path, array)
396 } else {
397 let h5file = H5File::create(path)
398 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 create error: {}", e)))?;
399 self.handle = Some(Hdf5Handle::Standard(h5file));
400 Ok(())
401 }
402 }
403
404 fn write_file(&mut self, array: &NDArray) -> ADResult<()> {
405 let start = if self.store_performance {
406 Some(std::time::Instant::now())
407 } else {
408 None
409 };
410
411 let is_swmr = matches!(self.handle, Some(Hdf5Handle::Swmr { .. }));
412 if is_swmr {
413 self.write_swmr(array)?;
414 } else {
415 self.write_standard(array)?;
416 }
417 self.frame_count += 1;
418
419 if let Some(start) = start {
420 self.total_runtime += start.elapsed().as_secs_f64();
421 self.total_bytes += array.data.as_u8_slice().len() as u64;
422 }
423 Ok(())
424 }
425
426 fn read_file(&mut self) -> ADResult<NDArray> {
427 let path = self
428 .current_path
429 .as_ref()
430 .ok_or_else(|| ADError::UnsupportedConversion("no file open".into()))?;
431
432 let h5file = H5File::open(path)
433 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 open error: {}", e)))?;
434
435 let ds = h5file
436 .dataset(&self.dataset_name)
437 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e)))?;
438
439 let shape = ds.shape();
440 let dims: Vec<NDDimension> = shape.iter().rev().map(|&s| NDDimension::new(s)).collect();
441
442 if let Ok(data) = ds.read_raw::<u8>() {
443 let mut arr = NDArray::new(dims, NDDataType::UInt8);
444 arr.data = NDDataBuffer::U8(data);
445 return Ok(arr);
446 }
447 if let Ok(data) = ds.read_raw::<u16>() {
448 let mut arr = NDArray::new(dims, NDDataType::UInt16);
449 arr.data = NDDataBuffer::U16(data);
450 return Ok(arr);
451 }
452 if let Ok(data) = ds.read_raw::<f64>() {
453 let mut arr = NDArray::new(dims, NDDataType::Float64);
454 arr.data = NDDataBuffer::F64(data);
455 return Ok(arr);
456 }
457
458 Err(ADError::UnsupportedConversion(
459 "unsupported HDF5 data type".into(),
460 ))
461 }
462
463 fn close_file(&mut self) -> ADResult<()> {
464 if let Some(Hdf5Handle::Swmr { writer, .. }) = self.handle.take() {
465 writer
466 .close()
467 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR close error: {}", e)))?;
468 }
469 self.current_path = None;
470 Ok(())
471 }
472
473 fn supports_multiple_arrays(&self) -> bool {
474 true
475 }
476}
477
478#[derive(Default)]
484struct Hdf5ParamIndices {
485 compression_type: Option<usize>,
486 z_compress_level: Option<usize>,
487 szip_num_pixels: Option<usize>,
488 nbit_precision: Option<usize>,
489 nbit_offset: Option<usize>,
490 jpeg_quality: Option<usize>,
491 blosc_shuffle_type: Option<usize>,
492 blosc_compressor: Option<usize>,
493 blosc_compress_level: Option<usize>,
494 store_attributes: Option<usize>,
495 store_performance: Option<usize>,
496 total_runtime: Option<usize>,
497 total_io_speed: Option<usize>,
498 swmr_mode: Option<usize>,
499 swmr_flush_now: Option<usize>,
500 swmr_running: Option<usize>,
501 swmr_cb_counter: Option<usize>,
502 swmr_supported: Option<usize>,
503 flush_nth_frame: Option<usize>,
504}
505
506pub struct Hdf5FileProcessor {
508 ctrl: FilePluginController<Hdf5Writer>,
509 hdf5_params: Hdf5ParamIndices,
510}
511
512impl Hdf5FileProcessor {
513 pub fn new() -> Self {
514 Self {
515 ctrl: FilePluginController::new(Hdf5Writer::new()),
516 hdf5_params: Hdf5ParamIndices::default(),
517 }
518 }
519
520 pub fn set_dataset_name(&mut self, name: &str) {
521 self.ctrl.writer.set_dataset_name(name);
522 }
523}
524
525fn register_hdf5_params(
527 base: &mut asyn_rs::port::PortDriverBase,
528) -> asyn_rs::error::AsynResult<()> {
529 use asyn_rs::param::ParamType;
530 base.create_param("HDF5_SWMRFlushNow", ParamType::Int32)?;
531 base.create_param("HDF5_chunkSizeAuto", ParamType::Int32)?;
532 base.create_param("HDF5_nRowChunks", ParamType::Int32)?;
533 base.create_param("HDF5_nColChunks", ParamType::Int32)?;
534 base.create_param("HDF5_chunkSize2", ParamType::Int32)?;
535 base.create_param("HDF5_chunkSize3", ParamType::Int32)?;
536 base.create_param("HDF5_chunkSize4", ParamType::Int32)?;
537 base.create_param("HDF5_chunkSize5", ParamType::Int32)?;
538 base.create_param("HDF5_chunkSize6", ParamType::Int32)?;
539 base.create_param("HDF5_chunkSize7", ParamType::Int32)?;
540 base.create_param("HDF5_chunkSize8", ParamType::Int32)?;
541 base.create_param("HDF5_chunkSize9", ParamType::Int32)?;
542 base.create_param("HDF5_nFramesChunks", ParamType::Int32)?;
543 base.create_param("HDF5_NDAttributeChunk", ParamType::Int32)?;
544 base.create_param("HDF5_chunkBoundaryAlign", ParamType::Int32)?;
545 base.create_param("HDF5_chunkBoundaryThreshold", ParamType::Int32)?;
546 base.create_param("HDF5_nExtraDims", ParamType::Int32)?;
547 base.create_param("HDF5_extraDimSizeN", ParamType::Int32)?;
548 base.create_param("HDF5_extraDimNameN", ParamType::Octet)?;
549 base.create_param("HDF5_extraDimSizeX", ParamType::Int32)?;
550 base.create_param("HDF5_extraDimNameX", ParamType::Octet)?;
551 base.create_param("HDF5_extraDimSizeY", ParamType::Int32)?;
552 base.create_param("HDF5_extraDimNameY", ParamType::Octet)?;
553 base.create_param("HDF5_extraDimSize3", ParamType::Int32)?;
554 base.create_param("HDF5_extraDimName3", ParamType::Octet)?;
555 base.create_param("HDF5_extraDimSize4", ParamType::Int32)?;
556 base.create_param("HDF5_extraDimName4", ParamType::Octet)?;
557 base.create_param("HDF5_extraDimSize5", ParamType::Int32)?;
558 base.create_param("HDF5_extraDimName5", ParamType::Octet)?;
559 base.create_param("HDF5_extraDimSize6", ParamType::Int32)?;
560 base.create_param("HDF5_extraDimName6", ParamType::Octet)?;
561 base.create_param("HDF5_extraDimSize7", ParamType::Int32)?;
562 base.create_param("HDF5_extraDimName7", ParamType::Octet)?;
563 base.create_param("HDF5_extraDimSize8", ParamType::Int32)?;
564 base.create_param("HDF5_extraDimName8", ParamType::Octet)?;
565 base.create_param("HDF5_extraDimSize9", ParamType::Int32)?;
566 base.create_param("HDF5_extraDimName9", ParamType::Octet)?;
567 base.create_param("HDF5_storeAttributes", ParamType::Int32)?;
568 base.create_param("HDF5_storePerformance", ParamType::Int32)?;
569 base.create_param("HDF5_totalRuntime", ParamType::Float64)?;
570 base.create_param("HDF5_totalIoSpeed", ParamType::Float64)?;
571 base.create_param("HDF5_flushNthFrame", ParamType::Int32)?;
572 base.create_param("HDF5_compressionType", ParamType::Int32)?;
573 base.create_param("HDF5_nbitsPrecision", ParamType::Int32)?;
574 base.create_param("HDF5_nbitsOffset", ParamType::Int32)?;
575 base.create_param("HDF5_szipNumPixels", ParamType::Int32)?;
576 base.create_param("HDF5_zCompressLevel", ParamType::Int32)?;
577 base.create_param("HDF5_bloscShuffleType", ParamType::Int32)?;
578 base.create_param("HDF5_bloscCompressor", ParamType::Int32)?;
579 base.create_param("HDF5_bloscCompressLevel", ParamType::Int32)?;
580 base.create_param("HDF5_jpegQuality", ParamType::Int32)?;
581 base.create_param("HDF5_dimAttDatasets", ParamType::Int32)?;
582 base.create_param("HDF5_layoutErrorMsg", ParamType::Octet)?;
583 base.create_param("HDF5_layoutValid", ParamType::Int32)?;
584 base.create_param("HDF5_layoutFilename", ParamType::Octet)?;
585 base.create_param("HDF5_SWMRSupported", ParamType::Int32)?;
586 base.create_param("HDF5_SWMRMode", ParamType::Int32)?;
587 base.create_param("HDF5_SWMRRunning", ParamType::Int32)?;
588 base.create_param("HDF5_SWMRCbCounter", ParamType::Int32)?;
589 base.create_param("HDF5_posRunning", ParamType::Int32)?;
590 base.create_param("HDF5_posNameDimN", ParamType::Octet)?;
591 base.create_param("HDF5_posNameDimX", ParamType::Octet)?;
592 base.create_param("HDF5_posNameDimY", ParamType::Octet)?;
593 base.create_param("HDF5_posNameDim3", ParamType::Octet)?;
594 base.create_param("HDF5_posNameDim4", ParamType::Octet)?;
595 base.create_param("HDF5_posNameDim5", ParamType::Octet)?;
596 base.create_param("HDF5_posNameDim6", ParamType::Octet)?;
597 base.create_param("HDF5_posNameDim7", ParamType::Octet)?;
598 base.create_param("HDF5_posNameDim8", ParamType::Octet)?;
599 base.create_param("HDF5_posNameDim9", ParamType::Octet)?;
600 base.create_param("HDF5_posIndexDimN", ParamType::Octet)?;
601 base.create_param("HDF5_posIndexDimX", ParamType::Octet)?;
602 base.create_param("HDF5_posIndexDimY", ParamType::Octet)?;
603 base.create_param("HDF5_posIndexDim3", ParamType::Octet)?;
604 base.create_param("HDF5_posIndexDim4", ParamType::Octet)?;
605 base.create_param("HDF5_posIndexDim5", ParamType::Octet)?;
606 base.create_param("HDF5_posIndexDim6", ParamType::Octet)?;
607 base.create_param("HDF5_posIndexDim7", ParamType::Octet)?;
608 base.create_param("HDF5_posIndexDim8", ParamType::Octet)?;
609 base.create_param("HDF5_posIndexDim9", ParamType::Octet)?;
610 base.create_param("HDF5_fillValue", ParamType::Float64)?;
611 base.create_param("HDF5_extraDimChunkX", ParamType::Int32)?;
612 base.create_param("HDF5_extraDimChunkY", ParamType::Int32)?;
613 base.create_param("HDF5_extraDimChunk3", ParamType::Int32)?;
614 base.create_param("HDF5_extraDimChunk4", ParamType::Int32)?;
615 base.create_param("HDF5_extraDimChunk5", ParamType::Int32)?;
616 base.create_param("HDF5_extraDimChunk6", ParamType::Int32)?;
617 base.create_param("HDF5_extraDimChunk7", ParamType::Int32)?;
618 base.create_param("HDF5_extraDimChunk8", ParamType::Int32)?;
619 base.create_param("HDF5_extraDimChunk9", ParamType::Int32)?;
620 Ok(())
621}
622
623impl Default for Hdf5FileProcessor {
624 fn default() -> Self {
625 Self::new()
626 }
627}
628
629impl NDPluginProcess for Hdf5FileProcessor {
630 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
631 let was_swmr = self.ctrl.writer.is_swmr_active();
632 let mut result = self.ctrl.process_array(array);
633 let is_swmr = self.ctrl.writer.is_swmr_active();
634
635 if was_swmr != is_swmr {
637 if let Some(idx) = self.hdf5_params.swmr_running {
638 result
639 .param_updates
640 .push(ParamUpdate::int32(idx, if is_swmr { 1 } else { 0 }));
641 }
642 }
643
644 if is_swmr {
646 if let Some(idx) = self.hdf5_params.swmr_cb_counter {
647 result.param_updates.push(ParamUpdate::int32(
648 idx,
649 self.ctrl.writer.swmr_cb_counter as i32,
650 ));
651 }
652 }
653
654 if self.ctrl.writer.store_performance {
656 if let Some(idx) = self.hdf5_params.total_runtime {
657 result
658 .param_updates
659 .push(ParamUpdate::float64(idx, self.ctrl.writer.total_runtime));
660 }
661 if let Some(idx) = self.hdf5_params.total_io_speed {
662 let speed = if self.ctrl.writer.total_runtime > 0.0 {
663 self.ctrl.writer.total_bytes as f64
664 / self.ctrl.writer.total_runtime
665 / 1_000_000.0
666 } else {
667 0.0
668 };
669 result.param_updates.push(ParamUpdate::float64(idx, speed));
670 }
671 }
672
673 result
674 }
675
676 fn plugin_type(&self) -> &str {
677 "NDFileHDF5"
678 }
679
680 fn register_params(
681 &mut self,
682 base: &mut asyn_rs::port::PortDriverBase,
683 ) -> asyn_rs::error::AsynResult<()> {
684 self.ctrl.register_params(base)?;
685 register_hdf5_params(base)?;
686 self.hdf5_params.compression_type = base.find_param("HDF5_compressionType");
687 self.hdf5_params.z_compress_level = base.find_param("HDF5_zCompressLevel");
688 self.hdf5_params.szip_num_pixels = base.find_param("HDF5_szipNumPixels");
689 self.hdf5_params.nbit_precision = base.find_param("HDF5_nbitsPrecision");
690 self.hdf5_params.nbit_offset = base.find_param("HDF5_nbitsOffset");
691 self.hdf5_params.jpeg_quality = base.find_param("HDF5_jpegQuality");
692 self.hdf5_params.blosc_shuffle_type = base.find_param("HDF5_bloscShuffleType");
693 self.hdf5_params.blosc_compressor = base.find_param("HDF5_bloscCompressor");
694 self.hdf5_params.blosc_compress_level = base.find_param("HDF5_bloscCompressLevel");
695 self.hdf5_params.store_attributes = base.find_param("HDF5_storeAttributes");
696 self.hdf5_params.store_performance = base.find_param("HDF5_storePerformance");
697 self.hdf5_params.total_runtime = base.find_param("HDF5_totalRuntime");
698 self.hdf5_params.total_io_speed = base.find_param("HDF5_totalIoSpeed");
699 self.hdf5_params.swmr_mode = base.find_param("HDF5_SWMRMode");
700 self.hdf5_params.swmr_flush_now = base.find_param("HDF5_SWMRFlushNow");
701 self.hdf5_params.swmr_running = base.find_param("HDF5_SWMRRunning");
702 self.hdf5_params.swmr_cb_counter = base.find_param("HDF5_SWMRCbCounter");
703 self.hdf5_params.swmr_supported = base.find_param("HDF5_SWMRSupported");
704 self.hdf5_params.flush_nth_frame = base.find_param("HDF5_flushNthFrame");
705
706 if let Some(idx) = self.hdf5_params.swmr_supported {
708 base.set_int32_param(idx, 0, 1)?;
709 }
710 Ok(())
711 }
712
713 fn on_param_change(
714 &mut self,
715 reason: usize,
716 params: &PluginParamSnapshot,
717 ) -> ParamChangeResult {
718 if Some(reason) == self.hdf5_params.compression_type {
720 self.ctrl.writer.set_compression_type(params.value.as_i32());
721 return ParamChangeResult::updates(vec![]);
722 }
723 if Some(reason) == self.hdf5_params.z_compress_level {
724 self.ctrl
725 .writer
726 .set_z_compress_level(params.value.as_i32() as u32);
727 return ParamChangeResult::updates(vec![]);
728 }
729 if Some(reason) == self.hdf5_params.szip_num_pixels {
730 self.ctrl
731 .writer
732 .set_szip_num_pixels(params.value.as_i32() as u32);
733 return ParamChangeResult::updates(vec![]);
734 }
735 if Some(reason) == self.hdf5_params.blosc_shuffle_type {
736 self.ctrl
737 .writer
738 .set_blosc_shuffle_type(params.value.as_i32());
739 return ParamChangeResult::updates(vec![]);
740 }
741 if Some(reason) == self.hdf5_params.blosc_compressor {
742 self.ctrl.writer.set_blosc_compressor(params.value.as_i32());
743 return ParamChangeResult::updates(vec![]);
744 }
745 if Some(reason) == self.hdf5_params.blosc_compress_level {
746 self.ctrl
747 .writer
748 .set_blosc_compress_level(params.value.as_i32() as u32);
749 return ParamChangeResult::updates(vec![]);
750 }
751 if Some(reason) == self.hdf5_params.nbit_precision {
752 self.ctrl
753 .writer
754 .set_nbit_precision(params.value.as_i32() as u32);
755 return ParamChangeResult::updates(vec![]);
756 }
757 if Some(reason) == self.hdf5_params.nbit_offset {
758 self.ctrl
759 .writer
760 .set_nbit_offset(params.value.as_i32() as u32);
761 return ParamChangeResult::updates(vec![]);
762 }
763 if Some(reason) == self.hdf5_params.jpeg_quality {
764 self.ctrl
765 .writer
766 .set_jpeg_quality(params.value.as_i32() as u32);
767 return ParamChangeResult::updates(vec![]);
768 }
769 if Some(reason) == self.hdf5_params.store_attributes {
770 self.ctrl
771 .writer
772 .set_store_attributes(params.value.as_i32() != 0);
773 return ParamChangeResult::updates(vec![]);
774 }
775 if Some(reason) == self.hdf5_params.store_performance {
776 self.ctrl
777 .writer
778 .set_store_performance(params.value.as_i32() != 0);
779 return ParamChangeResult::updates(vec![]);
780 }
781 if Some(reason) == self.hdf5_params.swmr_mode {
783 self.ctrl.writer.set_swmr_mode(params.value.as_i32() != 0);
784 return ParamChangeResult::updates(vec![]);
785 }
786 if Some(reason) == self.hdf5_params.swmr_flush_now {
787 if params.value.as_i32() != 0 {
788 self.ctrl.writer.flush_swmr();
789 let mut updates = vec![];
790 if let Some(idx) = self.hdf5_params.swmr_cb_counter {
791 updates.push(ParamUpdate::int32(
792 idx,
793 self.ctrl.writer.swmr_cb_counter as i32,
794 ));
795 }
796 return ParamChangeResult::updates(updates);
797 }
798 return ParamChangeResult::updates(vec![]);
799 }
800 if Some(reason) == self.hdf5_params.flush_nth_frame {
801 self.ctrl
802 .writer
803 .set_flush_nth_frame(params.value.as_i32().max(0) as usize);
804 return ParamChangeResult::updates(vec![]);
805 }
806 self.ctrl.on_param_change(reason, params)
807 }
808}
809
810#[cfg(test)]
811mod tests {
812 use super::*;
813 use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
814 use std::sync::atomic::{AtomicU32, Ordering};
815
816 static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
817
818 fn temp_path(prefix: &str) -> PathBuf {
819 let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
820 std::env::temp_dir().join(format!("adcore_test_{}_{}.h5", prefix, n))
821 }
822
823 #[test]
824 fn test_write_single_frame() {
825 let path = temp_path("hdf5_single");
826 let mut writer = Hdf5Writer::new();
827
828 let mut arr = NDArray::new(
829 vec![NDDimension::new(4), NDDimension::new(4)],
830 NDDataType::UInt8,
831 );
832 if let NDDataBuffer::U8(ref mut v) = arr.data {
833 for i in 0..16 {
834 v[i] = i as u8;
835 }
836 }
837
838 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
839 writer.write_file(&arr).unwrap();
840 writer.close_file().unwrap();
841
842 let mut reader = Hdf5Writer::new();
843 reader.current_path = Some(path.clone());
844 let read_arr = reader.read_file().unwrap();
845 assert_eq!(read_arr.dims.len(), 2);
846 assert_eq!(read_arr.dims[0].size, 4);
847
848 std::fs::remove_file(&path).ok();
849 }
850
851 #[test]
852 fn test_write_multiple_frames() {
853 let path = temp_path("hdf5_multi");
854 let mut writer = Hdf5Writer::new();
855
856 let arr = NDArray::new(
857 vec![NDDimension::new(4), NDDimension::new(4)],
858 NDDataType::UInt8,
859 );
860
861 writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
862 writer.write_file(&arr).unwrap();
863 writer.write_file(&arr).unwrap();
864 writer.write_file(&arr).unwrap();
865 writer.close_file().unwrap();
866
867 assert!(writer.supports_multiple_arrays());
868 assert_eq!(writer.frame_count(), 3);
869
870 let data = std::fs::read(&path).unwrap();
871 assert_eq!(&data[0..8], b"\x89HDF\r\n\x1a\n");
872
873 std::fs::remove_file(&path).ok();
874 }
875
876 #[test]
877 fn test_attributes_stored() {
878 let path = temp_path("hdf5_attrs");
879 let mut writer = Hdf5Writer::new();
880
881 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
882 arr.attributes.add(NDAttribute {
883 name: "exposure".into(),
884 description: "".into(),
885 source: NDAttrSource::Driver,
886 value: NDAttrValue::Float64(0.5),
887 });
888
889 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
890 writer.write_file(&arr).unwrap();
891 writer.close_file().unwrap();
892
893 let h5file = H5File::open(&path).unwrap();
894 let ds = h5file.dataset("data").unwrap();
895 let attr = ds.attr("exposure").unwrap();
896 let val = attr.read_string().unwrap();
897 assert_eq!(val, "0.5");
898
899 std::fs::remove_file(&path).ok();
900 }
901
902 #[test]
903 fn test_roundtrip_u16() {
904 let path = temp_path("hdf5_u16");
905 let mut writer = Hdf5Writer::new();
906
907 let mut arr = NDArray::new(
908 vec![NDDimension::new(4), NDDimension::new(4)],
909 NDDataType::UInt16,
910 );
911 if let NDDataBuffer::U16(ref mut v) = arr.data {
912 for i in 0..16 {
913 v[i] = (i * 100) as u16;
914 }
915 }
916
917 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
918 writer.write_file(&arr).unwrap();
919 writer.close_file().unwrap();
920
921 let h5file = H5File::open(&path).unwrap();
922 let ds = h5file.dataset("data").unwrap();
923 let data: Vec<u16> = ds.read_raw().unwrap();
924 assert_eq!(data[0], 0);
925 assert_eq!(data[1], 100);
926 assert_eq!(data[15], 1500);
927
928 std::fs::remove_file(&path).ok();
929 }
930
931 #[test]
932 fn test_roundtrip_f64() {
933 let path = temp_path("hdf5_f64");
934 let mut writer = Hdf5Writer::new();
935
936 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::Float64);
937 if let NDDataBuffer::F64(ref mut v) = arr.data {
938 v[0] = 1.5;
939 v[1] = 2.5;
940 v[2] = 3.5;
941 v[3] = 4.5;
942 }
943
944 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
945 writer.write_file(&arr).unwrap();
946 writer.close_file().unwrap();
947
948 let h5file = H5File::open(&path).unwrap();
949 let ds = h5file.dataset("data").unwrap();
950 let data: Vec<f64> = ds.read_raw().unwrap();
951 assert!((data[0] - 1.5).abs() < 1e-10);
952 assert!((data[3] - 4.5).abs() < 1e-10);
953
954 std::fs::remove_file(&path).ok();
955 }
956
957 #[test]
958 fn test_deflate_compressed_write() {
959 let path = temp_path("hdf5_deflate");
960 let mut writer = Hdf5Writer::new();
961 writer.set_compression_type(COMPRESS_ZLIB);
962 writer.set_z_compress_level(6);
963
964 let mut arr = NDArray::new(
965 vec![NDDimension::new(64), NDDimension::new(64)],
966 NDDataType::UInt16,
967 );
968 if let NDDataBuffer::U16(ref mut v) = arr.data {
969 for i in 0..v.len() {
970 v[i] = (i % 256) as u16;
971 }
972 }
973
974 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
975 writer.write_file(&arr).unwrap();
976 writer.close_file().unwrap();
977
978 let file_size = std::fs::metadata(&path).unwrap().len();
979 assert!(
980 file_size < 8192,
981 "compressed file should be smaller than raw data"
982 );
983
984 let h5file = H5File::open(&path).unwrap();
985 let ds = h5file.dataset("data").unwrap();
986 let data: Vec<u16> = ds.read_raw().unwrap();
987 assert_eq!(data.len(), 64 * 64);
988 assert_eq!(data[0], 0);
989 assert_eq!(data[255], 255);
990 assert_eq!(data[256], 0);
991
992 std::fs::remove_file(&path).ok();
993 }
994
995 #[test]
996 fn test_lz4_compressed_write() {
997 let path = temp_path("hdf5_lz4");
998 let mut writer = Hdf5Writer::new();
999 writer.set_compression_type(COMPRESS_LZ4);
1000
1001 let mut arr = NDArray::new(
1002 vec![NDDimension::new(32), NDDimension::new(32)],
1003 NDDataType::UInt8,
1004 );
1005 if let NDDataBuffer::U8(ref mut v) = arr.data {
1006 for i in 0..v.len() {
1007 v[i] = (i % 4) as u8;
1008 }
1009 }
1010
1011 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1012 writer.write_file(&arr).unwrap();
1013 writer.close_file().unwrap();
1014
1015 let h5file = H5File::open(&path).unwrap();
1016 let ds = h5file.dataset("data").unwrap();
1017 let data: Vec<u8> = ds.read_raw().unwrap();
1018 assert_eq!(data.len(), 32 * 32);
1019 assert_eq!(data[0], 0);
1020 assert_eq!(data[3], 3);
1021
1022 std::fs::remove_file(&path).ok();
1023 }
1024
1025 #[test]
1026 fn test_swmr_streaming() {
1027 let path = temp_path("hdf5_swmr");
1028 let mut writer = Hdf5Writer::new();
1029 writer.set_swmr_mode(true);
1030 writer.set_flush_nth_frame(2);
1031
1032 let arr = NDArray::new(
1033 vec![NDDimension::new(8), NDDimension::new(8)],
1034 NDDataType::Float32,
1035 );
1036
1037 writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
1038 writer.write_file(&arr).unwrap();
1039 writer.write_file(&arr).unwrap(); writer.write_file(&arr).unwrap();
1041 writer.close_file().unwrap();
1042
1043 assert_eq!(writer.frame_count(), 3);
1044
1045 let mut reader = rust_hdf5::swmr::SwmrFileReader::open(&path).unwrap();
1047 let shape = reader.dataset_shape("data").unwrap();
1048 assert_eq!(shape[0], 3); assert_eq!(shape[1], 8);
1050 assert_eq!(shape[2], 8);
1051
1052 let data: Vec<f32> = reader.read_dataset("data").unwrap();
1053 assert_eq!(data.len(), 3 * 8 * 8);
1054
1055 std::fs::remove_file(&path).ok();
1056 }
1057}