Skip to main content

rust_hdf5/
swmr.rs

//! Single Writer / Multiple Reader (SWMR) API.
//!
//! Provides a high-level wrapper around the SWMR protocol for streaming
//! frame-based data (e.g., area detector images).
5
6use std::path::Path;
7
8use crate::io::locking::FileLocking;
9use crate::io::Hdf5Reader;
10use crate::io::SwmrWriter as IoSwmrWriter;
11
12use crate::error::Result;
13use crate::types::H5Type;
14
15/// SWMR writer for streaming frame-based data to an HDF5 file.
16///
17/// Usage:
18/// ```no_run
19/// use rust_hdf5::swmr::SwmrFileWriter;
20///
21/// let mut writer = SwmrFileWriter::create("stream.h5").unwrap();
22/// let ds = writer.create_streaming_dataset::<f32>("frames", &[256, 256]).unwrap();
23/// writer.start_swmr().unwrap();
24///
25/// // Write frames
26/// let frame_data = vec![0.0f32; 256 * 256];
27/// let raw: Vec<u8> = frame_data.iter()
28///     .flat_map(|v| v.to_le_bytes())
29///     .collect();
30/// writer.append_frame(ds, &raw).unwrap();
31/// writer.flush().unwrap();
32///
33/// writer.close().unwrap();
34/// ```
35pub struct SwmrFileWriter {
36    inner: IoSwmrWriter,
37}
38
39impl SwmrFileWriter {
40    /// Create a new HDF5 file for SWMR streaming using the env-var-derived
41    /// locking policy.
42    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
43        let inner = IoSwmrWriter::create(path.as_ref())?;
44        Ok(Self { inner })
45    }
46
47    /// Create a new HDF5 file for SWMR streaming with an explicit locking
48    /// policy. The writer holds an exclusive lock until [`Self::start_swmr`]
49    /// is called, at which point the lock is downgraded to shared so
50    /// concurrent SWMR readers can attach.
51    pub fn create_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
52        let inner = IoSwmrWriter::create_with_locking(path.as_ref(), locking)?;
53        Ok(Self { inner })
54    }
55
56    /// Create a streaming dataset.
57    ///
58    /// The dataset will have shape `[0, frame_dims...]` initially, with
59    /// chunk dimensions `[1, frame_dims...]` and unlimited first dimension.
60    ///
61    /// Returns the dataset index for use with `append_frame`.
62    pub fn create_streaming_dataset<T: H5Type>(
63        &mut self,
64        name: &str,
65        frame_dims: &[u64],
66    ) -> Result<usize> {
67        let datatype = T::hdf5_type();
68        let idx = self
69            .inner
70            .create_streaming_dataset(name, datatype, frame_dims)?;
71        Ok(idx)
72    }
73
74    /// Signal the start of SWMR mode.
75    pub fn start_swmr(&mut self) -> Result<()> {
76        self.inner.start_swmr()?;
77        Ok(())
78    }
79
80    /// Append a frame of raw data to a streaming dataset.
81    ///
82    /// The data size must match one frame (product of frame_dims * element_size).
83    pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> Result<()> {
84        self.inner.append_frame(ds_index, data)?;
85        Ok(())
86    }
87
88    /// Flush all dataset index structures to disk with SWMR ordering.
89    pub fn flush(&mut self) -> Result<()> {
90        self.inner.flush()?;
91        Ok(())
92    }
93
94    /// Close and finalize the file.
95    pub fn close(self) -> Result<()> {
96        self.inner.close()?;
97        Ok(())
98    }
99}
100
101/// SWMR reader for monitoring a streaming HDF5 file.
102///
103/// Opens a file being written by a concurrent [`SwmrFileWriter`] and
104/// periodically calls [`refresh`](Self::refresh) to pick up new data.
105///
106/// ```no_run
107/// use rust_hdf5::swmr::SwmrFileReader;
108///
109/// let mut reader = SwmrFileReader::open("stream.h5").unwrap();
110///
111/// loop {
112///     reader.refresh().unwrap();
113///     let names = reader.dataset_names();
114///     if let Some(shape) = reader.dataset_shape("frames").ok() {
115///         println!("frames shape: {:?}", shape);
116///         if shape[0] > 0 {
117///             let data = reader.read_dataset_raw("frames").unwrap();
118///             println!("got {} bytes", data.len());
119///             break;
120///         }
121///     }
122///     std::thread::sleep(std::time::Duration::from_millis(100));
123/// }
124/// ```
125pub struct SwmrFileReader {
126    reader: Hdf5Reader,
127}
128
129impl SwmrFileReader {
130    /// Open an HDF5 file for SWMR reading using the env-var-derived
131    /// locking policy. Takes a shared lock so it coexists with the
132    /// downgraded shared lock held by [`SwmrFileWriter`] after
133    /// `start_swmr`, and with other concurrent SWMR readers.
134    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
135        let reader = Hdf5Reader::open_swmr(path.as_ref())?;
136        Ok(Self { reader })
137    }
138
139    /// Open an HDF5 file for SWMR reading with an explicit locking policy.
140    pub fn open_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
141        let reader = Hdf5Reader::open_swmr_with_locking(path.as_ref(), locking)?;
142        Ok(Self { reader })
143    }
144
145    /// Re-read the superblock and dataset metadata from disk.
146    ///
147    /// Call this periodically to pick up new data written by the concurrent
148    /// SWMR writer.
149    pub fn refresh(&mut self) -> Result<()> {
150        self.reader.refresh()?;
151        Ok(())
152    }
153
154    /// Return the names of all datasets.
155    pub fn dataset_names(&self) -> Vec<String> {
156        self.reader
157            .dataset_names()
158            .iter()
159            .map(|s| s.to_string())
160            .collect()
161    }
162
163    /// Return the current shape of a dataset.
164    pub fn dataset_shape(&self, name: &str) -> Result<Vec<u64>> {
165        Ok(self.reader.dataset_shape(name)?)
166    }
167
168    /// Read the raw bytes of a dataset.
169    pub fn read_dataset_raw(&mut self, name: &str) -> Result<Vec<u8>> {
170        Ok(self.reader.read_dataset_raw(name)?)
171    }
172
173    /// Read a dataset as a typed vector.
174    pub fn read_dataset<T: H5Type>(&mut self, name: &str) -> Result<Vec<T>> {
175        let raw = self.reader.read_dataset_raw(name)?;
176        if raw.len() % T::element_size() != 0 {
177            return Err(crate::error::Hdf5Error::TypeMismatch(format!(
178                "raw data size {} is not a multiple of element size {}",
179                raw.len(),
180                T::element_size(),
181            )));
182        }
183        let count = raw.len() / T::element_size();
184        let mut result = Vec::<T>::with_capacity(count);
185        unsafe {
186            std::ptr::copy_nonoverlapping(raw.as_ptr(), result.as_mut_ptr() as *mut u8, raw.len());
187            result.set_len(count);
188        }
189        Ok(result)
190    }
191}