// rust-hdf5 0.2.15
// Pure Rust HDF5 library with full read/write and SWMR support.
// See the crate-level documentation for details.
//! Single Writer / Multiple Reader (SWMR) API.
//!
//! Provides a high-level wrapper around the SWMR protocol for streaming
//! frame-based data (e.g., area detector images).

use std::path::Path;

use crate::io::locking::FileLocking;
use crate::io::Hdf5Reader;
use crate::io::SwmrWriter as IoSwmrWriter;

use crate::error::Result;
use crate::types::H5Type;

/// SWMR writer for streaming frame-based data to an HDF5 file
/// (e.g. area detector images).
///
/// Typical lifecycle: [`create`](Self::create) the file, define one or more
/// streaming datasets, call [`start_swmr`](Self::start_swmr) (which
/// downgrades the writer's exclusive lock to shared so readers can attach),
/// then repeatedly [`append_frame`](Self::append_frame) and
/// [`flush`](Self::flush), and finally [`close`](Self::close).
///
/// # Examples
///
/// ```no_run
/// use rust_hdf5::swmr::SwmrFileWriter;
///
/// let mut writer = SwmrFileWriter::create("stream.h5").unwrap();
/// let ds = writer.create_streaming_dataset::<f32>("frames", &[256, 256]).unwrap();
/// writer.start_swmr().unwrap();
///
/// // Write frames: one frame is passed as raw little-endian bytes.
/// let frame_data = vec![0.0f32; 256 * 256];
/// let raw: Vec<u8> = frame_data.iter()
///     .flat_map(|v| v.to_le_bytes())
///     .collect();
/// writer.append_frame(ds, &raw).unwrap();
/// writer.flush().unwrap();
///
/// writer.close().unwrap();
/// ```
pub struct SwmrFileWriter {
    inner: IoSwmrWriter,
}

impl SwmrFileWriter {
    /// Create a new HDF5 file for SWMR streaming, with the locking policy
    /// derived from the environment.
    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
        Ok(Self {
            inner: IoSwmrWriter::create(path.as_ref())?,
        })
    }

    /// Create a new HDF5 file for SWMR streaming with an explicit locking
    /// policy.
    ///
    /// The writer holds an exclusive lock until [`Self::start_swmr`] is
    /// called, at which point the lock is downgraded to shared so that
    /// concurrent SWMR readers can attach.
    pub fn create_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
        Ok(Self {
            inner: IoSwmrWriter::create_with_locking(path.as_ref(), locking)?,
        })
    }

    /// Create a streaming dataset and return its index for use with
    /// [`append_frame`](Self::append_frame).
    ///
    /// The dataset starts with shape `[0, frame_dims...]`, chunk dimensions
    /// `[1, frame_dims...]`, and an unlimited first dimension.
    pub fn create_streaming_dataset<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
    ) -> Result<usize> {
        Ok(self
            .inner
            .create_streaming_dataset(name, T::hdf5_type(), frame_dims)?)
    }

    /// Create a streaming dataset whose frames are compressed.
    ///
    /// Like [`create_streaming_dataset`](Self::create_streaming_dataset),
    /// except every appended frame passes through `pipeline` (e.g.
    /// `FilterPipeline::deflate(4)`). SWMR appends and in-place header
    /// updates behave exactly as for uncompressed streaming datasets.
    pub fn create_streaming_dataset_compressed<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
        pipeline: crate::format::messages::filter::FilterPipeline,
    ) -> Result<usize> {
        let datatype = T::hdf5_type();
        Ok(self
            .inner
            .create_streaming_dataset_compressed(name, datatype, frame_dims, pipeline)?)
    }

    /// Create a streaming dataset whose frames are split into fixed-size
    /// chunk tiles.
    ///
    /// `frame_dims` is the per-frame shape (e.g. `[1024, 1024]`) and
    /// `frame_chunk` is the tile shape within a frame (e.g. `[256, 256]`),
    /// of the same rank. The on-disk chunk shape becomes
    /// `[1, frame_chunk...]`, so each frame is stored as
    /// `product(frame_dims / frame_chunk)` chunks rather than a single one.
    /// This mirrors area-detector tiling controls such as NDFileHDF5's
    /// `nRowChunks` / `nColChunks`: only the partial-read granularity and
    /// the compression unit change, never the stored data.
    /// [`append_frame`](Self::append_frame) still accepts a whole frame and
    /// splits it into tiles automatically.
    pub fn create_streaming_dataset_tiled<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
        frame_chunk: &[u64],
    ) -> Result<usize> {
        let datatype = T::hdf5_type();
        Ok(self
            .inner
            .create_streaming_dataset_tiled(name, datatype, frame_dims, frame_chunk)?)
    }

    /// Create a compressed streaming dataset whose frames are split into
    /// fixed-size chunk tiles.
    ///
    /// See [`create_streaming_dataset_tiled`](Self::create_streaming_dataset_tiled)
    /// for the meaning of `frame_chunk`; each tile is the compression unit.
    pub fn create_streaming_dataset_tiled_compressed<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
        frame_chunk: &[u64],
        pipeline: crate::format::messages::filter::FilterPipeline,
    ) -> Result<usize> {
        let datatype = T::hdf5_type();
        Ok(self.inner.create_streaming_dataset_tiled_compressed(
            name,
            datatype,
            frame_dims,
            frame_chunk,
            pipeline,
        )?)
    }

    /// Create a streaming dataset with full control over the chunk shape,
    /// including the frame axis.
    ///
    /// `chunk` is the complete per-chunk shape of rank
    /// `frame_dims.len() + 1`: `chunk[0]` is the number of frames per chunk
    /// (the NDFileHDF5 `nFramesChunks` control) and `chunk[1..]` is the
    /// per-frame tile shape (`nRowChunks` / `nColChunks`). When
    /// `chunk[0] > 1`, [`append_frame`](Self::append_frame) buffers whole
    /// frames until a chunk band fills; the final partial band is written
    /// (zero-padded) at [`close`](Self::close), and the dataset's logical
    /// frame count always equals the exact number of frames appended.
    pub fn create_streaming_dataset_chunked<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
        chunk: &[u64],
    ) -> Result<usize> {
        Ok(self
            .inner
            .create_streaming_dataset_chunked(name, T::hdf5_type(), frame_dims, chunk)?)
    }

    /// Compressed variant of
    /// [`create_streaming_dataset_chunked`](Self::create_streaming_dataset_chunked);
    /// each chunk is filtered independently through `pipeline`.
    pub fn create_streaming_dataset_chunked_compressed<T: H5Type>(
        &mut self,
        name: &str,
        frame_dims: &[u64],
        chunk: &[u64],
        pipeline: crate::format::messages::filter::FilterPipeline,
    ) -> Result<usize> {
        let datatype = T::hdf5_type();
        Ok(self.inner.create_streaming_dataset_chunked_compressed(
            name, datatype, frame_dims, chunk, pipeline,
        )?)
    }

    /// Signal the start of SWMR mode.
    pub fn start_swmr(&mut self) -> Result<()> {
        Ok(self.inner.start_swmr()?)
    }

    /// Append one frame of raw data to a streaming dataset.
    ///
    /// `data` must be exactly one frame long
    /// (product of `frame_dims` times the element size).
    pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> Result<()> {
        Ok(self.inner.append_frame(ds_index, data)?)
    }

    /// Flush all dataset index structures to disk with SWMR ordering.
    pub fn flush(&mut self) -> Result<()> {
        Ok(self.inner.flush()?)
    }

    /// Close and finalize the file.
    pub fn close(self) -> Result<()> {
        Ok(self.inner.close()?)
    }
}

/// SWMR reader for monitoring a streaming HDF5 file.
///
/// Opens a file being written by a concurrent [`SwmrFileWriter`]; call
/// [`refresh`](Self::refresh) periodically to pick up newly written data.
///
/// ```no_run
/// use rust_hdf5::swmr::SwmrFileReader;
///
/// let mut reader = SwmrFileReader::open("stream.h5").unwrap();
///
/// loop {
///     reader.refresh().unwrap();
///     let _names = reader.dataset_names();
///     if let Ok(shape) = reader.dataset_shape("frames") {
///         println!("frames shape: {:?}", shape);
///         if shape[0] > 0 {
///             let data = reader.read_dataset_raw("frames").unwrap();
///             println!("got {} bytes", data.len());
///             break;
///         }
///     }
///     std::thread::sleep(std::time::Duration::from_millis(100));
/// }
/// ```
pub struct SwmrFileReader {
    reader: Hdf5Reader,
}

impl SwmrFileReader {
    /// Open an HDF5 file for SWMR reading using the env-var-derived
    /// locking policy.
    ///
    /// Takes a shared lock so it coexists with the downgraded shared lock
    /// held by [`SwmrFileWriter`] after `start_swmr`, and with other
    /// concurrent SWMR readers.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let reader = Hdf5Reader::open_swmr(path.as_ref())?;
        Ok(Self { reader })
    }

    /// Open an HDF5 file for SWMR reading with an explicit locking policy.
    pub fn open_with_locking<P: AsRef<Path>>(path: P, locking: FileLocking) -> Result<Self> {
        let reader = Hdf5Reader::open_swmr_with_locking(path.as_ref(), locking)?;
        Ok(Self { reader })
    }

    /// Re-read the superblock and dataset metadata from disk.
    ///
    /// Call this periodically to pick up new data written by the concurrent
    /// SWMR writer.
    pub fn refresh(&mut self) -> Result<()> {
        self.reader.refresh()?;
        Ok(())
    }

    /// Return the names of all datasets.
    pub fn dataset_names(&self) -> Vec<String> {
        self.reader
            .dataset_names()
            .iter()
            .map(|s| s.to_string())
            .collect()
    }

    /// Return the current shape of a dataset.
    ///
    /// # Errors
    /// Propagates any error from the underlying reader.
    pub fn dataset_shape(&self, name: &str) -> Result<Vec<u64>> {
        Ok(self.reader.dataset_shape(name)?)
    }

    /// Read the raw bytes of a dataset.
    ///
    /// # Errors
    /// Propagates any error from the underlying reader.
    pub fn read_dataset_raw(&mut self, name: &str) -> Result<Vec<u8>> {
        Ok(self.reader.read_dataset_raw(name)?)
    }

    /// Read a dataset as a typed vector, reinterpreting the raw on-disk
    /// bytes as a contiguous `Vec<T>`.
    ///
    /// # Errors
    /// Returns `Hdf5Error::TypeMismatch` if the raw byte length is not a
    /// multiple of `T::element_size()`.
    ///
    /// # Panics
    /// Panics if `T::element_size()` differs from
    /// `std::mem::size_of::<T>()`: the byte copy below is only sound when
    /// the on-disk element size matches the in-memory size of `T`.
    pub fn read_dataset<T: H5Type>(&mut self, name: &str) -> Result<Vec<T>> {
        let raw = self.reader.read_dataset_raw(name)?;
        let elem_size = T::element_size();
        if raw.len() % elem_size != 0 {
            return Err(crate::error::Hdf5Error::TypeMismatch(format!(
                "raw data size {} is not a multiple of element size {}",
                raw.len(),
                elem_size,
            )));
        }
        // The unsafe copy fills `count * size_of::<T>()` bytes of capacity
        // with `raw.len() == count * elem_size` bytes, so the two sizes must
        // agree or the copy would overrun (or under-initialize) the heap
        // allocation. Guard the invariant explicitly rather than trusting
        // every `H5Type` implementor.
        assert_eq!(
            elem_size,
            std::mem::size_of::<T>(),
            "H5Type::element_size() must equal std::mem::size_of::<T>()"
        );
        let count = raw.len() / elem_size;
        let mut result = Vec::<T>::with_capacity(count);
        // SAFETY: `result` owns a fresh allocation of `count` elements of
        // `T` (`count * size_of::<T>()` bytes, checked equal to `raw.len()`
        // by the assert above); source and destination cannot overlap; after
        // the copy all `count` elements are initialized, so `set_len(count)`
        // is sound. This assumes any bit pattern is a valid `T`, which
        // `H5Type` implementors are expected to guarantee (plain-old-data
        // numeric types) — TODO confirm against the trait's contract.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), result.as_mut_ptr() as *mut u8, raw.len());
            result.set_len(count);
        }
        Ok(result)
    }
}