Skip to main content

rust_hdf5/
swmr.rs

1//! Single Writer / Multiple Reader (SWMR) API.
2//!
3//! Provides a high-level wrapper around the SWMR protocol for streaming
4//! frame-based data (e.g., area detector images).
5
6use std::path::Path;
7
8use crate::io::locking::FileLocking;
9use crate::io::Hdf5Reader;
10use crate::io::SwmrWriter as IoSwmrWriter;
11
12use crate::error::Result;
13use crate::types::H5Type;
14
15/// SWMR writer for streaming frame-based data to an HDF5 file.
16///
17/// Usage:
18/// ```no_run
19/// use rust_hdf5::swmr::SwmrFileWriter;
20///
21/// let mut writer = SwmrFileWriter::create("stream.h5").unwrap();
22/// let ds = writer.create_streaming_dataset::<f32>("frames", &[256, 256]).unwrap();
23/// writer.start_swmr().unwrap();
24///
25/// // Write frames
26/// let frame_data = vec![0.0f32; 256 * 256];
27/// let raw: Vec<u8> = frame_data.iter()
28///     .flat_map(|v| v.to_le_bytes())
29///     .collect();
30/// writer.append_frame(ds, &raw).unwrap();
31/// writer.flush().unwrap();
32///
33/// writer.close().unwrap();
34/// ```
35pub struct SwmrFileWriter {
36    inner: IoSwmrWriter,
37}
38
39impl SwmrFileWriter {
40    /// Create a new HDF5 file for SWMR streaming using the env-var-derived
41    /// locking policy.
42    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
43        let inner = IoSwmrWriter::create(path.as_ref())?;
44        Ok(Self { inner })
45    }
46
47    /// Create a new HDF5 file for SWMR streaming with an explicit locking
48    /// policy. The writer holds an exclusive lock until [`Self::start_swmr`]
49    /// is called, at which point the lock is downgraded to shared so
50    /// concurrent SWMR readers can attach.
51    pub fn create_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
52        let inner = IoSwmrWriter::create_with_locking(path.as_ref(), locking)?;
53        Ok(Self { inner })
54    }
55
56    /// Create a streaming dataset.
57    ///
58    /// The dataset will have shape `[0, frame_dims...]` initially, with
59    /// chunk dimensions `[1, frame_dims...]` and unlimited first dimension.
60    ///
61    /// Returns the dataset index for use with `append_frame`.
62    pub fn create_streaming_dataset<T: H5Type>(
63        &mut self,
64        name: &str,
65        frame_dims: &[u64],
66    ) -> Result<usize> {
67        let datatype = T::hdf5_type();
68        let idx = self
69            .inner
70            .create_streaming_dataset(name, datatype, frame_dims)?;
71        Ok(idx)
72    }
73
74    /// Create a streaming dataset whose frames are compressed.
75    ///
76    /// Like [`create_streaming_dataset`](Self::create_streaming_dataset) but
77    /// each appended frame is run through `pipeline` (e.g.
78    /// `FilterPipeline::deflate(4)`). SWMR appends and in-place header
79    /// updates work the same as for uncompressed streaming datasets.
80    pub fn create_streaming_dataset_compressed<T: H5Type>(
81        &mut self,
82        name: &str,
83        frame_dims: &[u64],
84        pipeline: crate::format::messages::filter::FilterPipeline,
85    ) -> Result<usize> {
86        let idx = self.inner.create_streaming_dataset_compressed(
87            name,
88            T::hdf5_type(),
89            frame_dims,
90            pipeline,
91        )?;
92        Ok(idx)
93    }
94
95    /// Signal the start of SWMR mode.
96    pub fn start_swmr(&mut self) -> Result<()> {
97        self.inner.start_swmr()?;
98        Ok(())
99    }
100
101    /// Append a frame of raw data to a streaming dataset.
102    ///
103    /// The data size must match one frame (product of frame_dims * element_size).
104    pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> Result<()> {
105        self.inner.append_frame(ds_index, data)?;
106        Ok(())
107    }
108
109    /// Flush all dataset index structures to disk with SWMR ordering.
110    pub fn flush(&mut self) -> Result<()> {
111        self.inner.flush()?;
112        Ok(())
113    }
114
115    /// Close and finalize the file.
116    pub fn close(self) -> Result<()> {
117        self.inner.close()?;
118        Ok(())
119    }
120}
121
122/// SWMR reader for monitoring a streaming HDF5 file.
123///
124/// Opens a file being written by a concurrent [`SwmrFileWriter`] and
125/// periodically calls [`refresh`](Self::refresh) to pick up new data.
126///
127/// ```no_run
128/// use rust_hdf5::swmr::SwmrFileReader;
129///
130/// let mut reader = SwmrFileReader::open("stream.h5").unwrap();
131///
132/// loop {
133///     reader.refresh().unwrap();
134///     let names = reader.dataset_names();
135///     if let Some(shape) = reader.dataset_shape("frames").ok() {
136///         println!("frames shape: {:?}", shape);
137///         if shape[0] > 0 {
138///             let data = reader.read_dataset_raw("frames").unwrap();
139///             println!("got {} bytes", data.len());
140///             break;
141///         }
142///     }
143///     std::thread::sleep(std::time::Duration::from_millis(100));
144/// }
145/// ```
146pub struct SwmrFileReader {
147    reader: Hdf5Reader,
148}
149
150impl SwmrFileReader {
151    /// Open an HDF5 file for SWMR reading using the env-var-derived
152    /// locking policy. Takes a shared lock so it coexists with the
153    /// downgraded shared lock held by [`SwmrFileWriter`] after
154    /// `start_swmr`, and with other concurrent SWMR readers.
155    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
156        let reader = Hdf5Reader::open_swmr(path.as_ref())?;
157        Ok(Self { reader })
158    }
159
160    /// Open an HDF5 file for SWMR reading with an explicit locking policy.
161    pub fn open_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
162        let reader = Hdf5Reader::open_swmr_with_locking(path.as_ref(), locking)?;
163        Ok(Self { reader })
164    }
165
166    /// Re-read the superblock and dataset metadata from disk.
167    ///
168    /// Call this periodically to pick up new data written by the concurrent
169    /// SWMR writer.
170    pub fn refresh(&mut self) -> Result<()> {
171        self.reader.refresh()?;
172        Ok(())
173    }
174
175    /// Return the names of all datasets.
176    pub fn dataset_names(&self) -> Vec<String> {
177        self.reader
178            .dataset_names()
179            .iter()
180            .map(|s| s.to_string())
181            .collect()
182    }
183
184    /// Return the current shape of a dataset.
185    pub fn dataset_shape(&self, name: &str) -> Result<Vec<u64>> {
186        Ok(self.reader.dataset_shape(name)?)
187    }
188
189    /// Read the raw bytes of a dataset.
190    pub fn read_dataset_raw(&mut self, name: &str) -> Result<Vec<u8>> {
191        Ok(self.reader.read_dataset_raw(name)?)
192    }
193
194    /// Read a dataset as a typed vector.
195    pub fn read_dataset<T: H5Type>(&mut self, name: &str) -> Result<Vec<T>> {
196        let raw = self.reader.read_dataset_raw(name)?;
197        if raw.len() % T::element_size() != 0 {
198            return Err(crate::error::Hdf5Error::TypeMismatch(format!(
199                "raw data size {} is not a multiple of element size {}",
200                raw.len(),
201                T::element_size(),
202            )));
203        }
204        let count = raw.len() / T::element_size();
205        let mut result = Vec::<T>::with_capacity(count);
206        unsafe {
207            std::ptr::copy_nonoverlapping(raw.as_ptr(), result.as_mut_ptr() as *mut u8, raw.len());
208            result.set_len(count);
209        }
210        Ok(result)
211    }
212}