1use std::path::{Path, PathBuf};
2
3use ad_core_rs::error::{ADError, ADResult};
4use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
5use ad_core_rs::ndarray_pool::NDArrayPool;
6use ad_core_rs::plugin::file_base::{NDFileMode, NDFileWriter};
7use ad_core_rs::plugin::file_controller::FilePluginController;
8use ad_core_rs::plugin::runtime::{
9 NDPluginProcess, ParamChangeResult, ParamUpdate, PluginParamSnapshot, ProcessResult,
10};
11
12use rust_hdf5::H5File;
13use rust_hdf5::format::messages::filter::{
14 FILTER_BLOSC, FILTER_JPEG, FILTER_NBIT, FILTER_SZIP, Filter, FilterPipeline,
15};
16use rust_hdf5::swmr::SwmrFileWriter;
17
// Compression selectors stored in `Hdf5Writer::compression_type` and matched
// by `Hdf5Writer::build_pipeline`.

/// No filter pipeline; datasets are written without compression.
const COMPRESS_NONE: i32 = 0;
/// N-bit filter (only applied while `nbit_precision` is non-zero).
const COMPRESS_NBIT: i32 = 1;
/// SZIP filter.
const COMPRESS_SZIP: i32 = 2;
/// Deflate (zlib) filter at `z_compress_level`.
const COMPRESS_ZLIB: i32 = 3;
/// Blosc filter, configured by the `blosc_*` settings.
const COMPRESS_BLOSC: i32 = 4;
/// JPEG filter at `jpeg_quality`.
const COMPRESS_JPEG: i32 = 5;
/// LZ4 filter.
const COMPRESS_LZ4: i32 = 6;
26
// Blosc sub-compressor selectors stored in `Hdf5Writer::blosc_compressor` and
// translated to filter codes by `Hdf5Writer::build_pipeline`.

/// BloscLZ compressor.
const BLOSC_LZ: i32 = 0;
/// LZ4 compressor.
const BLOSC_LZ4: i32 = 1;
/// LZ4HC compressor.
const BLOSC_LZ4HC: i32 = 2;
/// Snappy compressor.
const BLOSC_SNAPPY: i32 = 3;
/// Zlib compressor.
const BLOSC_ZLIB: i32 = 4;
/// Zstd compressor.
const BLOSC_ZSTD: i32 = 5;
34
35enum Hdf5Handle {
37 Standard(H5File),
38 Swmr {
39 writer: SwmrFileWriter,
40 ds_index: usize,
41 },
42}
43
44pub struct Hdf5Writer {
46 current_path: Option<PathBuf>,
47 handle: Option<Hdf5Handle>,
48 frame_count: usize,
49 dataset_name: String,
50 compression_type: i32,
52 z_compress_level: u32,
53 szip_num_pixels: u32,
54 nbit_precision: u32,
55 nbit_offset: u32,
56 jpeg_quality: u32,
57 blosc_shuffle_type: i32,
58 blosc_compressor: i32,
59 blosc_compress_level: u32,
60 swmr_mode: bool,
62 flush_nth_frame: usize,
63 pub swmr_cb_counter: u32,
64 pub store_attributes: bool,
66 pub store_performance: bool,
67 pub total_runtime: f64,
68 pub total_bytes: u64,
69}
70
71impl Hdf5Writer {
72 pub fn new() -> Self {
73 Self {
74 current_path: None,
75 handle: None,
76 frame_count: 0,
77 dataset_name: "data".to_string(),
78 compression_type: 0,
79 z_compress_level: 6,
80 szip_num_pixels: 16,
81 nbit_precision: 0,
82 nbit_offset: 0,
83 jpeg_quality: 90,
84 blosc_shuffle_type: 0,
85 blosc_compressor: 0,
86 blosc_compress_level: 5,
87 swmr_mode: false,
88 flush_nth_frame: 0,
89 swmr_cb_counter: 0,
90 store_attributes: true,
91 store_performance: false,
92 total_runtime: 0.0,
93 total_bytes: 0,
94 }
95 }
96
97 pub fn set_dataset_name(&mut self, name: &str) {
98 self.dataset_name = name.to_string();
99 }
100
101 pub fn set_compression_type(&mut self, v: i32) {
102 self.compression_type = v;
103 }
104
105 pub fn set_z_compress_level(&mut self, v: u32) {
106 self.z_compress_level = v;
107 }
108
109 pub fn set_szip_num_pixels(&mut self, v: u32) {
110 self.szip_num_pixels = v;
111 }
112
113 pub fn set_blosc_shuffle_type(&mut self, v: i32) {
114 self.blosc_shuffle_type = v;
115 }
116
117 pub fn set_blosc_compressor(&mut self, v: i32) {
118 self.blosc_compressor = v;
119 }
120
121 pub fn set_blosc_compress_level(&mut self, v: u32) {
122 self.blosc_compress_level = v;
123 }
124
125 pub fn set_nbit_precision(&mut self, v: u32) {
126 self.nbit_precision = v;
127 }
128
129 pub fn set_nbit_offset(&mut self, v: u32) {
130 self.nbit_offset = v;
131 }
132
133 pub fn set_jpeg_quality(&mut self, v: u32) {
134 self.jpeg_quality = v;
135 }
136
137 pub fn set_store_attributes(&mut self, v: bool) {
138 self.store_attributes = v;
139 }
140
141 pub fn set_store_performance(&mut self, v: bool) {
142 self.store_performance = v;
143 }
144
145 pub fn set_swmr_mode(&mut self, v: bool) {
146 self.swmr_mode = v;
147 }
148
149 pub fn set_flush_nth_frame(&mut self, v: usize) {
150 self.flush_nth_frame = v;
151 }
152
153 pub fn frame_count(&self) -> usize {
154 self.frame_count
155 }
156
157 pub fn flush_swmr(&mut self) {
159 if let Some(Hdf5Handle::Swmr { ref mut writer, .. }) = self.handle {
160 if writer.flush().is_ok() {
161 self.swmr_cb_counter += 1;
162 }
163 }
164 }
165
166 pub fn is_swmr_active(&self) -> bool {
168 matches!(self.handle, Some(Hdf5Handle::Swmr { .. }))
169 }
170
171 fn build_pipeline(&self, element_size: usize) -> Option<FilterPipeline> {
173 match self.compression_type {
174 COMPRESS_NONE => None,
175 COMPRESS_ZLIB => Some(FilterPipeline::deflate(self.z_compress_level)),
176 COMPRESS_SZIP => Some(FilterPipeline {
177 filters: vec![Filter {
178 id: FILTER_SZIP,
179 flags: 0,
180 cd_values: vec![4, self.szip_num_pixels],
181 }],
182 }),
183 COMPRESS_LZ4 => Some(FilterPipeline::lz4()),
184 COMPRESS_BLOSC => {
185 let compressor_code = match self.blosc_compressor {
186 BLOSC_LZ => 0,
187 BLOSC_LZ4 => 1,
188 BLOSC_LZ4HC => 2,
189 BLOSC_SNAPPY => 3,
190 BLOSC_ZLIB => 4,
191 BLOSC_ZSTD => 5,
192 _ => 0,
193 };
194 Some(FilterPipeline {
195 filters: vec![Filter {
196 id: FILTER_BLOSC,
197 flags: 0,
198 cd_values: vec![
199 2,
200 2,
201 element_size as u32,
202 0,
203 self.blosc_shuffle_type as u32,
204 compressor_code,
205 self.blosc_compress_level,
206 ],
207 }],
208 })
209 }
210 COMPRESS_NBIT => {
211 if self.nbit_precision > 0 {
212 Some(FilterPipeline {
213 filters: vec![Filter {
214 id: FILTER_NBIT,
215 flags: 0,
216 cd_values: vec![self.nbit_precision, self.nbit_offset],
217 }],
218 })
219 } else {
220 None
221 }
222 }
223 COMPRESS_JPEG => Some(FilterPipeline {
224 filters: vec![Filter {
225 id: FILTER_JPEG,
226 flags: 0,
227 cd_values: vec![self.jpeg_quality],
228 }],
229 }),
230 _ => None,
231 }
232 }
233
234 fn open_swmr(&mut self, path: &Path, array: &NDArray) -> ADResult<()> {
236 let mut swmr = SwmrFileWriter::create(path)
237 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR create error: {}", e)))?;
238
239 let frame_dims: Vec<u64> = array.dims.iter().rev().map(|d| d.size as u64).collect();
240
241 macro_rules! create_ds {
242 ($t:ty) => {
243 swmr.create_streaming_dataset::<$t>(&self.dataset_name, &frame_dims)
244 .map_err(|e| {
245 ADError::UnsupportedConversion(format!("SWMR create dataset error: {}", e))
246 })
247 };
248 }
249
250 let ds_index = match array.data.data_type() {
251 NDDataType::Int8 => create_ds!(i8)?,
252 NDDataType::UInt8 => create_ds!(u8)?,
253 NDDataType::Int16 => create_ds!(i16)?,
254 NDDataType::UInt16 => create_ds!(u16)?,
255 NDDataType::Int32 => create_ds!(i32)?,
256 NDDataType::UInt32 => create_ds!(u32)?,
257 NDDataType::Int64 => create_ds!(i64)?,
258 NDDataType::UInt64 => create_ds!(u64)?,
259 NDDataType::Float32 => create_ds!(f32)?,
260 NDDataType::Float64 => create_ds!(f64)?,
261 };
262
263 swmr.start_swmr()
264 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR start error: {}", e)))?;
265
266 self.handle = Some(Hdf5Handle::Swmr {
267 writer: swmr,
268 ds_index,
269 });
270 Ok(())
271 }
272
273 fn write_standard(&mut self, array: &NDArray) -> ADResult<()> {
275 let h5file = match self.handle {
276 Some(Hdf5Handle::Standard(ref f)) => f,
277 _ => return Err(ADError::UnsupportedConversion("no HDF5 file open".into())),
278 };
279
280 let dataset_name = if self.frame_count == 0 {
281 self.dataset_name.clone()
282 } else {
283 format!("{}_{}", self.dataset_name, self.frame_count)
284 };
285
286 let shape = array.dims.iter().rev().map(|d| d.size).collect::<Vec<_>>();
287 let element_size = array.data.data_type().element_size();
288 let pipeline = self.build_pipeline(element_size);
289
290 macro_rules! write_typed {
291 ($t:ty, $v:expr) => {{
292 let ds = if let Some(ref pl) = pipeline {
293 h5file
294 .new_dataset::<$t>()
295 .shape(&shape[..])
296 .chunk(&shape[..])
297 .filter_pipeline(pl.clone())
298 .create(dataset_name.as_str())
299 .map_err(|e| {
300 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
301 })?
302 } else {
303 h5file
304 .new_dataset::<$t>()
305 .shape(&shape[..])
306 .create(dataset_name.as_str())
307 .map_err(|e| {
308 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
309 })?
310 };
311 if pipeline.is_some() {
312 ds.write_chunk(0, array.data.as_u8_slice()).map_err(|e| {
313 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
314 })?;
315 } else {
316 ds.write_raw($v).map_err(|e| {
317 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
318 })?;
319 }
320 if self.store_attributes {
321 for attr in array.attributes.iter() {
322 let val_str = attr.value.as_string();
323 let _ = ds
324 .new_attr::<rust_hdf5::types::VarLenUnicode>()
325 .shape(())
326 .create(attr.name.as_str())
327 .and_then(|a| {
328 let s: rust_hdf5::types::VarLenUnicode =
329 val_str.parse().unwrap_or_default();
330 a.write_scalar(&s)
331 });
332 }
333 }
334 }};
335 }
336
337 match &array.data {
338 NDDataBuffer::U8(v) => write_typed!(u8, v),
339 NDDataBuffer::U16(v) => write_typed!(u16, v),
340 NDDataBuffer::I32(v) => write_typed!(i32, v),
341 NDDataBuffer::F32(v) => write_typed!(f32, v),
342 NDDataBuffer::F64(v) => write_typed!(f64, v),
343 _ => {
344 let raw = array.data.as_u8_slice();
345 let ds = if let Some(ref pl) = pipeline {
346 h5file
347 .new_dataset::<u8>()
348 .shape([raw.len()])
349 .chunk(&[raw.len()])
350 .filter_pipeline(pl.clone())
351 .create(dataset_name.as_str())
352 .map_err(|e| {
353 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
354 })?
355 } else {
356 h5file
357 .new_dataset::<u8>()
358 .shape([raw.len()])
359 .create(dataset_name.as_str())
360 .map_err(|e| {
361 ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e))
362 })?
363 };
364 if pipeline.is_some() {
365 ds.write_chunk(0, raw).map_err(|e| {
366 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
367 })?;
368 } else {
369 ds.write_raw(raw).map_err(|e| {
370 ADError::UnsupportedConversion(format!("HDF5 write error: {}", e))
371 })?;
372 }
373 if self.store_attributes {
374 for attr in array.attributes.iter() {
375 let val_str = attr.value.as_string();
376 let _ = ds
377 .new_attr::<rust_hdf5::types::VarLenUnicode>()
378 .shape(())
379 .create(attr.name.as_str())
380 .and_then(|a| {
381 let s: rust_hdf5::types::VarLenUnicode =
382 val_str.parse().unwrap_or_default();
383 a.write_scalar(&s)
384 });
385 }
386 }
387 }
388 }
389 Ok(())
390 }
391
392 fn write_swmr(&mut self, array: &NDArray) -> ADResult<()> {
394 let (writer, ds_index) = match self.handle {
395 Some(Hdf5Handle::Swmr {
396 ref mut writer,
397 ds_index,
398 }) => (writer, ds_index),
399 _ => return Err(ADError::UnsupportedConversion("no SWMR writer open".into())),
400 };
401
402 writer
403 .append_frame(ds_index, array.data.as_u8_slice())
404 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR append error: {}", e)))?;
405
406 let count = self.frame_count + 1; if self.flush_nth_frame > 0 && count % self.flush_nth_frame == 0 {
409 writer
410 .flush()
411 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR flush error: {}", e)))?;
412 }
413
414 Ok(())
415 }
416}
417
418impl Default for Hdf5Writer {
419 fn default() -> Self {
420 Self::new()
421 }
422}
423
424impl NDFileWriter for Hdf5Writer {
425 fn open_file(&mut self, path: &Path, mode: NDFileMode, array: &NDArray) -> ADResult<()> {
426 self.current_path = Some(path.to_path_buf());
427 self.frame_count = 0;
428 self.total_runtime = 0.0;
429 self.total_bytes = 0;
430 self.swmr_cb_counter = 0;
431
432 if self.swmr_mode && mode == NDFileMode::Stream {
433 self.open_swmr(path, array)
434 } else {
435 let h5file = H5File::create(path)
436 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 create error: {}", e)))?;
437 self.handle = Some(Hdf5Handle::Standard(h5file));
438 Ok(())
439 }
440 }
441
442 fn write_file(&mut self, array: &NDArray) -> ADResult<()> {
443 let start = if self.store_performance {
444 Some(std::time::Instant::now())
445 } else {
446 None
447 };
448
449 let is_swmr = matches!(self.handle, Some(Hdf5Handle::Swmr { .. }));
450 if is_swmr {
451 self.write_swmr(array)?;
452 } else {
453 self.write_standard(array)?;
454 }
455 self.frame_count += 1;
456
457 if let Some(start) = start {
458 self.total_runtime += start.elapsed().as_secs_f64();
459 self.total_bytes += array.data.as_u8_slice().len() as u64;
460 }
461 Ok(())
462 }
463
464 fn read_file(&mut self) -> ADResult<NDArray> {
465 let path = self
466 .current_path
467 .as_ref()
468 .ok_or_else(|| ADError::UnsupportedConversion("no file open".into()))?;
469
470 let h5file = H5File::open(path)
471 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 open error: {}", e)))?;
472
473 let ds = h5file
474 .dataset(&self.dataset_name)
475 .map_err(|e| ADError::UnsupportedConversion(format!("HDF5 dataset error: {}", e)))?;
476
477 let shape = ds.shape();
478 let dims: Vec<NDDimension> = shape.iter().rev().map(|&s| NDDimension::new(s)).collect();
479
480 if let Ok(data) = ds.read_raw::<u8>() {
481 let mut arr = NDArray::new(dims, NDDataType::UInt8);
482 arr.data = NDDataBuffer::U8(data);
483 return Ok(arr);
484 }
485 if let Ok(data) = ds.read_raw::<u16>() {
486 let mut arr = NDArray::new(dims, NDDataType::UInt16);
487 arr.data = NDDataBuffer::U16(data);
488 return Ok(arr);
489 }
490 if let Ok(data) = ds.read_raw::<f64>() {
491 let mut arr = NDArray::new(dims, NDDataType::Float64);
492 arr.data = NDDataBuffer::F64(data);
493 return Ok(arr);
494 }
495
496 Err(ADError::UnsupportedConversion(
497 "unsupported HDF5 data type".into(),
498 ))
499 }
500
501 fn close_file(&mut self) -> ADResult<()> {
502 if let Some(Hdf5Handle::Swmr { writer, .. }) = self.handle.take() {
503 writer
504 .close()
505 .map_err(|e| ADError::UnsupportedConversion(format!("SWMR close error: {}", e)))?;
506 }
507 self.current_path = None;
508 Ok(())
509 }
510
511 fn supports_multiple_arrays(&self) -> bool {
512 true
513 }
514}
515
/// Parameter indices of the HDF5-specific parameters this plugin reacts to
/// or publishes. Every entry is `None` until `register_params` looks it up.
#[derive(Default)]
struct Hdf5ParamIndices {
    // Compression settings.
    compression_type: Option<usize>,
    z_compress_level: Option<usize>,
    szip_num_pixels: Option<usize>,
    nbit_precision: Option<usize>,
    nbit_offset: Option<usize>,
    jpeg_quality: Option<usize>,
    blosc_shuffle_type: Option<usize>,
    blosc_compressor: Option<usize>,
    blosc_compress_level: Option<usize>,
    // Attribute storage and performance read-backs.
    store_attributes: Option<usize>,
    store_performance: Option<usize>,
    total_runtime: Option<usize>,
    total_io_speed: Option<usize>,
    // SWMR control and status.
    swmr_mode: Option<usize>,
    swmr_flush_now: Option<usize>,
    swmr_running: Option<usize>,
    swmr_cb_counter: Option<usize>,
    swmr_supported: Option<usize>,
    flush_nth_frame: Option<usize>,
}
543
544pub struct Hdf5FileProcessor {
546 ctrl: FilePluginController<Hdf5Writer>,
547 hdf5_params: Hdf5ParamIndices,
548}
549
550impl Hdf5FileProcessor {
551 pub fn new() -> Self {
552 Self {
553 ctrl: FilePluginController::new(Hdf5Writer::new()),
554 hdf5_params: Hdf5ParamIndices::default(),
555 }
556 }
557
558 pub fn set_dataset_name(&mut self, name: &str) {
559 self.ctrl.writer.set_dataset_name(name);
560 }
561}
562
563fn register_hdf5_params(
565 base: &mut asyn_rs::port::PortDriverBase,
566) -> asyn_rs::error::AsynResult<()> {
567 use asyn_rs::param::ParamType;
568 base.create_param("HDF5_SWMRFlushNow", ParamType::Int32)?;
569 base.create_param("HDF5_chunkSizeAuto", ParamType::Int32)?;
570 base.create_param("HDF5_nRowChunks", ParamType::Int32)?;
571 base.create_param("HDF5_nColChunks", ParamType::Int32)?;
572 base.create_param("HDF5_chunkSize2", ParamType::Int32)?;
573 base.create_param("HDF5_chunkSize3", ParamType::Int32)?;
574 base.create_param("HDF5_chunkSize4", ParamType::Int32)?;
575 base.create_param("HDF5_chunkSize5", ParamType::Int32)?;
576 base.create_param("HDF5_chunkSize6", ParamType::Int32)?;
577 base.create_param("HDF5_chunkSize7", ParamType::Int32)?;
578 base.create_param("HDF5_chunkSize8", ParamType::Int32)?;
579 base.create_param("HDF5_chunkSize9", ParamType::Int32)?;
580 base.create_param("HDF5_nFramesChunks", ParamType::Int32)?;
581 base.create_param("HDF5_NDAttributeChunk", ParamType::Int32)?;
582 base.create_param("HDF5_chunkBoundaryAlign", ParamType::Int32)?;
583 base.create_param("HDF5_chunkBoundaryThreshold", ParamType::Int32)?;
584 base.create_param("HDF5_nExtraDims", ParamType::Int32)?;
585 base.create_param("HDF5_extraDimSizeN", ParamType::Int32)?;
586 base.create_param("HDF5_extraDimNameN", ParamType::Octet)?;
587 base.create_param("HDF5_extraDimSizeX", ParamType::Int32)?;
588 base.create_param("HDF5_extraDimNameX", ParamType::Octet)?;
589 base.create_param("HDF5_extraDimSizeY", ParamType::Int32)?;
590 base.create_param("HDF5_extraDimNameY", ParamType::Octet)?;
591 base.create_param("HDF5_extraDimSize3", ParamType::Int32)?;
592 base.create_param("HDF5_extraDimName3", ParamType::Octet)?;
593 base.create_param("HDF5_extraDimSize4", ParamType::Int32)?;
594 base.create_param("HDF5_extraDimName4", ParamType::Octet)?;
595 base.create_param("HDF5_extraDimSize5", ParamType::Int32)?;
596 base.create_param("HDF5_extraDimName5", ParamType::Octet)?;
597 base.create_param("HDF5_extraDimSize6", ParamType::Int32)?;
598 base.create_param("HDF5_extraDimName6", ParamType::Octet)?;
599 base.create_param("HDF5_extraDimSize7", ParamType::Int32)?;
600 base.create_param("HDF5_extraDimName7", ParamType::Octet)?;
601 base.create_param("HDF5_extraDimSize8", ParamType::Int32)?;
602 base.create_param("HDF5_extraDimName8", ParamType::Octet)?;
603 base.create_param("HDF5_extraDimSize9", ParamType::Int32)?;
604 base.create_param("HDF5_extraDimName9", ParamType::Octet)?;
605 base.create_param("HDF5_storeAttributes", ParamType::Int32)?;
606 base.create_param("HDF5_storePerformance", ParamType::Int32)?;
607 base.create_param("HDF5_totalRuntime", ParamType::Float64)?;
608 base.create_param("HDF5_totalIoSpeed", ParamType::Float64)?;
609 base.create_param("HDF5_flushNthFrame", ParamType::Int32)?;
610 base.create_param("HDF5_compressionType", ParamType::Int32)?;
611 base.create_param("HDF5_nbitsPrecision", ParamType::Int32)?;
612 base.create_param("HDF5_nbitsOffset", ParamType::Int32)?;
613 base.create_param("HDF5_szipNumPixels", ParamType::Int32)?;
614 base.create_param("HDF5_zCompressLevel", ParamType::Int32)?;
615 base.create_param("HDF5_bloscShuffleType", ParamType::Int32)?;
616 base.create_param("HDF5_bloscCompressor", ParamType::Int32)?;
617 base.create_param("HDF5_bloscCompressLevel", ParamType::Int32)?;
618 base.create_param("HDF5_jpegQuality", ParamType::Int32)?;
619 base.create_param("HDF5_dimAttDatasets", ParamType::Int32)?;
620 base.create_param("HDF5_layoutErrorMsg", ParamType::Octet)?;
621 base.create_param("HDF5_layoutValid", ParamType::Int32)?;
622 base.create_param("HDF5_layoutFilename", ParamType::Octet)?;
623 base.create_param("HDF5_SWMRSupported", ParamType::Int32)?;
624 base.create_param("HDF5_SWMRMode", ParamType::Int32)?;
625 base.create_param("HDF5_SWMRRunning", ParamType::Int32)?;
626 base.create_param("HDF5_SWMRCbCounter", ParamType::Int32)?;
627 base.create_param("HDF5_posRunning", ParamType::Int32)?;
628 base.create_param("HDF5_posNameDimN", ParamType::Octet)?;
629 base.create_param("HDF5_posNameDimX", ParamType::Octet)?;
630 base.create_param("HDF5_posNameDimY", ParamType::Octet)?;
631 base.create_param("HDF5_posNameDim3", ParamType::Octet)?;
632 base.create_param("HDF5_posNameDim4", ParamType::Octet)?;
633 base.create_param("HDF5_posNameDim5", ParamType::Octet)?;
634 base.create_param("HDF5_posNameDim6", ParamType::Octet)?;
635 base.create_param("HDF5_posNameDim7", ParamType::Octet)?;
636 base.create_param("HDF5_posNameDim8", ParamType::Octet)?;
637 base.create_param("HDF5_posNameDim9", ParamType::Octet)?;
638 base.create_param("HDF5_posIndexDimN", ParamType::Octet)?;
639 base.create_param("HDF5_posIndexDimX", ParamType::Octet)?;
640 base.create_param("HDF5_posIndexDimY", ParamType::Octet)?;
641 base.create_param("HDF5_posIndexDim3", ParamType::Octet)?;
642 base.create_param("HDF5_posIndexDim4", ParamType::Octet)?;
643 base.create_param("HDF5_posIndexDim5", ParamType::Octet)?;
644 base.create_param("HDF5_posIndexDim6", ParamType::Octet)?;
645 base.create_param("HDF5_posIndexDim7", ParamType::Octet)?;
646 base.create_param("HDF5_posIndexDim8", ParamType::Octet)?;
647 base.create_param("HDF5_posIndexDim9", ParamType::Octet)?;
648 base.create_param("HDF5_fillValue", ParamType::Float64)?;
649 base.create_param("HDF5_extraDimChunkX", ParamType::Int32)?;
650 base.create_param("HDF5_extraDimChunkY", ParamType::Int32)?;
651 base.create_param("HDF5_extraDimChunk3", ParamType::Int32)?;
652 base.create_param("HDF5_extraDimChunk4", ParamType::Int32)?;
653 base.create_param("HDF5_extraDimChunk5", ParamType::Int32)?;
654 base.create_param("HDF5_extraDimChunk6", ParamType::Int32)?;
655 base.create_param("HDF5_extraDimChunk7", ParamType::Int32)?;
656 base.create_param("HDF5_extraDimChunk8", ParamType::Int32)?;
657 base.create_param("HDF5_extraDimChunk9", ParamType::Int32)?;
658 Ok(())
659}
660
661impl Default for Hdf5FileProcessor {
662 fn default() -> Self {
663 Self::new()
664 }
665}
666
667impl NDPluginProcess for Hdf5FileProcessor {
668 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
669 let was_swmr = self.ctrl.writer.is_swmr_active();
670 let mut result = self.ctrl.process_array(array);
671 let is_swmr = self.ctrl.writer.is_swmr_active();
672
673 if was_swmr != is_swmr {
675 if let Some(idx) = self.hdf5_params.swmr_running {
676 result
677 .param_updates
678 .push(ParamUpdate::int32(idx, if is_swmr { 1 } else { 0 }));
679 }
680 }
681
682 if is_swmr {
684 if let Some(idx) = self.hdf5_params.swmr_cb_counter {
685 result.param_updates.push(ParamUpdate::int32(
686 idx,
687 self.ctrl.writer.swmr_cb_counter as i32,
688 ));
689 }
690 }
691
692 if self.ctrl.writer.store_performance {
694 if let Some(idx) = self.hdf5_params.total_runtime {
695 result
696 .param_updates
697 .push(ParamUpdate::float64(idx, self.ctrl.writer.total_runtime));
698 }
699 if let Some(idx) = self.hdf5_params.total_io_speed {
700 let speed = if self.ctrl.writer.total_runtime > 0.0 {
701 self.ctrl.writer.total_bytes as f64
702 / self.ctrl.writer.total_runtime
703 / 1_000_000.0
704 } else {
705 0.0
706 };
707 result.param_updates.push(ParamUpdate::float64(idx, speed));
708 }
709 }
710
711 result
712 }
713
714 fn plugin_type(&self) -> &str {
715 "NDFileHDF5"
716 }
717
718 fn register_params(
719 &mut self,
720 base: &mut asyn_rs::port::PortDriverBase,
721 ) -> asyn_rs::error::AsynResult<()> {
722 self.ctrl.register_params(base)?;
723 register_hdf5_params(base)?;
724 self.hdf5_params.compression_type = base.find_param("HDF5_compressionType");
725 self.hdf5_params.z_compress_level = base.find_param("HDF5_zCompressLevel");
726 self.hdf5_params.szip_num_pixels = base.find_param("HDF5_szipNumPixels");
727 self.hdf5_params.nbit_precision = base.find_param("HDF5_nbitsPrecision");
728 self.hdf5_params.nbit_offset = base.find_param("HDF5_nbitsOffset");
729 self.hdf5_params.jpeg_quality = base.find_param("HDF5_jpegQuality");
730 self.hdf5_params.blosc_shuffle_type = base.find_param("HDF5_bloscShuffleType");
731 self.hdf5_params.blosc_compressor = base.find_param("HDF5_bloscCompressor");
732 self.hdf5_params.blosc_compress_level = base.find_param("HDF5_bloscCompressLevel");
733 self.hdf5_params.store_attributes = base.find_param("HDF5_storeAttributes");
734 self.hdf5_params.store_performance = base.find_param("HDF5_storePerformance");
735 self.hdf5_params.total_runtime = base.find_param("HDF5_totalRuntime");
736 self.hdf5_params.total_io_speed = base.find_param("HDF5_totalIoSpeed");
737 self.hdf5_params.swmr_mode = base.find_param("HDF5_SWMRMode");
738 self.hdf5_params.swmr_flush_now = base.find_param("HDF5_SWMRFlushNow");
739 self.hdf5_params.swmr_running = base.find_param("HDF5_SWMRRunning");
740 self.hdf5_params.swmr_cb_counter = base.find_param("HDF5_SWMRCbCounter");
741 self.hdf5_params.swmr_supported = base.find_param("HDF5_SWMRSupported");
742 self.hdf5_params.flush_nth_frame = base.find_param("HDF5_flushNthFrame");
743
744 if let Some(idx) = self.hdf5_params.swmr_supported {
746 base.set_int32_param(idx, 0, 1)?;
747 }
748 Ok(())
749 }
750
751 fn on_param_change(
752 &mut self,
753 reason: usize,
754 params: &PluginParamSnapshot,
755 ) -> ParamChangeResult {
756 if Some(reason) == self.hdf5_params.compression_type {
758 self.ctrl.writer.set_compression_type(params.value.as_i32());
759 return ParamChangeResult::updates(vec![]);
760 }
761 if Some(reason) == self.hdf5_params.z_compress_level {
762 self.ctrl
763 .writer
764 .set_z_compress_level(params.value.as_i32() as u32);
765 return ParamChangeResult::updates(vec![]);
766 }
767 if Some(reason) == self.hdf5_params.szip_num_pixels {
768 self.ctrl
769 .writer
770 .set_szip_num_pixels(params.value.as_i32() as u32);
771 return ParamChangeResult::updates(vec![]);
772 }
773 if Some(reason) == self.hdf5_params.blosc_shuffle_type {
774 self.ctrl
775 .writer
776 .set_blosc_shuffle_type(params.value.as_i32());
777 return ParamChangeResult::updates(vec![]);
778 }
779 if Some(reason) == self.hdf5_params.blosc_compressor {
780 self.ctrl.writer.set_blosc_compressor(params.value.as_i32());
781 return ParamChangeResult::updates(vec![]);
782 }
783 if Some(reason) == self.hdf5_params.blosc_compress_level {
784 self.ctrl
785 .writer
786 .set_blosc_compress_level(params.value.as_i32() as u32);
787 return ParamChangeResult::updates(vec![]);
788 }
789 if Some(reason) == self.hdf5_params.nbit_precision {
790 self.ctrl
791 .writer
792 .set_nbit_precision(params.value.as_i32() as u32);
793 return ParamChangeResult::updates(vec![]);
794 }
795 if Some(reason) == self.hdf5_params.nbit_offset {
796 self.ctrl
797 .writer
798 .set_nbit_offset(params.value.as_i32() as u32);
799 return ParamChangeResult::updates(vec![]);
800 }
801 if Some(reason) == self.hdf5_params.jpeg_quality {
802 self.ctrl
803 .writer
804 .set_jpeg_quality(params.value.as_i32() as u32);
805 return ParamChangeResult::updates(vec![]);
806 }
807 if Some(reason) == self.hdf5_params.store_attributes {
808 self.ctrl
809 .writer
810 .set_store_attributes(params.value.as_i32() != 0);
811 return ParamChangeResult::updates(vec![]);
812 }
813 if Some(reason) == self.hdf5_params.store_performance {
814 self.ctrl
815 .writer
816 .set_store_performance(params.value.as_i32() != 0);
817 return ParamChangeResult::updates(vec![]);
818 }
819 if Some(reason) == self.hdf5_params.swmr_mode {
821 self.ctrl.writer.set_swmr_mode(params.value.as_i32() != 0);
822 return ParamChangeResult::updates(vec![]);
823 }
824 if Some(reason) == self.hdf5_params.swmr_flush_now {
825 if params.value.as_i32() != 0 {
826 self.ctrl.writer.flush_swmr();
827 let mut updates = vec![];
828 if let Some(idx) = self.hdf5_params.swmr_cb_counter {
829 updates.push(ParamUpdate::int32(
830 idx,
831 self.ctrl.writer.swmr_cb_counter as i32,
832 ));
833 }
834 return ParamChangeResult::updates(updates);
835 }
836 return ParamChangeResult::updates(vec![]);
837 }
838 if Some(reason) == self.hdf5_params.flush_nth_frame {
839 self.ctrl
840 .writer
841 .set_flush_nth_frame(params.value.as_i32().max(0) as usize);
842 return ParamChangeResult::updates(vec![]);
843 }
844 self.ctrl.on_param_change(reason, params)
845 }
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851 use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
852 use std::sync::atomic::{AtomicU32, Ordering};
853
854 static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
855
856 fn temp_path(prefix: &str) -> PathBuf {
857 let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
858 std::env::temp_dir().join(format!("adcore_test_{}_{}.h5", prefix, n))
859 }
860
861 #[test]
862 fn test_write_single_frame() {
863 let path = temp_path("hdf5_single");
864 let mut writer = Hdf5Writer::new();
865
866 let mut arr = NDArray::new(
867 vec![NDDimension::new(4), NDDimension::new(4)],
868 NDDataType::UInt8,
869 );
870 if let NDDataBuffer::U8(ref mut v) = arr.data {
871 for i in 0..16 {
872 v[i] = i as u8;
873 }
874 }
875
876 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
877 writer.write_file(&arr).unwrap();
878 writer.close_file().unwrap();
879
880 let mut reader = Hdf5Writer::new();
881 reader.current_path = Some(path.clone());
882 let read_arr = reader.read_file().unwrap();
883 assert_eq!(read_arr.dims.len(), 2);
884 assert_eq!(read_arr.dims[0].size, 4);
885
886 std::fs::remove_file(&path).ok();
887 }
888
889 #[test]
890 fn test_write_multiple_frames() {
891 let path = temp_path("hdf5_multi");
892 let mut writer = Hdf5Writer::new();
893
894 let arr = NDArray::new(
895 vec![NDDimension::new(4), NDDimension::new(4)],
896 NDDataType::UInt8,
897 );
898
899 writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
900 writer.write_file(&arr).unwrap();
901 writer.write_file(&arr).unwrap();
902 writer.write_file(&arr).unwrap();
903 writer.close_file().unwrap();
904
905 assert!(writer.supports_multiple_arrays());
906 assert_eq!(writer.frame_count(), 3);
907
908 let data = std::fs::read(&path).unwrap();
909 assert_eq!(&data[0..8], b"\x89HDF\r\n\x1a\n");
910
911 std::fs::remove_file(&path).ok();
912 }
913
914 #[test]
915 fn test_attributes_stored() {
916 let path = temp_path("hdf5_attrs");
917 let mut writer = Hdf5Writer::new();
918
919 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
920 arr.attributes.add(NDAttribute {
921 name: "exposure".into(),
922 description: "".into(),
923 source: NDAttrSource::Driver,
924 value: NDAttrValue::Float64(0.5),
925 });
926
927 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
928 writer.write_file(&arr).unwrap();
929 writer.close_file().unwrap();
930
931 let h5file = H5File::open(&path).unwrap();
932 let ds = h5file.dataset("data").unwrap();
933 let attr = ds.attr("exposure").unwrap();
934 let val = attr.read_string().unwrap();
935 assert_eq!(val, "0.5");
936
937 std::fs::remove_file(&path).ok();
938 }
939
940 #[test]
941 fn test_roundtrip_u16() {
942 let path = temp_path("hdf5_u16");
943 let mut writer = Hdf5Writer::new();
944
945 let mut arr = NDArray::new(
946 vec![NDDimension::new(4), NDDimension::new(4)],
947 NDDataType::UInt16,
948 );
949 if let NDDataBuffer::U16(ref mut v) = arr.data {
950 for i in 0..16 {
951 v[i] = (i * 100) as u16;
952 }
953 }
954
955 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
956 writer.write_file(&arr).unwrap();
957 writer.close_file().unwrap();
958
959 let h5file = H5File::open(&path).unwrap();
960 let ds = h5file.dataset("data").unwrap();
961 let data: Vec<u16> = ds.read_raw().unwrap();
962 assert_eq!(data[0], 0);
963 assert_eq!(data[1], 100);
964 assert_eq!(data[15], 1500);
965
966 std::fs::remove_file(&path).ok();
967 }
968
969 #[test]
970 fn test_roundtrip_f64() {
971 let path = temp_path("hdf5_f64");
972 let mut writer = Hdf5Writer::new();
973
974 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::Float64);
975 if let NDDataBuffer::F64(ref mut v) = arr.data {
976 v[0] = 1.5;
977 v[1] = 2.5;
978 v[2] = 3.5;
979 v[3] = 4.5;
980 }
981
982 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
983 writer.write_file(&arr).unwrap();
984 writer.close_file().unwrap();
985
986 let h5file = H5File::open(&path).unwrap();
987 let ds = h5file.dataset("data").unwrap();
988 let data: Vec<f64> = ds.read_raw().unwrap();
989 assert!((data[0] - 1.5).abs() < 1e-10);
990 assert!((data[3] - 4.5).abs() < 1e-10);
991
992 std::fs::remove_file(&path).ok();
993 }
994
995 #[test]
996 fn test_deflate_compressed_write() {
997 let path = temp_path("hdf5_deflate");
998 let mut writer = Hdf5Writer::new();
999 writer.set_compression_type(COMPRESS_ZLIB);
1000 writer.set_z_compress_level(6);
1001
1002 let mut arr = NDArray::new(
1003 vec![NDDimension::new(64), NDDimension::new(64)],
1004 NDDataType::UInt16,
1005 );
1006 if let NDDataBuffer::U16(ref mut v) = arr.data {
1007 for i in 0..v.len() {
1008 v[i] = (i % 256) as u16;
1009 }
1010 }
1011
1012 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1013 writer.write_file(&arr).unwrap();
1014 writer.close_file().unwrap();
1015
1016 let file_size = std::fs::metadata(&path).unwrap().len();
1017 assert!(
1018 file_size < 8192,
1019 "compressed file should be smaller than raw data"
1020 );
1021
1022 let h5file = H5File::open(&path).unwrap();
1023 let ds = h5file.dataset("data").unwrap();
1024 let data: Vec<u16> = ds.read_raw().unwrap();
1025 assert_eq!(data.len(), 64 * 64);
1026 assert_eq!(data[0], 0);
1027 assert_eq!(data[255], 255);
1028 assert_eq!(data[256], 0);
1029
1030 std::fs::remove_file(&path).ok();
1031 }
1032
1033 #[test]
1034 fn test_lz4_compressed_write() {
1035 let path = temp_path("hdf5_lz4");
1036 let mut writer = Hdf5Writer::new();
1037 writer.set_compression_type(COMPRESS_LZ4);
1038
1039 let mut arr = NDArray::new(
1040 vec![NDDimension::new(32), NDDimension::new(32)],
1041 NDDataType::UInt8,
1042 );
1043 if let NDDataBuffer::U8(ref mut v) = arr.data {
1044 for i in 0..v.len() {
1045 v[i] = (i % 4) as u8;
1046 }
1047 }
1048
1049 writer.open_file(&path, NDFileMode::Single, &arr).unwrap();
1050 writer.write_file(&arr).unwrap();
1051 writer.close_file().unwrap();
1052
1053 let h5file = H5File::open(&path).unwrap();
1054 let ds = h5file.dataset("data").unwrap();
1055 let data: Vec<u8> = ds.read_raw().unwrap();
1056 assert_eq!(data.len(), 32 * 32);
1057 assert_eq!(data[0], 0);
1058 assert_eq!(data[3], 3);
1059
1060 std::fs::remove_file(&path).ok();
1061 }
1062
1063 #[test]
1064 fn test_swmr_streaming() {
1065 let path = temp_path("hdf5_swmr");
1066 let mut writer = Hdf5Writer::new();
1067 writer.set_swmr_mode(true);
1068 writer.set_flush_nth_frame(2);
1069
1070 let arr = NDArray::new(
1071 vec![NDDimension::new(8), NDDimension::new(8)],
1072 NDDataType::Float32,
1073 );
1074
1075 writer.open_file(&path, NDFileMode::Stream, &arr).unwrap();
1076 writer.write_file(&arr).unwrap();
1077 writer.write_file(&arr).unwrap(); writer.write_file(&arr).unwrap();
1079 writer.close_file().unwrap();
1080
1081 assert_eq!(writer.frame_count(), 3);
1082
1083 let mut reader = rust_hdf5::swmr::SwmrFileReader::open(&path).unwrap();
1085 let shape = reader.dataset_shape("data").unwrap();
1086 assert_eq!(shape[0], 3); assert_eq!(shape[1], 8);
1088 assert_eq!(shape[2], 8);
1089
1090 let data: Vec<f32> = reader.read_dataset("data").unwrap();
1091 assert_eq!(data.len(), 3 * 8 * 8);
1092
1093 std::fs::remove_file(&path).ok();
1094 }
1095}