use std::path::Path;
use crate::format::messages::datatype::DatatypeMessage;
use crate::format::superblock::{FLAG_SWMR_WRITE, FLAG_WRITE_ACCESS};
use crate::io::writer::Hdf5Writer;
use crate::io::IoResult;
/// Streaming writer that wraps an [`Hdf5Writer`] and can switch the file into
/// SWMR (single-writer/multiple-reader) mode via `start_swmr`.
///
/// Datasets are created through the `create_streaming_dataset*` methods and
/// grown one frame at a time with `append_frame`.
pub struct SwmrWriter {
/// Underlying writer; every dataset and file operation is delegated to it.
writer: Hdf5Writer,
/// `false` after construction, set to `true` by `start_swmr`. While set,
/// `flush` additionally rewrites dataset headers and the superblock.
swmr_active: bool,
}
impl SwmrWriter {
    /// Creates a new file at `path` with SWMR mode not yet started.
    ///
    /// # Errors
    /// Propagates any error returned by `Hdf5Writer::create`.
    pub fn create(path: &Path) -> IoResult<Self> {
        let writer = Hdf5Writer::create(path)?;
        Ok(Self {
            writer,
            swmr_active: false,
        })
    }

    /// Same as [`SwmrWriter::create`], but forwards `locking` to
    /// `Hdf5Writer::create_with_locking`.
    ///
    /// # Errors
    /// Propagates any error returned by `Hdf5Writer::create_with_locking`.
    pub fn create_with_locking(
        path: &Path,
        locking: crate::io::locking::FileLocking,
    ) -> IoResult<Self> {
        let writer = Hdf5Writer::create_with_locking(path, locking)?;
        Ok(Self {
            writer,
            swmr_active: false,
        })
    }

    /// Builds the `(dims, max_dims, chunk_dims)` triple shared by every
    /// streaming-dataset constructor.
    ///
    /// The leading (frame-count) axis is `0` in `dims`, `u64::MAX` in
    /// `max_dims` and `1` in `chunk_dims`; the remaining axes are `frame_dims`
    /// for the first two and `frame_chunk` for the chunk shape.
    fn streaming_shape(
        frame_dims: &[u64],
        frame_chunk: &[u64],
    ) -> (Vec<u64>, Vec<u64>, Vec<u64>) {
        let prefixed = |first: u64, rest: &[u64]| -> Vec<u64> {
            let mut shape = Vec::with_capacity(rest.len() + 1);
            shape.push(first);
            shape.extend_from_slice(rest);
            shape
        };
        (
            prefixed(0, frame_dims),
            prefixed(u64::MAX, frame_dims),
            prefixed(1, frame_chunk),
        )
    }

    /// Creates a chunked dataset that starts with zero frames and stores each
    /// frame of shape `frame_dims` as a single chunk.
    ///
    /// Returns whatever `Hdf5Writer::create_chunked_dataset` returns
    /// (presumably the dataset index expected by `append_frame` — confirm
    /// against the writer).
    ///
    /// # Errors
    /// Propagates any error from the underlying writer.
    pub fn create_streaming_dataset(
        &mut self,
        name: &str,
        datatype: DatatypeMessage,
        frame_dims: &[u64],
    ) -> IoResult<usize> {
        let (dims, max_dims, chunk_dims) = Self::streaming_shape(frame_dims, frame_dims);
        self.writer
            .create_chunked_dataset(name, datatype, &dims, &max_dims, &chunk_dims)
    }

    /// Like [`SwmrWriter::create_streaming_dataset`], but each frame is split
    /// into chunks of shape `frame_chunk`.
    ///
    /// # Errors
    /// Returns `IoError::InvalidState` when `frame_chunk` and `frame_dims`
    /// differ in rank or either contains a zero; otherwise propagates any
    /// error from the underlying writer.
    pub fn create_streaming_dataset_tiled(
        &mut self,
        name: &str,
        datatype: DatatypeMessage,
        frame_dims: &[u64],
        frame_chunk: &[u64],
    ) -> IoResult<usize> {
        validate_frame_chunk(frame_dims, frame_chunk)?;
        let (dims, max_dims, chunk_dims) = Self::streaming_shape(frame_dims, frame_chunk);
        self.writer
            .create_chunked_dataset(name, datatype, &dims, &max_dims, &chunk_dims)
    }

    /// Like [`SwmrWriter::create_streaming_dataset`], but passes `pipeline` to
    /// `Hdf5Writer::create_chunked_dataset_with_pipeline`.
    ///
    /// # Errors
    /// Propagates any error from the underlying writer.
    pub fn create_streaming_dataset_compressed(
        &mut self,
        name: &str,
        datatype: DatatypeMessage,
        frame_dims: &[u64],
        pipeline: crate::format::messages::filter::FilterPipeline,
    ) -> IoResult<usize> {
        let (dims, max_dims, chunk_dims) = Self::streaming_shape(frame_dims, frame_dims);
        self.writer.create_chunked_dataset_with_pipeline(
            name,
            datatype,
            &dims,
            &max_dims,
            &chunk_dims,
            pipeline,
        )
    }

    /// Combination of the tiled and compressed variants: frames are split into
    /// `frame_chunk`-shaped chunks and `pipeline` is passed to the writer.
    ///
    /// # Errors
    /// Returns `IoError::InvalidState` when `frame_chunk` and `frame_dims`
    /// differ in rank or either contains a zero; otherwise propagates any
    /// error from the underlying writer.
    pub fn create_streaming_dataset_tiled_compressed(
        &mut self,
        name: &str,
        datatype: DatatypeMessage,
        frame_dims: &[u64],
        frame_chunk: &[u64],
        pipeline: crate::format::messages::filter::FilterPipeline,
    ) -> IoResult<usize> {
        validate_frame_chunk(frame_dims, frame_chunk)?;
        let (dims, max_dims, chunk_dims) = Self::streaming_shape(frame_dims, frame_chunk);
        self.writer.create_chunked_dataset_with_pipeline(
            name,
            datatype,
            &dims,
            &max_dims,
            &chunk_dims,
            pipeline,
        )
    }

    /// Switches the writer into SWMR mode.
    ///
    /// Calls `finalize_for_swmr` on the writer, releases the lock held by the
    /// file handle, and only then marks SWMR as active so later `flush` calls
    /// also rewrite headers and the superblock.
    ///
    /// # Errors
    /// Propagates any error from finalizing or from releasing the lock; in
    /// that case SWMR is not marked active.
    pub fn start_swmr(&mut self) -> IoResult<()> {
        self.writer.finalize_for_swmr()?;
        self.writer.handle().release_lock()?;
        self.swmr_active = true;
        Ok(())
    }

    /// Appends one frame of raw bytes to dataset `ds_index` and grows its
    /// leading dimension by one.
    ///
    /// `data` must contain exactly one frame: the product of all dimensions
    /// after the first, times the element size. When the chunk shape equals
    /// the frame shape the frame is written as one chunk; otherwise it is split
    /// into zero-padded tiles that are written at consecutive chunk indices
    /// starting at `frame_idx * tiles_per_frame`.
    ///
    /// # Errors
    /// Returns `IoError::InvalidState` when `ds_index` is out of range, the
    /// dataset is not chunked, the dataset has no dimensions, the chunk rank
    /// differs from the dataset rank, a size computation overflows `u64`, or
    /// `data` has the wrong length. Errors from the underlying writer are
    /// propagated.
    pub fn append_frame(&mut self, ds_index: usize, data: &[u8]) -> IoResult<()> {
        // Reject a bad index up front instead of panicking on `datasets[ds_index]`.
        if ds_index >= self.writer.datasets.len() {
            return Err(crate::io::IoError::InvalidState(format!(
                "append_frame: dataset index {ds_index} is out of range ({} datasets)",
                self.writer.datasets.len()
            )));
        }

        let dims = self.writer.dataset_dims(ds_index).to_vec();
        let chunk_dims = self
            .writer
            .dataset_chunk_dims(ds_index)
            .ok_or_else(|| {
                crate::io::IoError::InvalidState("append_frame requires a chunked dataset".into())
            })?
            .to_vec();

        // A streaming dataset needs a leading frame axis; without it the
        // `[0]` / `[1..]` accesses below would panic.
        if dims.is_empty() || self.writer.datasets[ds_index].dataspace.dims.is_empty() {
            return Err(crate::io::IoError::InvalidState(
                "append_frame requires a dataset with at least one dimension".into(),
            ));
        }
        if chunk_dims.len() != dims.len() {
            return Err(crate::io::IoError::InvalidState(format!(
                "append_frame: chunk rank {} does not match dataset rank {}",
                chunk_dims.len(),
                dims.len()
            )));
        }

        let frame_idx = self.writer.datasets[ds_index].dataspace.dims[0];
        let elem_size = self.writer.datasets[ds_index].datatype.element_size() as usize;

        // Expected byte length of one frame, computed in u64 with overflow checks.
        let expected = dims[1..]
            .iter()
            .try_fold(elem_size as u64, |bytes, &extent| bytes.checked_mul(extent))
            .ok_or_else(|| {
                crate::io::IoError::InvalidState(
                    "append_frame: frame byte size overflows u64".into(),
                )
            })?;
        if data.len() as u64 != expected {
            return Err(crate::io::IoError::InvalidState(format!(
                "append_frame: data is {} bytes, expected {expected} for one frame",
                data.len()
            )));
        }

        if chunk_dims[1..] == dims[1..] {
            // One chunk per frame: the chunk index is the frame index.
            self.writer.write_chunk(ds_index, frame_idx, data)?;
        } else {
            // Several tiles per frame: tiles of frame `f` occupy chunk indices
            // `f * tiles_per_frame .. (f + 1) * tiles_per_frame`.
            let tiles_per_frame = dims[1..]
                .iter()
                .zip(&chunk_dims[1..])
                .try_fold(1u64, |count, (&extent, &chunk)| {
                    count.checked_mul(extent.div_ceil(chunk.max(1)))
                })
                .ok_or_else(|| {
                    crate::io::IoError::InvalidState(
                        "append_frame: tile count per frame overflows u64".into(),
                    )
                })?;
            let base = frame_idx.checked_mul(tiles_per_frame).ok_or_else(|| {
                crate::io::IoError::InvalidState("append_frame: chunk index overflows u64".into())
            })?;
            let tiles = split_frame_into_tiles(data, &dims[1..], &chunk_dims[1..], elem_size);
            for (i, tile) in tiles.iter().enumerate() {
                self.writer.write_chunk(ds_index, base + i as u64, tile)?;
            }
        }

        // Only after all chunk writes succeeded is the frame count bumped.
        let mut new_dims = self.writer.datasets[ds_index].dataspace.dims.clone();
        new_dims[0] = frame_idx + 1;
        self.writer.extend_dataset(ds_index, &new_dims)?;
        Ok(())
    }

    /// Flushes all chunked datasets and syncs the file; in SWMR mode also
    /// rewrites dataset headers and the superblock.
    ///
    /// The statement order is deliberate and must be kept: dataset data is
    /// flushed and synced first, then headers are rewritten and synced, and
    /// the superblock is written and synced last.
    ///
    /// # Errors
    /// Stops at and propagates the first error from the underlying writer or
    /// file handle.
    pub fn flush(&mut self) -> IoResult<()> {
        // Step 1: flush every chunked dataset, then sync.
        for i in 0..self.writer.datasets.len() {
            if self.writer.datasets[i].chunked.is_some() {
                self.writer.flush_dataset(i)?;
            }
        }
        self.writer.handle().sync_data()?;
        if self.swmr_active {
            // Step 2: rewrite headers that already have an on-disk address, then sync.
            for i in 0..self.writer.datasets.len() {
                if self.writer.datasets[i].obj_header_written_addr.is_some() {
                    self.writer.write_dataset_header_inplace(i)?;
                }
            }
            self.writer.handle().sync_data()?;
            // Step 3: superblock with the write-access and SWMR-write flags, then sync.
            self.writer
                .write_superblock(FLAG_WRITE_ACCESS | FLAG_SWMR_WRITE)?;
            self.writer.handle().sync_data()?;
        }
        Ok(())
    }

    /// Gives mutable access to the wrapped [`Hdf5Writer`].
    pub fn writer_mut(&mut self) -> &mut Hdf5Writer {
        &mut self.writer
    }

    /// Consumes the writer and closes the underlying [`Hdf5Writer`].
    ///
    /// # Errors
    /// Propagates any error returned by `Hdf5Writer::close`.
    pub fn close(self) -> IoResult<()> {
        self.writer.close()
    }
}
/// Checks that `frame_chunk` is a usable chunk shape for frames of shape
/// `frame_dims`.
///
/// The checks run in this order and the first failure wins:
/// 1. both slices must have the same rank,
/// 2. no entry of `frame_chunk` may be zero,
/// 3. no entry of `frame_dims` may be zero.
///
/// # Errors
/// Returns `IoError::InvalidState` carrying a message that describes the first
/// failed check.
fn validate_frame_chunk(frame_dims: &[u64], frame_chunk: &[u64]) -> IoResult<()> {
    let (chunk_rank, frame_rank) = (frame_chunk.len(), frame_dims.len());

    let problem: Option<String> = if chunk_rank != frame_rank {
        Some(format!(
            "frame_chunk rank {} does not match frame_dims rank {}",
            chunk_rank, frame_rank
        ))
    } else if frame_chunk.iter().any(|&extent| extent == 0) {
        Some(String::from("frame_chunk dimensions must be non-zero"))
    } else if frame_dims.iter().any(|&extent| extent == 0) {
        Some(String::from("frame_dims dimensions must be non-zero"))
    } else {
        None
    };

    match problem {
        Some(message) => Err(crate::io::IoError::InvalidState(message)),
        None => Ok(()),
    }
}
/// Converts multi-dimensional `coords` into a row-major linear element offset
/// for an array of shape `dims`.
///
/// Only the first `dims.len()` entries of `coords` are read. No bounds check
/// of a coordinate against its extent is performed.
///
/// # Panics
/// Panics if `coords` has fewer entries than `dims`.
fn lin_offset(coords: &[u64], dims: &[u64]) -> u64 {
    dims.iter()
        .enumerate()
        .fold(0u64, |offset, (axis, &extent)| {
            offset * extent + coords[axis]
        })
}
/// Splits one row-major `frame` into row-major tiles of shape `tile_dims`.
///
/// Tiles are returned in row-major order over the tile grid, where the grid
/// has `ceil(frame_dims[d] / tile_dims[d])` tiles along axis `d`. Every tile
/// buffer has the full size `product(tile_dims) * elem_size`; the parts of
/// edge tiles that lie outside the frame stay zero.
///
/// A rank-0 frame (empty `frame_dims`) yields a single tile holding one
/// element.
///
/// * `frame` – raw bytes of the frame, `product(frame_dims) * elem_size` long.
/// * `frame_dims` – extent of the frame along each axis.
/// * `tile_dims` – extent of a tile along each axis; same rank as `frame_dims`.
/// * `elem_size` – size of one element in bytes.
///
/// # Panics
/// Panics if `frame` is shorter than the frame shape requires, or if
/// `tile_dims` has fewer entries than `frame_dims`.
fn split_frame_into_tiles(
    frame: &[u8],
    frame_dims: &[u64],
    tile_dims: &[u64],
    elem_size: usize,
) -> Vec<Vec<u8>> {
    let rank = frame_dims.len();
    debug_assert_eq!(tile_dims.len(), rank, "tile and frame rank must match");

    // Scalar frame: there is no last axis, so `rank - 1` below would underflow.
    if rank == 0 {
        let mut tile = vec![0u8; elem_size];
        let available = elem_size.min(frame.len());
        tile[..available].copy_from_slice(&frame[..available]);
        return vec![tile];
    }

    let last = rank - 1;
    let grid: Vec<u64> = frame_dims
        .iter()
        .zip(tile_dims)
        .map(|(&extent, &tile)| extent.div_ceil(tile.max(1)))
        .collect();
    let n_tiles: u64 = grid.iter().product();
    let tile_elems: u64 = tile_dims.iter().product();
    // A "row" is one contiguous run along the last axis inside a tile.
    let tile_rows: u64 = tile_dims[..last].iter().product();
    let run = tile_dims[last] as usize;
    let tile_bytes = tile_elems as usize * elem_size;
    let row_bytes = run * elem_size;

    let mut tiles = Vec::with_capacity(n_tiles as usize);
    // Scratch coordinates, reused across tiles and rows instead of reallocated.
    let mut tile_coord = vec![0u64; rank];
    let mut src = vec![0u64; rank];

    for t in 0..n_tiles {
        // Decompose the linear tile index into grid coordinates (row-major).
        let mut rem = t;
        for d in (0..rank).rev() {
            tile_coord[d] = rem % grid[d];
            rem /= grid[d];
        }

        let mut tile = vec![0u8; tile_bytes];
        let last_base = tile_coord[last] * tile_dims[last];
        if last_base < frame_dims[last] {
            // Edge tiles copy only the part of the run that is inside the frame.
            let copy_bytes = run.min((frame_dims[last] - last_base) as usize) * elem_size;
            src[last] = last_base;

            for row in 0..tile_rows {
                // Decompose the row index into in-tile coordinates of the
                // leading axes and translate them to frame coordinates.
                let mut r = row;
                let mut in_bounds = true;
                for d in (0..last).rev() {
                    src[d] = tile_coord[d] * tile_dims[d] + r % tile_dims[d];
                    r /= tile_dims[d];
                    in_bounds &= src[d] < frame_dims[d];
                }
                if !in_bounds {
                    // Row lies outside the frame: leave it zero-filled.
                    continue;
                }

                let src_off = lin_offset(&src, frame_dims) as usize * elem_size;
                let dst_off = row as usize * row_bytes;
                tile[dst_off..dst_off + copy_bytes]
                    .copy_from_slice(&frame[src_off..src_off + copy_bytes]);
            }
        }
        tiles.push(tile);
    }
    tiles
}