Skip to main content

rust_hdf5/
swmr.rs

//! Single Writer / Multiple Reader (SWMR) API.
//!
//! Provides a high-level wrapper around the SWMR protocol for streaming
//! frame-based data (e.g., area detector images).
5
6use std::path::Path;
7
8use crate::io::locking::FileLocking;
9use crate::io::Hdf5Reader;
10use crate::io::SwmrWriter as IoSwmrWriter;
11
12use crate::error::Result;
13use crate::types::H5Type;
14
15/// SWMR writer for streaming frame-based data to an HDF5 file.
16///
17/// Usage:
18/// ```no_run
19/// use rust_hdf5::swmr::SwmrFileWriter;
20///
21/// let mut writer = SwmrFileWriter::create("stream.h5").unwrap();
22/// let ds = writer.create_streaming_dataset::<f32>("frames", &[256, 256]).unwrap();
23/// writer.start_swmr().unwrap();
24///
25/// // Write frames
26/// let frame_data = vec![0.0f32; 256 * 256];
27/// let raw: Vec<u8> = frame_data.iter()
28///     .flat_map(|v| v.to_le_bytes())
29///     .collect();
30/// writer.append_frame(ds, &raw).unwrap();
31/// writer.flush().unwrap();
32///
33/// writer.close().unwrap();
34/// ```
35pub struct SwmrFileWriter {
36    inner: IoSwmrWriter,
37}
38
39impl SwmrFileWriter {
40    /// Create a new HDF5 file for SWMR streaming using the env-var-derived
41    /// locking policy.
42    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
43        let inner = IoSwmrWriter::create(path.as_ref())?;
44        Ok(Self { inner })
45    }
46
47    /// Create a new HDF5 file for SWMR streaming with an explicit locking
48    /// policy. The writer holds an exclusive lock until [`Self::start_swmr`]
49    /// is called, at which point the lock is downgraded to shared so
50    /// concurrent SWMR readers can attach.
51    pub fn create_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
52        let inner = IoSwmrWriter::create_with_locking(path.as_ref(), locking)?;
53        Ok(Self { inner })
54    }
55
56    /// Create a streaming dataset.
57    ///
58    /// The dataset will have shape `[0, frame_dims...]` initially, with
59    /// chunk dimensions `[1, frame_dims...]` and unlimited first dimension.
60    ///
61    /// Returns the dataset index for use with `append_frame`.
62    pub fn create_streaming_dataset<T: H5Type>(
63        &mut self,
64        name: &str,
65        frame_dims: &[u64],
66    ) -> Result<usize> {
67        let datatype = T::hdf5_type();
68        let idx = self
69            .inner
70            .create_streaming_dataset(name, datatype, frame_dims)?;
71        Ok(idx)
72    }
73
74    /// Create a streaming dataset whose frames are compressed.
75    ///
76    /// Like [`create_streaming_dataset`](Self::create_streaming_dataset) but
77    /// each appended frame is run through `pipeline` (e.g.
78    /// `FilterPipeline::deflate(4)`). SWMR appends and in-place header
79    /// updates work the same as for uncompressed streaming datasets.
80    pub fn create_streaming_dataset_compressed<T: H5Type>(
81        &mut self,
82        name: &str,
83        frame_dims: &[u64],
84        pipeline: crate::format::messages::filter::FilterPipeline,
85    ) -> Result<usize> {
86        let idx = self.inner.create_streaming_dataset_compressed(
87            name,
88            T::hdf5_type(),
89            frame_dims,
90            pipeline,
91        )?;
92        Ok(idx)
93    }
94
95    /// Create a streaming dataset whose frames are split into fixed-size
96    /// chunk tiles.
97    ///
98    /// `frame_dims` is the per-frame shape (e.g. `[1024, 1024]`);
99    /// `frame_chunk` is the tile shape within a frame (e.g. `[256, 256]`),
100    /// of the same rank. The on-disk chunk shape becomes
101    /// `[1, frame_chunk...]`, so each frame is stored as
102    /// `product(frame_dims / frame_chunk)` chunks instead of one. This
103    /// mirrors area-detector tiling controls such as NDFileHDF5's
104    /// `nRowChunks` / `nColChunks`: it changes only the partial-read
105    /// granularity and compression unit, not the stored data.
106    /// [`append_frame`](Self::append_frame) accepts a whole frame and
107    /// splits it into tiles automatically.
108    pub fn create_streaming_dataset_tiled<T: H5Type>(
109        &mut self,
110        name: &str,
111        frame_dims: &[u64],
112        frame_chunk: &[u64],
113    ) -> Result<usize> {
114        let idx = self.inner.create_streaming_dataset_tiled(
115            name,
116            T::hdf5_type(),
117            frame_dims,
118            frame_chunk,
119        )?;
120        Ok(idx)
121    }
122
123    /// Create a compressed streaming dataset whose frames are split into
124    /// fixed-size chunk tiles. See
125    /// [`create_streaming_dataset_tiled`](Self::create_streaming_dataset_tiled)
126    /// for the meaning of `frame_chunk`; each tile is the compression unit.
127    pub fn create_streaming_dataset_tiled_compressed<T: H5Type>(
128        &mut self,
129        name: &str,
130        frame_dims: &[u64],
131        frame_chunk: &[u64],
132        pipeline: crate::format::messages::filter::FilterPipeline,
133    ) -> Result<usize> {
134        let idx = self.inner.create_streaming_dataset_tiled_compressed(
135            name,
136            T::hdf5_type(),
137            frame_dims,
138            frame_chunk,
139            pipeline,
140        )?;
141        Ok(idx)
142    }
143
144    /// Create a streaming dataset with full control over the chunk shape,
145    /// including the frame axis.
146    ///
147    /// `chunk` is the complete per-chunk shape, of rank
148    /// `frame_dims.len() + 1`: `chunk[0]` frames per chunk (the NDFileHDF5
149    /// `nFramesChunks` control) and `chunk[1..]` the per-frame tile shape
150    /// (`nRowChunks` / `nColChunks`). When `chunk[0] > 1`,
151    /// [`append_frame`](Self::append_frame) buffers whole frames until a
152    /// chunk band fills; the final partial band is written (zero-padded) at
153    /// [`close`](Self::close), and the dataset's logical frame count always
154    /// equals the exact number of frames appended.
155    pub fn create_streaming_dataset_chunked<T: H5Type>(
156        &mut self,
157        name: &str,
158        frame_dims: &[u64],
159        chunk: &[u64],
160    ) -> Result<usize> {
161        let idx =
162            self.inner
163                .create_streaming_dataset_chunked(name, T::hdf5_type(), frame_dims, chunk)?;
164        Ok(idx)
165    }
166
167    /// Compressed variant of
168    /// [`create_streaming_dataset_chunked`](Self::create_streaming_dataset_chunked);
169    /// each chunk is filtered independently through `pipeline`.
170    pub fn create_streaming_dataset_chunked_compressed<T: H5Type>(
171        &mut self,
172        name: &str,
173        frame_dims: &[u64],
174        chunk: &[u64],
175        pipeline: crate::format::messages::filter::FilterPipeline,
176    ) -> Result<usize> {
177        let idx = self.inner.create_streaming_dataset_chunked_compressed(
178            name,
179            T::hdf5_type(),
180            frame_dims,
181            chunk,
182            pipeline,
183        )?;
184        Ok(idx)
185    }
186
187    /// Create a hard link: an additional name for a dataset or group that
188    /// already exists in the file.
189    ///
190    /// No data is copied — the link and its target share one object header,
191    /// exactly as `h5py` / libhdf5 hard links do. This is the NeXus-style way
192    /// to expose a streaming dataset at an aliased path.
193    ///
194    /// * `parent_group_path` — full path of the group that will hold the
195    ///   link (`"/"` for the root group).
196    /// * `link_name` — leaf name of the new link within that group.
197    /// * `target_path` — full path of an existing dataset or group.
198    ///
199    /// # Visibility relative to SWMR mode
200    ///
201    /// A link created **before** [`start_swmr`](Self::start_swmr) is committed
202    /// by `start_swmr` and is visible to SWMR readers for the whole streaming
203    /// window. A link created **after** `start_swmr` is committed only by
204    /// [`close`](Self::close); it does not appear to readers that attach
205    /// during the live SWMR window. Create layout links before `start_swmr`
206    /// when readers must resolve them while streaming.
207    pub fn create_hard_link(
208        &mut self,
209        parent_group_path: &str,
210        link_name: &str,
211        target_path: &str,
212    ) -> Result<()> {
213        self.inner
214            .writer_mut()
215            .create_hard_link(parent_group_path, link_name, target_path)?;
216        Ok(())
217    }
218
219    /// Signal the start of SWMR mode.
220    pub fn start_swmr(&mut self) -> Result<()> {
221        self.inner.start_swmr()?;
222        Ok(())
223    }
224
225    /// Append a frame of raw data to a streaming dataset.
226    ///
227    /// The data size must match one frame (product of frame_dims * element_size).
228    pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> Result<()> {
229        self.inner.append_frame(ds_index, data)?;
230        Ok(())
231    }
232
233    /// Flush all dataset index structures to disk with SWMR ordering.
234    pub fn flush(&mut self) -> Result<()> {
235        self.inner.flush()?;
236        Ok(())
237    }
238
239    /// Close and finalize the file.
240    pub fn close(self) -> Result<()> {
241        self.inner.close()?;
242        Ok(())
243    }
244}
245
246/// SWMR reader for monitoring a streaming HDF5 file.
247///
248/// Opens a file being written by a concurrent [`SwmrFileWriter`] and
249/// periodically calls [`refresh`](Self::refresh) to pick up new data.
250///
251/// ```no_run
252/// use rust_hdf5::swmr::SwmrFileReader;
253///
254/// let mut reader = SwmrFileReader::open("stream.h5").unwrap();
255///
256/// loop {
257///     reader.refresh().unwrap();
258///     let names = reader.dataset_names();
259///     if let Some(shape) = reader.dataset_shape("frames").ok() {
260///         println!("frames shape: {:?}", shape);
261///         if shape[0] > 0 {
262///             let data = reader.read_dataset_raw("frames").unwrap();
263///             println!("got {} bytes", data.len());
264///             break;
265///         }
266///     }
267///     std::thread::sleep(std::time::Duration::from_millis(100));
268/// }
269/// ```
270pub struct SwmrFileReader {
271    reader: Hdf5Reader,
272}
273
274impl SwmrFileReader {
275    /// Open an HDF5 file for SWMR reading using the env-var-derived
276    /// locking policy. Takes a shared lock so it coexists with the
277    /// downgraded shared lock held by [`SwmrFileWriter`] after
278    /// `start_swmr`, and with other concurrent SWMR readers.
279    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
280        let reader = Hdf5Reader::open_swmr(path.as_ref())?;
281        Ok(Self { reader })
282    }
283
284    /// Open an HDF5 file for SWMR reading with an explicit locking policy.
285    pub fn open_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
286        let reader = Hdf5Reader::open_swmr_with_locking(path.as_ref(), locking)?;
287        Ok(Self { reader })
288    }
289
290    /// Re-read the superblock and dataset metadata from disk.
291    ///
292    /// Call this periodically to pick up new data written by the concurrent
293    /// SWMR writer.
294    pub fn refresh(&mut self) -> Result<()> {
295        self.reader.refresh()?;
296        Ok(())
297    }
298
299    /// Return the names of all datasets.
300    pub fn dataset_names(&self) -> Vec<String> {
301        self.reader
302            .dataset_names()
303            .iter()
304            .map(|s| s.to_string())
305            .collect()
306    }
307
308    /// Return the current shape of a dataset.
309    pub fn dataset_shape(&self, name: &str) -> Result<Vec<u64>> {
310        Ok(self.reader.dataset_shape(name)?)
311    }
312
313    /// Read the raw bytes of a dataset.
314    pub fn read_dataset_raw(&mut self, name: &str) -> Result<Vec<u8>> {
315        Ok(self.reader.read_dataset_raw(name)?)
316    }
317
318    /// Read a dataset as a typed vector.
319    pub fn read_dataset<T: H5Type>(&mut self, name: &str) -> Result<Vec<T>> {
320        let raw = self.reader.read_dataset_raw(name)?;
321        if raw.len() % T::element_size() != 0 {
322            return Err(crate::error::Hdf5Error::TypeMismatch(format!(
323                "raw data size {} is not a multiple of element size {}",
324                raw.len(),
325                T::element_size(),
326            )));
327        }
328        let count = raw.len() / T::element_size();
329        let mut result = Vec::<T>::with_capacity(count);
330        unsafe {
331            std::ptr::copy_nonoverlapping(raw.as_ptr(), result.as_mut_ptr() as *mut u8, raw.len());
332            result.set_len(count);
333        }
334        Ok(result)
335    }
336}