rust_hdf5/swmr.rs
//! Single Writer / Multiple Reader (SWMR) API.
//!
//! Provides high-level wrappers around the SWMR protocol for streaming
//! frame-based data (e.g., area detector images): [`SwmrFileWriter`] appends
//! frames to a file while [`SwmrFileReader`] follows it concurrently.
5
6use std::path::Path;
7
8use crate::io::locking::FileLocking;
9use crate::io::Hdf5Reader;
10use crate::io::SwmrWriter as IoSwmrWriter;
11
12use crate::error::Result;
13use crate::types::H5Type;
14
15/// SWMR writer for streaming frame-based data to an HDF5 file.
16///
17/// Usage:
18/// ```no_run
19/// use rust_hdf5::swmr::SwmrFileWriter;
20///
21/// let mut writer = SwmrFileWriter::create("stream.h5").unwrap();
22/// let ds = writer.create_streaming_dataset::<f32>("frames", &[256, 256]).unwrap();
23/// writer.start_swmr().unwrap();
24///
25/// // Write frames
26/// let frame_data = vec![0.0f32; 256 * 256];
27/// let raw: Vec<u8> = frame_data.iter()
28/// .flat_map(|v| v.to_le_bytes())
29/// .collect();
30/// writer.append_frame(ds, &raw).unwrap();
31/// writer.flush().unwrap();
32///
33/// writer.close().unwrap();
34/// ```
35pub struct SwmrFileWriter {
36 inner: IoSwmrWriter,
37}
38
39impl SwmrFileWriter {
40 /// Create a new HDF5 file for SWMR streaming using the env-var-derived
41 /// locking policy.
42 pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
43 let inner = IoSwmrWriter::create(path.as_ref())?;
44 Ok(Self { inner })
45 }
46
47 /// Create a new HDF5 file for SWMR streaming with an explicit locking
48 /// policy. The writer holds an exclusive lock until [`Self::start_swmr`]
49 /// is called, at which point the lock is downgraded to shared so
50 /// concurrent SWMR readers can attach.
51 pub fn create_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
52 let inner = IoSwmrWriter::create_with_locking(path.as_ref(), locking)?;
53 Ok(Self { inner })
54 }
55
56 /// Create a streaming dataset.
57 ///
58 /// The dataset will have shape `[0, frame_dims...]` initially, with
59 /// chunk dimensions `[1, frame_dims...]` and unlimited first dimension.
60 ///
61 /// Returns the dataset index for use with `append_frame`.
62 pub fn create_streaming_dataset<T: H5Type>(
63 &mut self,
64 name: &str,
65 frame_dims: &[u64],
66 ) -> Result<usize> {
67 let datatype = T::hdf5_type();
68 let idx = self
69 .inner
70 .create_streaming_dataset(name, datatype, frame_dims)?;
71 Ok(idx)
72 }
73
74 /// Create a streaming dataset whose frames are compressed.
75 ///
76 /// Like [`create_streaming_dataset`](Self::create_streaming_dataset) but
77 /// each appended frame is run through `pipeline` (e.g.
78 /// `FilterPipeline::deflate(4)`). SWMR appends and in-place header
79 /// updates work the same as for uncompressed streaming datasets.
80 pub fn create_streaming_dataset_compressed<T: H5Type>(
81 &mut self,
82 name: &str,
83 frame_dims: &[u64],
84 pipeline: crate::format::messages::filter::FilterPipeline,
85 ) -> Result<usize> {
86 let idx = self.inner.create_streaming_dataset_compressed(
87 name,
88 T::hdf5_type(),
89 frame_dims,
90 pipeline,
91 )?;
92 Ok(idx)
93 }
94
95 /// Create a streaming dataset whose frames are split into fixed-size
96 /// chunk tiles.
97 ///
98 /// `frame_dims` is the per-frame shape (e.g. `[1024, 1024]`);
99 /// `frame_chunk` is the tile shape within a frame (e.g. `[256, 256]`),
100 /// of the same rank. The on-disk chunk shape becomes
101 /// `[1, frame_chunk...]`, so each frame is stored as
102 /// `product(frame_dims / frame_chunk)` chunks instead of one. This
103 /// mirrors area-detector tiling controls such as NDFileHDF5's
104 /// `nRowChunks` / `nColChunks`: it changes only the partial-read
105 /// granularity and compression unit, not the stored data.
106 /// [`append_frame`](Self::append_frame) accepts a whole frame and
107 /// splits it into tiles automatically.
108 pub fn create_streaming_dataset_tiled<T: H5Type>(
109 &mut self,
110 name: &str,
111 frame_dims: &[u64],
112 frame_chunk: &[u64],
113 ) -> Result<usize> {
114 let idx = self.inner.create_streaming_dataset_tiled(
115 name,
116 T::hdf5_type(),
117 frame_dims,
118 frame_chunk,
119 )?;
120 Ok(idx)
121 }
122
123 /// Create a compressed streaming dataset whose frames are split into
124 /// fixed-size chunk tiles. See
125 /// [`create_streaming_dataset_tiled`](Self::create_streaming_dataset_tiled)
126 /// for the meaning of `frame_chunk`; each tile is the compression unit.
127 pub fn create_streaming_dataset_tiled_compressed<T: H5Type>(
128 &mut self,
129 name: &str,
130 frame_dims: &[u64],
131 frame_chunk: &[u64],
132 pipeline: crate::format::messages::filter::FilterPipeline,
133 ) -> Result<usize> {
134 let idx = self.inner.create_streaming_dataset_tiled_compressed(
135 name,
136 T::hdf5_type(),
137 frame_dims,
138 frame_chunk,
139 pipeline,
140 )?;
141 Ok(idx)
142 }
143
144 /// Create a streaming dataset with full control over the chunk shape,
145 /// including the frame axis.
146 ///
147 /// `chunk` is the complete per-chunk shape, of rank
148 /// `frame_dims.len() + 1`: `chunk[0]` frames per chunk (the NDFileHDF5
149 /// `nFramesChunks` control) and `chunk[1..]` the per-frame tile shape
150 /// (`nRowChunks` / `nColChunks`). When `chunk[0] > 1`,
151 /// [`append_frame`](Self::append_frame) buffers whole frames until a
152 /// chunk band fills; the final partial band is written (zero-padded) at
153 /// [`close`](Self::close), and the dataset's logical frame count always
154 /// equals the exact number of frames appended.
155 pub fn create_streaming_dataset_chunked<T: H5Type>(
156 &mut self,
157 name: &str,
158 frame_dims: &[u64],
159 chunk: &[u64],
160 ) -> Result<usize> {
161 let idx =
162 self.inner
163 .create_streaming_dataset_chunked(name, T::hdf5_type(), frame_dims, chunk)?;
164 Ok(idx)
165 }
166
167 /// Compressed variant of
168 /// [`create_streaming_dataset_chunked`](Self::create_streaming_dataset_chunked);
169 /// each chunk is filtered independently through `pipeline`.
170 pub fn create_streaming_dataset_chunked_compressed<T: H5Type>(
171 &mut self,
172 name: &str,
173 frame_dims: &[u64],
174 chunk: &[u64],
175 pipeline: crate::format::messages::filter::FilterPipeline,
176 ) -> Result<usize> {
177 let idx = self.inner.create_streaming_dataset_chunked_compressed(
178 name,
179 T::hdf5_type(),
180 frame_dims,
181 chunk,
182 pipeline,
183 )?;
184 Ok(idx)
185 }
186
187 /// Signal the start of SWMR mode.
188 pub fn start_swmr(&mut self) -> Result<()> {
189 self.inner.start_swmr()?;
190 Ok(())
191 }
192
193 /// Append a frame of raw data to a streaming dataset.
194 ///
195 /// The data size must match one frame (product of frame_dims * element_size).
196 pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> Result<()> {
197 self.inner.append_frame(ds_index, data)?;
198 Ok(())
199 }
200
201 /// Flush all dataset index structures to disk with SWMR ordering.
202 pub fn flush(&mut self) -> Result<()> {
203 self.inner.flush()?;
204 Ok(())
205 }
206
207 /// Close and finalize the file.
208 pub fn close(self) -> Result<()> {
209 self.inner.close()?;
210 Ok(())
211 }
212}
213
214/// SWMR reader for monitoring a streaming HDF5 file.
215///
216/// Opens a file being written by a concurrent [`SwmrFileWriter`] and
217/// periodically calls [`refresh`](Self::refresh) to pick up new data.
218///
219/// ```no_run
220/// use rust_hdf5::swmr::SwmrFileReader;
221///
222/// let mut reader = SwmrFileReader::open("stream.h5").unwrap();
223///
224/// loop {
225/// reader.refresh().unwrap();
226/// let names = reader.dataset_names();
227/// if let Some(shape) = reader.dataset_shape("frames").ok() {
228/// println!("frames shape: {:?}", shape);
229/// if shape[0] > 0 {
230/// let data = reader.read_dataset_raw("frames").unwrap();
231/// println!("got {} bytes", data.len());
232/// break;
233/// }
234/// }
235/// std::thread::sleep(std::time::Duration::from_millis(100));
236/// }
237/// ```
238pub struct SwmrFileReader {
239 reader: Hdf5Reader,
240}
241
242impl SwmrFileReader {
243 /// Open an HDF5 file for SWMR reading using the env-var-derived
244 /// locking policy. Takes a shared lock so it coexists with the
245 /// downgraded shared lock held by [`SwmrFileWriter`] after
246 /// `start_swmr`, and with other concurrent SWMR readers.
247 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
248 let reader = Hdf5Reader::open_swmr(path.as_ref())?;
249 Ok(Self { reader })
250 }
251
252 /// Open an HDF5 file for SWMR reading with an explicit locking policy.
253 pub fn open_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
254 let reader = Hdf5Reader::open_swmr_with_locking(path.as_ref(), locking)?;
255 Ok(Self { reader })
256 }
257
258 /// Re-read the superblock and dataset metadata from disk.
259 ///
260 /// Call this periodically to pick up new data written by the concurrent
261 /// SWMR writer.
262 pub fn refresh(&mut self) -> Result<()> {
263 self.reader.refresh()?;
264 Ok(())
265 }
266
267 /// Return the names of all datasets.
268 pub fn dataset_names(&self) -> Vec<String> {
269 self.reader
270 .dataset_names()
271 .iter()
272 .map(|s| s.to_string())
273 .collect()
274 }
275
276 /// Return the current shape of a dataset.
277 pub fn dataset_shape(&self, name: &str) -> Result<Vec<u64>> {
278 Ok(self.reader.dataset_shape(name)?)
279 }
280
281 /// Read the raw bytes of a dataset.
282 pub fn read_dataset_raw(&mut self, name: &str) -> Result<Vec<u8>> {
283 Ok(self.reader.read_dataset_raw(name)?)
284 }
285
286 /// Read a dataset as a typed vector.
287 pub fn read_dataset<T: H5Type>(&mut self, name: &str) -> Result<Vec<T>> {
288 let raw = self.reader.read_dataset_raw(name)?;
289 if raw.len() % T::element_size() != 0 {
290 return Err(crate::error::Hdf5Error::TypeMismatch(format!(
291 "raw data size {} is not a multiple of element size {}",
292 raw.len(),
293 T::element_size(),
294 )));
295 }
296 let count = raw.len() / T::element_size();
297 let mut result = Vec::<T>::with_capacity(count);
298 unsafe {
299 std::ptr::copy_nonoverlapping(raw.as_ptr(), result.as_mut_ptr() as *mut u8, raw.len());
300 result.set_len(count);
301 }
302 Ok(result)
303 }
304}