use std::path::Path;
use crate::format::chunk_index::btree_v2::Bt2ChunkIndex;
use crate::format::chunk_index::extensible_array::{
compute_chunk_size_len, compute_ndblk_addrs, compute_nsblk_addrs, ExtensibleArrayDataBlock,
ExtensibleArrayHeader, ExtensibleArrayIndexBlock, FilteredChunkEntry, FilteredDataBlock,
FilteredIndexBlock,
};
use crate::format::chunk_index::fixed_array::{FixedArrayDataBlock, FixedArrayHeader};
use crate::format::messages::attribute::AttributeMessage;
use crate::format::messages::data_layout::{DataLayoutMessage, EarrayParams, FixedArrayParams};
use crate::format::messages::dataspace::DataspaceMessage;
use crate::format::messages::datatype::DatatypeMessage;
use crate::format::messages::fill_value::FillValueMessage;
use crate::format::messages::filter::{self, FilterPipeline};
use crate::format::messages::group_info::GroupInfoMessage;
use crate::format::messages::link::LinkMessage;
use crate::format::messages::link_info::LinkInfoMessage;
use crate::format::messages::*;
use crate::format::object_header::ObjectHeader;
use crate::format::superblock::*;
use crate::format::{FormatContext, UNDEF_ADDR};
use crate::io::allocator::FileAllocator;
use crate::io::file_handle::FileHandle;
use crate::io::IoResult;
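/// Bookkeeping for a single dataset: its type, shape, on-disk addresses,
/// and (for chunked layouts) the in-memory chunk-index state that is
/// flushed to the file on finalize.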
pub struct DatasetInfo {
pub name: String,
pub datatype: DatatypeMessage,
pub dataspace: DataspaceMessage,
pub obj_header_addr: u64,
pub data_addr: u64,
pub data_size: u64,
pub chunked: Option<ChunkedDatasetInfo>,
pub fixed_array: Option<FixedArrayDatasetInfo>,
pub btree_v2: Option<Bt2DatasetInfo>,
pub attributes: Vec<AttributeMessage>,
pub obj_header_written_addr: Option<u64>,
pub obj_header_encoded_size: usize,
pub filter_pipeline: Option<FilterPipeline>,
}
pub struct ChunkedDatasetInfo {
pub chunk_dims: Vec<u64>,
pub max_dims: Vec<u64>,
pub earray_params: EarrayParams,
pub ea_header_addr: u64,
pub ea_iblk_addr: u64,
pub ndblk_addrs: usize,
pub ea_header: ExtensibleArrayHeader,
pub ea_iblk: ExtensibleArrayIndexBlock,
pub data_blocks: Vec<(u64, ExtensibleArrayDataBlock)>,
pub chunks_written: u64,
pub filt_iblk: Option<FilteredIndexBlock>,
pub filt_data_blocks: Vec<(u64, FilteredDataBlock)>,
pub chunk_size_len: u8,
}
pub struct FixedArrayDatasetInfo {
pub chunk_dims: Vec<u64>,
pub fa_header_addr: u64,
pub fa_dblk_addr: u64,
pub fa_header: FixedArrayHeader,
pub fa_dblk: FixedArrayDataBlock,
pub chunks_written: u64,
}
pub struct Bt2DatasetInfo {
pub chunk_dims: Vec<u64>,
pub max_dims: Vec<u64>,
pub bt2_header_addr: u64,
pub bt2_leaf_addr: u64,
pub index: Bt2ChunkIndex,
pub chunks_written: u64,
}
pub struct GroupInfo {
pub name: String,
pub parent: Option<usize>,
pub child_datasets: Vec<usize>,
pub child_groups: Vec<usize>,
pub obj_header_addr: u64,
}
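/// Writer for HDF5 files with a version-3 superblock and v4 chunk indexes
/// (extensible array, fixed array, and B-tree v2).
///
/// A minimal round-trip sketch (the path and values are illustrative; see
/// the tests at the bottom of this module):
///
/// ```ignore
/// let mut writer = Hdf5Writer::create(Path::new("out.h5"))?;
/// let idx = writer.create_dataset("data", DatatypeMessage::f64_type(), &[4])?;
/// let raw: Vec<u8> = [1.0f64, 2.0, 3.0, 4.0]
///     .iter()
///     .flat_map(|v| v.to_le_bytes())
///     .collect();
/// writer.write_dataset_raw(idx, &raw)?;
/// writer.close()?;
/// ```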
pub struct Hdf5Writer {
handle: FileHandle,
allocator: FileAllocator,
ctx: FormatContext,
pub(crate) datasets: Vec<DatasetInfo>,
pub(crate) groups: Vec<GroupInfo>,
pub(crate) root_attributes: Vec<crate::format::messages::attribute::AttributeMessage>,
closed: bool,
root_group_addr: Option<u64>,
root_group_encoded_size: usize,
}
impl Hdf5Writer {
pub fn create(path: &Path) -> IoResult<Self> {
let handle = FileHandle::create(path)?;
let ctx = FormatContext::default_v3();
let sb_size = (SuperblockV2V3 {
version: SUPERBLOCK_V3,
sizeof_offsets: ctx.sizeof_addr,
sizeof_lengths: ctx.sizeof_size,
file_consistency_flags: 0,
base_address: 0,
superblock_extension_address: UNDEF_ADDR,
end_of_file_address: 0,
root_group_object_header_address: 0,
})
.encoded_size() as u64;
let allocator = FileAllocator::new(sb_size);
Ok(Self {
handle,
allocator,
ctx,
datasets: Vec::new(),
groups: Vec::new(),
root_attributes: Vec::new(),
closed: false,
root_group_addr: None,
root_group_encoded_size: 0,
})
}
pub fn ctx(&self) -> &FormatContext {
&self.ctx
}
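/// Reopens an existing file for appending. The superblock and root group
/// are decoded, hard links are walked recursively to rediscover datasets,
/// and extensible-array chunk indexes are rebuilt so that write_chunk can
/// continue where the previous writer stopped. Datasets with other chunk
/// index types are listed but their indexes are not reloaded.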
pub fn open_append(path: &Path) -> IoResult<Self> {
let mut handle = FileHandle::open_readwrite(path)?;
let file_size = handle.file_size()?;
let sb_buf = handle.read_at_most(0, 256)?;
let sb = SuperblockV2V3::decode(&sb_buf)?;
let ctx = FormatContext {
sizeof_addr: sb.sizeof_offsets,
sizeof_size: sb.sizeof_lengths,
};
let root_buf = handle.read_at_most(sb.root_group_object_header_address, 8192)?;
let (root_header, _) = crate::format::object_header::ObjectHeader::decode(&root_buf)?;
let mut link_entries: Vec<(String, u64)> = Vec::new();
Self::collect_links_recursive(&mut handle, &root_header, &ctx, "", &mut link_entries)?;
let mut existing_datasets = Vec::new();
for (name, obj_addr) in &link_entries {
let ds_buf = handle.read_at_most(*obj_addr, 8192)?;
let (ds_header, _) =
match crate::format::object_header::ObjectHeader::decode_any(&ds_buf) {
Ok(h) => h,
Err(_) => continue,
};
let mut datatype = None;
let mut dataspace = None;
let mut layout = None;
let mut fp = None;
let mut attrs = Vec::new();
for msg in &ds_header.messages {
match msg.msg_type {
crate::format::messages::MSG_DATATYPE => {
if let Ok((dt, _)) = DatatypeMessage::decode(&msg.data, &ctx) {
datatype = Some(dt);
}
}
crate::format::messages::MSG_DATASPACE => {
if let Ok((ds, _)) = DataspaceMessage::decode(&msg.data, &ctx) {
dataspace = Some(ds);
}
}
crate::format::messages::MSG_DATA_LAYOUT => {
if let Ok((dl, _)) = DataLayoutMessage::decode(&msg.data, &ctx) {
layout = Some(dl);
}
}
crate::format::messages::MSG_FILTER_PIPELINE => {
if let Ok((p, _)) = FilterPipeline::decode(&msg.data) {
if !p.filters.is_empty() {
fp = Some(p);
}
}
}
crate::format::messages::MSG_ATTRIBUTE => {
if let Ok((a, _)) = AttributeMessage::decode(&msg.data, &ctx) {
attrs.push(a);
}
}
_ => {}
}
}
let (dt, ds, dl) = match (datatype, dataspace, layout) {
(Some(dt), Some(ds), Some(dl)) => (dt, ds, dl),
_ => continue,
};
let mut info = DatasetInfo {
name: name.clone(),
datatype: dt,
dataspace: ds,
obj_header_addr: *obj_addr,
data_addr: UNDEF_ADDR,
data_size: 0,
chunked: None,
fixed_array: None,
btree_v2: None,
attributes: attrs,
obj_header_written_addr: Some(*obj_addr),
obj_header_encoded_size: 0,
filter_pipeline: fp,
};
match &dl {
DataLayoutMessage::Contiguous { address, size } => {
info.data_addr = *address;
info.data_size = *size;
}
DataLayoutMessage::ChunkedV4 {
chunk_dims,
index_address,
index_type,
earray_params,
..
} => {
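// ChunkedV4 stores the element size as a trailing chunk dimension;
// drop it to recover the logical chunk shape.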
let real_chunk_dims: Vec<u64> = chunk_dims[..chunk_dims.len() - 1].to_vec();
if *index_type
== crate::format::messages::data_layout::ChunkIndexType::ExtensibleArray
{
if let Some(params) = earray_params {
let ep = EarrayParams {
max_nelmts_bits: params.max_nelmts_bits,
idx_blk_elmts: params.idx_blk_elmts,
sup_blk_min_data_ptrs: params.sup_blk_min_data_ptrs,
data_blk_min_elmts: params.data_blk_min_elmts,
max_dblk_page_nelmts_bits: params.max_dblk_page_nelmts_bits,
};
let ndblk_addrs = compute_ndblk_addrs(ep.sup_blk_min_data_ptrs);
let nsblk_addrs = compute_nsblk_addrs(
ep.idx_blk_elmts,
ep.data_blk_min_elmts,
ep.sup_blk_min_data_ptrs,
ep.max_nelmts_bits,
);
let hdr_buf = handle.read_at_most(*index_address, 256)?;
let ea_header = ExtensibleArrayHeader::decode(&hdr_buf, &ctx)?;
let is_filtered = ea_header.class_id
== crate::format::chunk_index::extensible_array::EA_CLS_FILT_CHUNK;
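// A filtered element is encoded as (address, chunk size, 4-byte filter
// mask), so the width of the chunk-size field falls out of the raw
// element size.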
let chunk_size_len = if is_filtered {
ea_header.raw_elmt_size - ctx.sizeof_addr - 4
} else {
0
};
let ea_iblk_addr = ea_header.idx_blk_addr;
let ea_iblk = if ea_iblk_addr != UNDEF_ADDR {
let iblk_buf = handle.read_at_most(ea_iblk_addr, 65536)?;
ExtensibleArrayIndexBlock::decode(
&iblk_buf,
&ctx,
ep.idx_blk_elmts as usize,
ndblk_addrs,
nsblk_addrs,
)
.unwrap_or_else(|_| {
ExtensibleArrayIndexBlock::new(
*index_address,
ep.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
)
})
} else {
ExtensibleArrayIndexBlock::new(
*index_address,
ep.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
)
};
let max_dims = info
.dataspace
.max_dims
.clone()
.unwrap_or_else(|| info.dataspace.dims.clone());
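// Only the unfiltered index-block layout is rebuilt here; filt_iblk
// stays None, so this append path does not restore filtered chunk
// entries.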
info.chunked = Some(ChunkedDatasetInfo {
chunk_dims: real_chunk_dims,
max_dims,
earray_params: ep,
ea_header_addr: *index_address,
ea_iblk_addr,
ndblk_addrs,
ea_header,
ea_iblk,
data_blocks: Vec::new(),
chunks_written: 0,
filt_iblk: None,
filt_data_blocks: Vec::new(),
chunk_size_len,
});
}
}
}
_ => {}
}
existing_datasets.push(info);
}
let allocator = FileAllocator::new(file_size);
Ok(Self {
handle,
allocator,
ctx,
datasets: existing_datasets,
groups: Vec::new(),
root_attributes: Vec::new(),
closed: false,
root_group_addr: None,
root_group_encoded_size: 0,
})
}
fn collect_links_recursive(
handle: &mut FileHandle,
header: &crate::format::object_header::ObjectHeader,
ctx: &FormatContext,
prefix: &str,
out: &mut Vec<(String, u64)>,
) -> IoResult<()> {
use crate::format::messages::link::LinkTarget;
for msg in &header.messages {
if msg.msg_type == crate::format::messages::MSG_LINK {
if let Ok((link, _)) = LinkMessage::decode(&msg.data, ctx) {
if let LinkTarget::Hard { address } = &link.target {
let full_name = if prefix.is_empty() {
link.name.clone()
} else {
format!("{}/{}", prefix, link.name)
};
out.push((full_name.clone(), *address));
if let Ok(child_buf) = handle.read_at_most(*address, 8192) {
if let Ok((child_header, _)) =
crate::format::object_header::ObjectHeader::decode_any(&child_buf)
{
let has_links = child_header
.messages
.iter()
.any(|m| m.msg_type == crate::format::messages::MSG_LINK);
if has_links {
let _ = Self::collect_links_recursive(
handle,
&child_header,
ctx,
&full_name,
out,
);
}
}
}
}
}
}
}
Ok(())
}
pub fn dataset_names(&self) -> Vec<&str> {
self.datasets.iter().map(|d| d.name.as_str()).collect()
}
pub fn dataset_index(&self, name: &str) -> Option<usize> {
self.datasets.iter().position(|d| d.name == name)
}
pub fn group_names(&self) -> Vec<&str> {
self.groups.iter().map(|g| g.name.as_str()).collect()
}
pub fn create_group(&mut self, parent_path: &str, name: &str) -> IoResult<usize> {
let full_name = if parent_path == "/" {
format!("/{}", name)
} else {
format!("{}/{}", parent_path, name)
};
if self.groups.iter().any(|g| g.name == full_name) {
return Err(crate::io::IoError::InvalidState(format!(
"group '{}' already exists",
full_name
)));
}
let parent_idx = if parent_path == "/" {
None
} else {
let idx = self
.groups
.iter()
.position(|g| g.name == parent_path)
.ok_or_else(|| {
crate::io::IoError::NotFound(format!(
"parent group '{}' not found",
parent_path
))
})?;
Some(idx)
};
let group_idx = self.groups.len();
self.groups.push(GroupInfo {
name: full_name,
parent: parent_idx,
child_datasets: Vec::new(),
child_groups: Vec::new(),
obj_header_addr: 0,
});
if let Some(pidx) = parent_idx {
self.groups[pidx].child_groups.push(group_idx);
}
Ok(group_idx)
}
pub fn assign_dataset_to_group(&mut self, group_path: &str, ds_index: usize) -> IoResult<()> {
let group_idx = self
.groups
.iter()
.position(|g| g.name == group_path)
.ok_or_else(|| {
crate::io::IoError::NotFound(format!("group '{}' not found", group_path))
})?;
self.groups[group_idx].child_datasets.push(ds_index);
Ok(())
}
pub fn create_dataset(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
) -> IoResult<usize> {
let total_elements: u64 = if dims.is_empty() {
1
} else {
dims.iter().product()
};
let element_size = datatype.element_size() as u64;
let data_size = total_elements * element_size;
let data_addr = if data_size > 0 {
self.allocator.allocate(data_size)
} else {
UNDEF_ADDR
};
let dataspace = if dims.is_empty() {
DataspaceMessage::scalar()
} else {
DataspaceMessage::simple(dims)
};
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr,
data_size,
chunked: None,
fixed_array: None,
btree_v2: None,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: None,
});
Ok(idx)
}
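/// Creates a chunked dataset indexed by an extensible array; chunks are
/// appended later with write_chunk using their linear index.
///
/// A sketch of appending per-row chunks (values illustrative, mirroring
/// the tests below):
///
/// ```ignore
/// let idx = writer.create_chunked_dataset(
///     "frames",
///     DatatypeMessage::f64_type(),
///     &[0, 4],        // current shape: no rows yet
///     &[u64::MAX, 4], // unlimited first dimension
///     &[1, 4],        // one row per chunk
/// )?;
/// writer.write_chunk(idx, 0, &row_bytes)?;
/// writer.extend_dataset(idx, &[1, 4])?;
/// ```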
pub fn create_chunked_dataset(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
max_dims: &[u64],
chunk_dims: &[u64],
) -> IoResult<usize> {
let earray_params = EarrayParams::default_params();
let ndblk_addrs = compute_ndblk_addrs(earray_params.sup_blk_min_data_ptrs);
let nsblk_addrs = compute_nsblk_addrs(
earray_params.idx_blk_elmts,
earray_params.data_blk_min_elmts,
earray_params.sup_blk_min_data_ptrs,
earray_params.max_nelmts_bits,
);
let mut ea_header = ExtensibleArrayHeader::new_for_chunks(&self.ctx);
ea_header.max_nelmts_bits = earray_params.max_nelmts_bits;
ea_header.idx_blk_elmts = earray_params.idx_blk_elmts;
ea_header.data_blk_min_elmts = earray_params.data_blk_min_elmts;
ea_header.sup_blk_min_data_ptrs = earray_params.sup_blk_min_data_ptrs;
ea_header.max_dblk_page_nelmts_bits = earray_params.max_dblk_page_nelmts_bits;
let hdr_encoded = ea_header.encode(&self.ctx);
let ea_header_addr = self.allocator.allocate(hdr_encoded.len() as u64);
let ea_iblk = ExtensibleArrayIndexBlock::new(
ea_header_addr,
earray_params.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
);
let iblk_encoded = ea_iblk.encode(&self.ctx);
let ea_iblk_addr = self.allocator.allocate(iblk_encoded.len() as u64);
ea_header.idx_blk_addr = ea_iblk_addr;
let hdr_encoded = ea_header.encode(&self.ctx);
self.handle.write_at(ea_header_addr, &hdr_encoded)?;
self.handle.write_at(ea_iblk_addr, &iblk_encoded)?;
let dataspace = DataspaceMessage {
dims: dims.to_vec(),
max_dims: Some(max_dims.to_vec()),
};
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr: UNDEF_ADDR,
data_size: 0,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: None,
fixed_array: None,
btree_v2: None,
chunked: Some(ChunkedDatasetInfo {
chunk_dims: chunk_dims.to_vec(),
max_dims: max_dims.to_vec(),
earray_params,
ea_header_addr,
ea_iblk_addr,
ndblk_addrs,
ea_header,
ea_iblk,
data_blocks: Vec::new(),
chunks_written: 0,
filt_iblk: None,
filt_data_blocks: Vec::new(),
chunk_size_len: 0,
}),
});
Ok(idx)
}
pub fn write_dataset_raw(&mut self, index: usize, data: &[u8]) -> IoResult<()> {
let ds = &self.datasets[index];
if ds.chunked.is_some() {
return Err(crate::io::IoError::InvalidState(
"use write_chunk for chunked datasets".into(),
));
}
if ds.data_addr == UNDEF_ADDR {
return Err(crate::io::IoError::InvalidState(
"dataset has no data allocated".into(),
));
}
if data.len() as u64 != ds.data_size {
return Err(crate::io::IoError::InvalidState(format!(
"data size mismatch: expected {} bytes, got {}",
ds.data_size,
data.len()
)));
}
self.handle.write_at(ds.data_addr, data)?;
Ok(())
}
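/// Writes one chunk of a chunked dataset. `chunk_idx` is the linear
/// extensible-array index and `data` must be exactly one uncompressed
/// chunk; if a filter pipeline is attached, it is applied here before
/// the chunk is written.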
pub fn write_chunk(&mut self, index: usize, chunk_idx: u64, data: &[u8]) -> IoResult<()> {
let ds = &self.datasets[index];
let element_size = ds.datatype.element_size() as u64;
let chunked = ds
.chunked
.as_ref()
.ok_or_else(|| crate::io::IoError::InvalidState("not a chunked dataset".into()))?;
let chunk_bytes: u64 = chunked.chunk_dims.iter().product::<u64>() * element_size;
if data.len() as u64 != chunk_bytes {
return Err(crate::io::IoError::InvalidState(format!(
"chunk data size mismatch: expected {} bytes, got {}",
chunk_bytes,
data.len()
)));
}
let compressed;
let write_data = if let Some(ref pipeline) = ds.filter_pipeline {
compressed = filter::apply_filters(pipeline, data)?;
&compressed
} else {
data
};
let is_filtered = ds.filter_pipeline.is_some();
let compressed_size = write_data.len() as u64;
let chunk_addr = self.allocator.allocate(compressed_size);
self.handle.write_at(chunk_addr, write_data)?;
let idx_blk_elmts = {
let c = self.datasets[index].chunked.as_ref().unwrap();
c.earray_params.idx_blk_elmts as u64
};
if chunk_idx < idx_blk_elmts {
let chunked = self.datasets[index].chunked.as_mut().unwrap();
if is_filtered {
if let Some(ref mut fiblk) = chunked.filt_iblk {
fiblk.elements[chunk_idx as usize] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
}
} else {
chunked.ea_iblk.elements[chunk_idx as usize] = chunk_addr;
}
chunked.chunks_written += 1;
if chunk_idx + 1 > chunked.ea_header.max_idx_set {
chunked.ea_header.max_idx_set = chunk_idx + 1;
}
if chunked.ea_header.num_elmts_realized < idx_blk_elmts {
chunked.ea_header.num_elmts_realized = idx_blk_elmts;
}
} else {
let offset_in_dblks = chunk_idx - idx_blk_elmts;
let chunked = self.datasets[index].chunked.as_mut().unwrap();
let min_elmts = chunked.earray_params.data_blk_min_elmts as u64;
let mut cumulative = 0u64;
let mut dblk_idx = 0usize;
let mut current_size = min_elmts;
let mut pair_count = 0;
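// Walk the extensible-array data-block geometry: block sizes start at
// data_blk_min_elmts elements and double after every two blocks,
// matching the super-block layout. Find the block (and the offset
// within it) that holds offset_in_dblks.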
loop {
if offset_in_dblks < cumulative + current_size {
break;
}
cumulative += current_size;
dblk_idx += 1;
pair_count += 1;
if pair_count >= 2 {
pair_count = 0;
current_size *= 2;
}
if dblk_idx >= chunked.ndblk_addrs {
return Err(crate::io::IoError::InvalidState(
"chunk index exceeds extensible array capacity".into(),
));
}
}
let offset_in_block = (offset_in_dblks - cumulative) as usize;
let block_nelmts = current_size as usize;
if is_filtered {
let filt_iblk = chunked.filt_iblk.as_mut().unwrap();
if filt_iblk.dblk_addrs[dblk_idx] == UNDEF_ADDR {
let mut dblk =
FilteredDataBlock::new(chunked.ea_header_addr, cumulative, block_nelmts);
dblk.elements[offset_in_block] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
let dblk_encoded = dblk.encode(
&self.ctx,
chunked.earray_params.max_nelmts_bits,
chunked.chunk_size_len,
);
let dblk_addr = self.allocator.allocate(dblk_encoded.len() as u64);
self.handle.write_at(dblk_addr, &dblk_encoded)?;
filt_iblk.dblk_addrs[dblk_idx] = dblk_addr;
chunked.filt_data_blocks.push((dblk_addr, dblk));
chunked.ea_header.num_dblks_created += 1;
chunked.ea_header.size_dblks_created += dblk_encoded.len() as u64;
} else {
let dblk_addr = filt_iblk.dblk_addrs[dblk_idx];
if let Some((_, ref mut dblk)) = chunked
.filt_data_blocks
.iter_mut()
.find(|(a, _)| *a == dblk_addr)
{
dblk.elements[offset_in_block] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
let dblk_encoded = dblk.encode(
&self.ctx,
chunked.earray_params.max_nelmts_bits,
chunked.chunk_size_len,
);
self.handle.write_at(dblk_addr, &dblk_encoded)?;
}
}
} else {
if chunked.ea_iblk.dblk_addrs[dblk_idx] == UNDEF_ADDR {
let mut dblk = ExtensibleArrayDataBlock::new(
chunked.ea_header_addr,
cumulative,
block_nelmts,
);
dblk.elements[offset_in_block] = chunk_addr;
let dblk_encoded =
dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
let dblk_addr = self.allocator.allocate(dblk_encoded.len() as u64);
self.handle.write_at(dblk_addr, &dblk_encoded)?;
chunked.ea_iblk.dblk_addrs[dblk_idx] = dblk_addr;
chunked.data_blocks.push((dblk_addr, dblk));
chunked.ea_header.num_dblks_created += 1;
chunked.ea_header.size_dblks_created += dblk_encoded.len() as u64;
} else {
let dblk_addr = chunked.ea_iblk.dblk_addrs[dblk_idx];
if let Some((_, ref mut dblk)) = chunked
.data_blocks
.iter_mut()
.find(|(a, _)| *a == dblk_addr)
{
dblk.elements[offset_in_block] = chunk_addr;
let dblk_encoded =
dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
self.handle.write_at(dblk_addr, &dblk_encoded)?;
} else {
let dblk_buf = self.handle.read_at_most(dblk_addr, 65536)?;
if let Ok(mut dblk) = ExtensibleArrayDataBlock::decode(
&dblk_buf,
&self.ctx,
chunked.earray_params.max_nelmts_bits,
block_nelmts,
) {
dblk.elements[offset_in_block] = chunk_addr;
let dblk_encoded =
dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
self.handle.write_at(dblk_addr, &dblk_encoded)?;
chunked.data_blocks.push((dblk_addr, dblk));
}
}
}
}
chunked.chunks_written += 1;
if chunk_idx + 1 > chunked.ea_header.max_idx_set {
chunked.ea_header.max_idx_set = chunk_idx + 1;
}
let total_realized = if is_filtered {
idx_blk_elmts
+ chunked
.filt_data_blocks
.iter()
.map(|(_, db)| db.elements.len() as u64)
.sum::<u64>()
} else {
idx_blk_elmts
+ chunked
.data_blocks
.iter()
.map(|(_, db)| db.elements.len() as u64)
.sum::<u64>()
};
chunked.ea_header.num_elmts_realized = total_realized;
}
Ok(())
}
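/// Writes a rectangular hyperslab into a contiguous dataset. `starts` and
/// `counts` give the origin and extent per dimension; `data` is the
/// row-major slab, written one innermost row at a time.
///
/// For example (illustrative), writing a 2x2 patch at (1, 1) of a 2-D
/// dataset takes `starts = &[1, 1]`, `counts = &[2, 2]`, and 4 elements
/// of raw data.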
pub fn write_slice(
&mut self,
index: usize,
starts: &[u64],
counts: &[u64],
data: &[u8],
) -> IoResult<()> {
let ds = &self.datasets[index];
if ds.chunked.is_some() || ds.fixed_array.is_some() || ds.btree_v2.is_some() {
return Err(crate::io::IoError::InvalidState(
"write_slice is only for contiguous datasets".into(),
));
}
if ds.data_addr == UNDEF_ADDR {
return Err(crate::io::IoError::InvalidState(
"dataset has no data allocated".into(),
));
}
let dims = &ds.dataspace.dims;
let element_size = ds.datatype.element_size() as u64;
let ndims = dims.len();
if starts.len() != ndims || counts.len() != ndims {
return Err(crate::io::IoError::InvalidState(
"starts/counts length must match dataset rank".into(),
));
}
let out_elems: u64 = counts.iter().product();
if data.len() as u64 != out_elems * element_size {
return Err(crate::io::IoError::InvalidState(format!(
"data size mismatch: expected {} bytes, got {}",
out_elems * element_size,
data.len()
)));
}
let mut strides = vec![0u64; ndims];
strides[ndims - 1] = element_size;
for d in (0..ndims - 1).rev() {
strides[d] = strides[d + 1] * dims[d + 1];
}
let base_addr = ds.data_addr;
let row_bytes = (counts[ndims - 1] * element_size) as usize;
let n_rows: u64 = if ndims > 1 {
counts[..ndims - 1].iter().product()
} else {
1
};
if ndims == 1 {
let offset = base_addr + starts[0] * element_size;
self.handle.write_at(offset, data)?;
return Ok(());
}
let mut coords = vec![0u64; ndims - 1];
for row in 0..n_rows {
let mut file_offset = base_addr + starts[ndims - 1] * element_size;
for d in 0..ndims - 1 {
file_offset += (starts[d] + coords[d]) * strides[d];
}
let src_offset = row as usize * row_bytes;
self.handle
.write_at(file_offset, &data[src_offset..src_offset + row_bytes])?;
for d in (0..ndims - 1).rev() {
coords[d] += 1;
if coords[d] < counts[d] {
break;
}
coords[d] = 0;
}
}
Ok(())
}
pub fn add_root_attribute(
&mut self,
attr: crate::format::messages::attribute::AttributeMessage,
) {
self.root_attributes.push(attr);
}
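/// Creates a 1-D variable-length UTF-8 string dataset. The strings are
/// stored in a global heap collection and the dataset's raw data holds
/// one (heap address, object index) reference per string.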
pub fn create_vlen_string_dataset(&mut self, name: &str, strings: &[&str]) -> IoResult<usize> {
use crate::format::global_heap::{encode_vlen_reference, GlobalHeapCollection};
let num_strings = strings.len() as u64;
let mut gcol = GlobalHeapCollection::new();
let mut obj_indices = Vec::with_capacity(strings.len());
for s in strings {
let idx = gcol.add_object(s.as_bytes().to_vec());
obj_indices.push(idx);
}
let gcol_encoded = gcol.encode(&self.ctx);
let gcol_addr = self.allocator.allocate(gcol_encoded.len() as u64);
self.handle.write_at(gcol_addr, &gcol_encoded)?;
let ref_size = crate::format::global_heap::vlen_reference_size(&self.ctx);
let data_size = (num_strings as usize) * ref_size;
let mut raw_data = Vec::with_capacity(data_size);
for &obj_idx in &obj_indices {
raw_data.extend_from_slice(&encode_vlen_reference(
gcol_addr,
obj_idx as u32,
&self.ctx,
));
}
let data_addr = self.allocator.allocate(data_size as u64);
self.handle.write_at(data_addr, &raw_data)?;
let datatype = DatatypeMessage::vlen_string_utf8();
let dataspace =
crate::format::messages::dataspace::DataspaceMessage::simple(&[num_strings]);
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr,
data_size: data_size as u64,
chunked: None,
fixed_array: None,
btree_v2: None,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: None,
});
Ok(idx)
}
pub fn add_dataset_attribute(
&mut self,
ds_index: usize,
attr: AttributeMessage,
) -> IoResult<()> {
if ds_index >= self.datasets.len() {
return Err(crate::io::IoError::InvalidState(format!(
"dataset index {} out of range (have {})",
ds_index,
self.datasets.len()
)));
}
self.datasets[ds_index].attributes.push(attr);
Ok(())
}
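/// Creates a chunked dataset indexed by a fixed array, sized for the
/// chunk grid implied by `dims` / `chunk_dims` (no unlimited dimensions).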
pub fn create_fixed_array_dataset(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
chunk_dims: &[u64],
) -> IoResult<usize> {
let ndims = dims.len();
let mut num_chunks: u64 = 1;
for d in 0..ndims {
num_chunks *= dims[d].div_ceil(chunk_dims[d]);
}
let mut fa_header = FixedArrayHeader::new_for_chunks(&self.ctx, num_chunks);
let hdr_encoded = fa_header.encode(&self.ctx);
let fa_header_addr = self.allocator.allocate(hdr_encoded.len() as u64);
let fa_dblk = FixedArrayDataBlock::new_unfiltered(fa_header_addr, num_chunks as usize);
let dblk_encoded = fa_dblk.encode_unfiltered(&self.ctx);
let fa_dblk_addr = self.allocator.allocate(dblk_encoded.len() as u64);
fa_header.data_blk_addr = fa_dblk_addr;
let hdr_encoded = fa_header.encode(&self.ctx);
self.handle.write_at(fa_header_addr, &hdr_encoded)?;
self.handle.write_at(fa_dblk_addr, &dblk_encoded)?;
let dataspace = DataspaceMessage::simple(dims);
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr: UNDEF_ADDR,
data_size: 0,
chunked: None,
btree_v2: None,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: None,
fixed_array: Some(FixedArrayDatasetInfo {
chunk_dims: chunk_dims.to_vec(),
fa_header_addr,
fa_dblk_addr,
fa_header,
fa_dblk,
chunks_written: 0,
}),
});
Ok(idx)
}
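/// Creates a chunked dataset indexed by a version-2 B-tree; chunk records
/// accumulate in memory and are written out by flush_dataset.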
pub fn create_btree_v2_dataset(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
max_dims: &[u64],
chunk_dims: &[u64],
) -> IoResult<usize> {
let ndims = dims.len();
let bt2_index = Bt2ChunkIndex::new_unfiltered(ndims);
let hdr = crate::format::chunk_index::btree_v2::Bt2Header::new_for_chunks(&self.ctx, ndims);
let hdr_encoded = hdr.encode(&self.ctx);
let bt2_header_addr = self.allocator.allocate(hdr_encoded.len() as u64);
self.handle.write_at(bt2_header_addr, &hdr_encoded)?;
let leaf = crate::format::chunk_index::btree_v2::Bt2LeafNode::new(
crate::format::chunk_index::btree_v2::BT2_TYPE_CHUNK_UNFILT,
bt2_index.record_size(&self.ctx),
);
let leaf_encoded = leaf.encode();
let bt2_leaf_addr = self.allocator.allocate(leaf_encoded.len() as u64);
self.handle.write_at(bt2_leaf_addr, &leaf_encoded)?;
let dataspace = DataspaceMessage {
dims: dims.to_vec(),
max_dims: Some(max_dims.to_vec()),
};
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr: UNDEF_ADDR,
data_size: 0,
chunked: None,
fixed_array: None,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: None,
btree_v2: Some(Bt2DatasetInfo {
chunk_dims: chunk_dims.to_vec(),
max_dims: max_dims.to_vec(),
bt2_header_addr,
bt2_leaf_addr,
index: bt2_index,
chunks_written: 0,
}),
});
Ok(idx)
}
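/// Convenience wrapper around the filtered extensible-array layout with a
/// deflate pipeline at the given compression level.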
pub fn create_chunked_dataset_compressed(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
max_dims: &[u64],
chunk_dims: &[u64],
compression_level: u32,
) -> IoResult<usize> {
let element_size = datatype.element_size() as u64;
let chunk_bytes: u64 = chunk_dims.iter().product::<u64>() * element_size;
let chunk_size_len = compute_chunk_size_len(chunk_bytes);
let earray_params = EarrayParams::default_params();
let ndblk_addrs = compute_ndblk_addrs(earray_params.sup_blk_min_data_ptrs);
let nsblk_addrs = compute_nsblk_addrs(
earray_params.idx_blk_elmts,
earray_params.data_blk_min_elmts,
earray_params.sup_blk_min_data_ptrs,
earray_params.max_nelmts_bits,
);
let mut ea_header =
ExtensibleArrayHeader::new_for_filtered_chunks(&self.ctx, chunk_size_len);
ea_header.max_nelmts_bits = earray_params.max_nelmts_bits;
ea_header.idx_blk_elmts = earray_params.idx_blk_elmts;
ea_header.data_blk_min_elmts = earray_params.data_blk_min_elmts;
ea_header.sup_blk_min_data_ptrs = earray_params.sup_blk_min_data_ptrs;
ea_header.max_dblk_page_nelmts_bits = earray_params.max_dblk_page_nelmts_bits;
let hdr_encoded = ea_header.encode(&self.ctx);
let ea_header_addr = self.allocator.allocate(hdr_encoded.len() as u64);
let filt_iblk = FilteredIndexBlock::new(
ea_header_addr,
earray_params.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
);
let iblk_encoded = filt_iblk.encode(&self.ctx, chunk_size_len);
let ea_iblk_addr = self.allocator.allocate(iblk_encoded.len() as u64);
ea_header.idx_blk_addr = ea_iblk_addr;
let hdr_encoded = ea_header.encode(&self.ctx);
self.handle.write_at(ea_header_addr, &hdr_encoded)?;
self.handle.write_at(ea_iblk_addr, &iblk_encoded)?;
let dataspace = DataspaceMessage {
dims: dims.to_vec(),
max_dims: Some(max_dims.to_vec()),
};
let ea_iblk = ExtensibleArrayIndexBlock::new(
ea_header_addr,
earray_params.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
);
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr: UNDEF_ADDR,
data_size: 0,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: Some(FilterPipeline::deflate(compression_level)),
fixed_array: None,
btree_v2: None,
chunked: Some(ChunkedDatasetInfo {
chunk_dims: chunk_dims.to_vec(),
max_dims: max_dims.to_vec(),
earray_params,
ea_header_addr,
ea_iblk_addr,
ndblk_addrs,
ea_header,
ea_iblk,
data_blocks: Vec::new(),
chunks_written: 0,
filt_iblk: Some(filt_iblk),
filt_data_blocks: Vec::new(),
chunk_size_len,
}),
});
Ok(idx)
}
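/// Like create_chunked_dataset_compressed, but with an arbitrary
/// caller-supplied filter pipeline.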
pub fn create_chunked_dataset_with_pipeline(
&mut self,
name: &str,
datatype: DatatypeMessage,
dims: &[u64],
max_dims: &[u64],
chunk_dims: &[u64],
pipeline: FilterPipeline,
) -> IoResult<usize> {
let element_size = datatype.element_size() as u64;
let chunk_bytes: u64 = chunk_dims.iter().product::<u64>() * element_size;
let chunk_size_len = compute_chunk_size_len(chunk_bytes);
let earray_params = EarrayParams::default_params();
let ndblk_addrs = compute_ndblk_addrs(earray_params.sup_blk_min_data_ptrs);
let nsblk_addrs = compute_nsblk_addrs(
earray_params.idx_blk_elmts,
earray_params.data_blk_min_elmts,
earray_params.sup_blk_min_data_ptrs,
earray_params.max_nelmts_bits,
);
let mut ea_header =
ExtensibleArrayHeader::new_for_filtered_chunks(&self.ctx, chunk_size_len);
ea_header.max_nelmts_bits = earray_params.max_nelmts_bits;
ea_header.idx_blk_elmts = earray_params.idx_blk_elmts;
ea_header.data_blk_min_elmts = earray_params.data_blk_min_elmts;
ea_header.sup_blk_min_data_ptrs = earray_params.sup_blk_min_data_ptrs;
ea_header.max_dblk_page_nelmts_bits = earray_params.max_dblk_page_nelmts_bits;
let hdr_encoded = ea_header.encode(&self.ctx);
let ea_header_addr = self.allocator.allocate(hdr_encoded.len() as u64);
let filt_iblk = FilteredIndexBlock::new(
ea_header_addr,
earray_params.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
);
let iblk_encoded = filt_iblk.encode(&self.ctx, chunk_size_len);
let ea_iblk_addr = self.allocator.allocate(iblk_encoded.len() as u64);
ea_header.idx_blk_addr = ea_iblk_addr;
let hdr_encoded = ea_header.encode(&self.ctx);
self.handle.write_at(ea_header_addr, &hdr_encoded)?;
self.handle.write_at(ea_iblk_addr, &iblk_encoded)?;
let dataspace = DataspaceMessage {
dims: dims.to_vec(),
max_dims: Some(max_dims.to_vec()),
};
let ea_iblk = ExtensibleArrayIndexBlock::new(
ea_header_addr,
earray_params.idx_blk_elmts,
ndblk_addrs,
nsblk_addrs,
);
let idx = self.datasets.len();
self.datasets.push(DatasetInfo {
name: name.to_string(),
datatype,
dataspace,
obj_header_addr: 0,
data_addr: UNDEF_ADDR,
data_size: 0,
attributes: Vec::new(),
obj_header_written_addr: None,
obj_header_encoded_size: 0,
filter_pipeline: Some(pipeline),
fixed_array: None,
btree_v2: None,
chunked: Some(ChunkedDatasetInfo {
chunk_dims: chunk_dims.to_vec(),
max_dims: max_dims.to_vec(),
earray_params,
ea_header_addr,
ea_iblk_addr,
ndblk_addrs,
ea_header,
ea_iblk,
data_blocks: Vec::new(),
chunks_written: 0,
filt_iblk: Some(filt_iblk),
filt_data_blocks: Vec::new(),
chunk_size_len,
}),
});
Ok(idx)
}
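/// Writes one chunk of a fixed-array dataset. `chunk_coords` are
/// coordinates in the chunk grid (not element offsets); they are
/// linearized row-major into the fixed array's element slot.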
pub fn write_chunk_fixed_array(
&mut self,
index: usize,
chunk_coords: &[u64],
data: &[u8],
) -> IoResult<()> {
let ds = &self.datasets[index];
let element_size = ds.datatype.element_size() as u64;
let fa = ds
.fixed_array
.as_ref()
.ok_or_else(|| crate::io::IoError::InvalidState("not a fixed-array dataset".into()))?;
let chunk_bytes: u64 = fa.chunk_dims.iter().product::<u64>() * element_size;
// Validate the uncompressed size before any filter runs, so filtered and
// unfiltered writes are checked consistently (mirrors write_chunk).
if data.len() as u64 != chunk_bytes {
    return Err(crate::io::IoError::InvalidState(format!(
        "chunk data size mismatch: expected {} bytes, got {}",
        chunk_bytes,
        data.len()
    )));
}
let write_data;
let data_to_write = if let Some(ref pipeline) = ds.filter_pipeline {
    write_data = filter::apply_filters(pipeline, data)?;
    &write_data
} else {
    data
};
let dims = &ds.dataspace.dims;
let chunk_dims = &fa.chunk_dims;
let ndims = dims.len();
let mut linear_idx: u64 = 0;
let mut stride: u64 = 1;
for d in (0..ndims).rev() {
let n_chunks_in_dim = dims[d].div_ceil(chunk_dims[d]);
linear_idx += chunk_coords[d] * stride;
stride *= n_chunks_in_dim;
}
let chunk_addr = self.allocator.allocate(data_to_write.len() as u64);
self.handle.write_at(chunk_addr, data_to_write)?;
let fa = self.datasets[index].fixed_array.as_mut().unwrap();
if (linear_idx as usize) < fa.fa_dblk.elements.len() {
fa.fa_dblk.elements[linear_idx as usize] = chunk_addr;
fa.chunks_written += 1;
} else {
return Err(crate::io::IoError::InvalidState(format!(
"chunk index {} out of range (max {})",
linear_idx,
fa.fa_dblk.elements.len()
)));
}
Ok(())
}
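/// Writes one chunk of a B-tree-v2 indexed dataset, recording the chunk's
/// grid coordinates and address in the in-memory index.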
pub fn write_chunk_btree_v2(
&mut self,
index: usize,
chunk_coords: &[u64],
data: &[u8],
) -> IoResult<()> {
let ds = &self.datasets[index];
let element_size = ds.datatype.element_size() as u64;
let bt2 = ds
.btree_v2
.as_ref()
.ok_or_else(|| crate::io::IoError::InvalidState("not a B-tree v2 dataset".into()))?;
let chunk_bytes: u64 = bt2.chunk_dims.iter().product::<u64>() * element_size;
if data.len() as u64 != chunk_bytes {
return Err(crate::io::IoError::InvalidState(format!(
"chunk data size mismatch: expected {} bytes, got {}",
chunk_bytes,
data.len()
)));
}
let chunk_addr = self.allocator.allocate(chunk_bytes);
self.handle.write_at(chunk_addr, data)?;
let bt2 = self.datasets[index].btree_v2.as_mut().unwrap();
bt2.index.insert(chunk_coords.to_vec(), chunk_addr);
bt2.chunks_written += 1;
Ok(())
}
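/// Writes a batch of (linear index, chunk data) pairs. With the
/// `parallel` feature and a filter pipeline, compression runs in parallel
/// and only the file writes are serialized; otherwise this falls back to
/// sequential write_chunk calls.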
pub fn write_chunks_batch(&mut self, ds_index: usize, chunks: &[(u64, &[u8])]) -> IoResult<()> {
#[cfg(feature = "parallel")]
{
if let Some(ref pipeline) = self.datasets[ds_index].filter_pipeline {
let chunk_data: Vec<Vec<u8>> = chunks.iter().map(|(_, d)| d.to_vec()).collect();
let compressed = filter::apply_filters_parallel(pipeline, &chunk_data);
for ((idx, _), compressed_data) in chunks.iter().zip(compressed.iter()) {
self.write_compressed_chunk(ds_index, *idx, compressed_data)?;
}
return Ok(());
}
}
for (idx, data) in chunks {
self.write_chunk(ds_index, *idx, data)?;
}
Ok(())
}
pub fn write_compressed_chunk(
&mut self,
index: usize,
chunk_idx: u64,
compressed_data: &[u8],
) -> IoResult<()> {
let compressed_size = compressed_data.len() as u64;
let chunk_addr = self.allocator.allocate(compressed_size);
self.handle.write_at(chunk_addr, compressed_data)?;
let is_filtered = self.datasets[index].filter_pipeline.is_some();
let idx_blk_elmts = {
let c = self.datasets[index]
.chunked
.as_ref()
.ok_or_else(|| crate::io::IoError::InvalidState("not a chunked dataset".into()))?;
c.earray_params.idx_blk_elmts as u64
};
if chunk_idx < idx_blk_elmts {
let chunked = self.datasets[index].chunked.as_mut().unwrap();
if is_filtered {
if let Some(ref mut fiblk) = chunked.filt_iblk {
fiblk.elements[chunk_idx as usize] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
}
} else {
chunked.ea_iblk.elements[chunk_idx as usize] = chunk_addr;
}
chunked.chunks_written += 1;
if chunk_idx + 1 > chunked.ea_header.max_idx_set {
chunked.ea_header.max_idx_set = chunk_idx + 1;
}
if chunked.ea_header.num_elmts_realized < idx_blk_elmts {
chunked.ea_header.num_elmts_realized = idx_blk_elmts;
}
} else {
let offset_in_dblks = chunk_idx - idx_blk_elmts;
let chunked = self.datasets[index].chunked.as_mut().unwrap();
let min_elmts = chunked.earray_params.data_blk_min_elmts as u64;
let mut cumulative = 0u64;
let mut dblk_idx = 0usize;
let mut current_size = min_elmts;
let mut pair_count = 0;
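// Same data-block geometry walk as in write_chunk: block sizes start
// at data_blk_min_elmts and double after every two blocks.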
loop {
if offset_in_dblks < cumulative + current_size {
break;
}
cumulative += current_size;
dblk_idx += 1;
pair_count += 1;
if pair_count >= 2 {
pair_count = 0;
current_size *= 2;
}
if dblk_idx >= chunked.ndblk_addrs {
return Err(crate::io::IoError::InvalidState(
"chunk index exceeds extensible array capacity".into(),
));
}
}
let offset_in_block = (offset_in_dblks - cumulative) as usize;
let block_nelmts = current_size as usize;
if is_filtered {
let filt_iblk = chunked.filt_iblk.as_mut().unwrap();
if filt_iblk.dblk_addrs[dblk_idx] == UNDEF_ADDR {
let mut dblk =
FilteredDataBlock::new(chunked.ea_header_addr, cumulative, block_nelmts);
dblk.elements[offset_in_block] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
let encoded = dblk.encode(
&self.ctx,
chunked.earray_params.max_nelmts_bits,
chunked.chunk_size_len,
);
let dblk_addr = self.allocator.allocate(encoded.len() as u64);
self.handle.write_at(dblk_addr, &encoded)?;
filt_iblk.dblk_addrs[dblk_idx] = dblk_addr;
chunked.filt_data_blocks.push((dblk_addr, dblk));
chunked.ea_header.num_dblks_created += 1;
chunked.ea_header.size_dblks_created += encoded.len() as u64;
} else {
let dblk_addr = filt_iblk.dblk_addrs[dblk_idx];
if let Some((_, ref mut dblk)) = chunked
.filt_data_blocks
.iter_mut()
.find(|(a, _)| *a == dblk_addr)
{
dblk.elements[offset_in_block] = FilteredChunkEntry {
addr: chunk_addr,
nbytes: compressed_size,
filter_mask: 0,
};
let encoded = dblk.encode(
&self.ctx,
chunked.earray_params.max_nelmts_bits,
chunked.chunk_size_len,
);
self.handle.write_at(dblk_addr, &encoded)?;
}
}
} else {
    if chunked.ea_iblk.dblk_addrs[dblk_idx] == UNDEF_ADDR {
        let mut dblk = ExtensibleArrayDataBlock::new(
            chunked.ea_header_addr,
            cumulative,
            block_nelmts,
        );
        dblk.elements[offset_in_block] = chunk_addr;
        let encoded = dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
        let dblk_addr = self.allocator.allocate(encoded.len() as u64);
        self.handle.write_at(dblk_addr, &encoded)?;
        chunked.ea_iblk.dblk_addrs[dblk_idx] = dblk_addr;
        chunked.data_blocks.push((dblk_addr, dblk));
        chunked.ea_header.num_dblks_created += 1;
        chunked.ea_header.size_dblks_created += encoded.len() as u64;
    } else {
        let dblk_addr = chunked.ea_iblk.dblk_addrs[dblk_idx];
        if let Some((_, ref mut dblk)) = chunked
            .data_blocks
            .iter_mut()
            .find(|(a, _)| *a == dblk_addr)
        {
            dblk.elements[offset_in_block] = chunk_addr;
            let encoded = dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
            self.handle.write_at(dblk_addr, &encoded)?;
        } else {
            // Data block exists on disk but is not cached in memory (e.g.
            // after open_append): read, update, and rewrite it, mirroring
            // the fallback in write_chunk.
            let dblk_buf = self.handle.read_at_most(dblk_addr, 65536)?;
            if let Ok(mut dblk) = ExtensibleArrayDataBlock::decode(
                &dblk_buf,
                &self.ctx,
                chunked.earray_params.max_nelmts_bits,
                block_nelmts,
            ) {
                dblk.elements[offset_in_block] = chunk_addr;
                let encoded =
                    dblk.encode(&self.ctx, chunked.earray_params.max_nelmts_bits);
                self.handle.write_at(dblk_addr, &encoded)?;
                chunked.data_blocks.push((dblk_addr, dblk));
            }
        }
    }
}
chunked.chunks_written += 1;
if chunk_idx + 1 > chunked.ea_header.max_idx_set {
chunked.ea_header.max_idx_set = chunk_idx + 1;
}
let total_realized = if is_filtered {
idx_blk_elmts
+ chunked
.filt_data_blocks
.iter()
.map(|(_, db)| db.elements.len() as u64)
.sum::<u64>()
} else {
idx_blk_elmts
+ chunked
.data_blocks
.iter()
.map(|(_, db)| db.elements.len() as u64)
.sum::<u64>()
};
chunked.ea_header.num_elmts_realized = total_realized;
}
Ok(())
}
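/// Updates the current shape of a chunk-indexed dataset. Only the
/// in-memory dataspace changes; the object header on disk is rewritten at
/// finalize (or finalize_for_swmr / write_dataset_header_inplace).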
pub fn extend_dataset(&mut self, index: usize, new_dims: &[u64]) -> IoResult<()> {
let ds = &mut self.datasets[index];
if ds.chunked.is_none() && ds.fixed_array.is_none() && ds.btree_v2.is_none() {
return Err(crate::io::IoError::InvalidState(
"can only extend chunked datasets".into(),
));
}
ds.dataspace.dims = new_dims.to_vec();
Ok(())
}
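/// Persists a dataset's chunk-index structures so readers can see the
/// chunks written so far. Note that the B-tree v2 path allocates a fresh
/// leaf node on every flush rather than rewriting the old one.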
pub fn flush_dataset(&mut self, index: usize) -> IoResult<()> {
let ds = &self.datasets[index];
if let Some(ref chunked) = ds.chunked {
if let Some(ref fiblk) = chunked.filt_iblk {
let iblk_encoded = fiblk.encode(&self.ctx, chunked.chunk_size_len);
self.handle.write_at(chunked.ea_iblk_addr, &iblk_encoded)?;
} else {
let iblk_encoded = chunked.ea_iblk.encode(&self.ctx);
self.handle.write_at(chunked.ea_iblk_addr, &iblk_encoded)?;
}
let hdr_encoded = chunked.ea_header.encode(&self.ctx);
self.handle.write_at(chunked.ea_header_addr, &hdr_encoded)?;
self.handle.sync_data()?;
return Ok(());
}
if let Some(ref fa) = ds.fixed_array {
let dblk_encoded = fa.fa_dblk.encode_unfiltered(&self.ctx);
self.handle.write_at(fa.fa_dblk_addr, &dblk_encoded)?;
let hdr_encoded = fa.fa_header.encode(&self.ctx);
self.handle.write_at(fa.fa_header_addr, &hdr_encoded)?;
self.handle.sync_data()?;
return Ok(());
}
if let Some(ref bt2) = ds.btree_v2 {
let (hdr_bytes, leaf_bytes) = bt2.index.encode(&self.ctx);
let leaf_addr = self.allocator.allocate(leaf_bytes.len() as u64);
self.handle.write_at(leaf_addr, &leaf_bytes)?;
let mut hdr =
crate::format::chunk_index::btree_v2::Bt2Header::decode(&hdr_bytes, &self.ctx)?;
hdr.root_node_addr = leaf_addr;
let hdr_encoded = hdr.encode(&self.ctx);
self.handle.write_at(bt2.bt2_header_addr, &hdr_encoded)?;
let bt2_mut = self.datasets[index].btree_v2.as_mut().unwrap();
bt2_mut.bt2_leaf_addr = leaf_addr;
self.handle.sync_data()?;
return Ok(());
}
Ok(())
}
pub fn close(mut self) -> IoResult<()> {
self.finalize()?;
self.closed = true;
Ok(())
}
pub fn handle(&mut self) -> &mut FileHandle {
&mut self.handle
}
pub fn eof(&self) -> u64 {
self.allocator.eof()
}
pub fn write_superblock(&mut self, flags: u8) -> IoResult<()> {
let root_addr = self
.root_group_addr
.ok_or_else(|| crate::io::IoError::InvalidState("root group not yet written".into()))?;
let sb = SuperblockV2V3 {
version: SUPERBLOCK_V3,
sizeof_offsets: self.ctx.sizeof_addr,
sizeof_lengths: self.ctx.sizeof_size,
file_consistency_flags: flags,
base_address: 0,
superblock_extension_address: UNDEF_ADDR,
end_of_file_address: self.allocator.eof(),
root_group_object_header_address: root_addr,
};
let sb_encoded = sb.encode();
self.handle.write_at(0, &sb_encoded)?;
Ok(())
}
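/// Rewrites a dataset's object header at its original address,
/// zero-padding up to the originally encoded size. Fails if the new
/// header would be larger, since it cannot grow in place.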
pub fn write_dataset_header_inplace(&mut self, index: usize) -> IoResult<()> {
let addr = self.datasets[index]
.obj_header_written_addr
.ok_or_else(|| {
crate::io::IoError::InvalidState("dataset header not yet written".into())
})?;
let original_size = self.datasets[index].obj_header_encoded_size;
let header = self.build_dataset_header(index);
let encoded = header.encode();
if encoded.len() > original_size {
return Err(crate::io::IoError::InvalidState(format!(
"dataset header grew from {} to {} bytes; cannot rewrite in place",
original_size,
encoded.len()
)));
}
let mut padded = encoded;
padded.resize(original_size, 0);
self.handle.write_at(addr, &padded)?;
Ok(())
}
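/// Writes all chunk indexes, dataset and group headers, the root group,
/// and a superblock flagged for SWMR access. After this point chunk data
/// can keep being appended while readers follow the file.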
pub fn finalize_for_swmr(&mut self) -> IoResult<()> {
for i in 0..self.datasets.len() {
if self.datasets[i].chunked.is_some()
|| self.datasets[i].fixed_array.is_some()
|| self.datasets[i].btree_v2.is_some()
{
self.flush_dataset(i)?;
}
}
for i in 0..self.datasets.len() {
let ds_header = self.build_dataset_header(i);
let encoded = ds_header.encode();
let encoded_size = encoded.len();
let addr = self.allocator.allocate(encoded_size as u64);
self.handle.write_at(addr, &encoded)?;
self.datasets[i].obj_header_addr = addr;
self.datasets[i].obj_header_written_addr = Some(addr);
self.datasets[i].obj_header_encoded_size = encoded_size;
}
{
let order = Self::topological_group_order(&self.groups);
for &gi in &order {
let grp_header = self.build_group_header(gi);
let encoded = grp_header.encode();
let addr = self.allocator.allocate(encoded.len() as u64);
self.handle.write_at(addr, &encoded)?;
self.groups[gi].obj_header_addr = addr;
}
}
let root_header = self.build_root_group_header();
let root_encoded = root_header.encode();
let root_encoded_size = root_encoded.len();
let root_addr = self.allocator.allocate(root_encoded_size as u64);
self.handle.write_at(root_addr, &root_encoded)?;
self.root_group_addr = Some(root_addr);
self.root_group_encoded_size = root_encoded_size;
self.write_superblock(FLAG_WRITE_ACCESS | FLAG_SWMR_WRITE)?;
self.handle.sync_all()?;
Ok(())
}
fn finalize(&mut self) -> IoResult<()> {
if self.root_group_addr.is_some() {
for i in 0..self.datasets.len() {
if self.datasets[i].chunked.is_some()
|| self.datasets[i].fixed_array.is_some()
|| self.datasets[i].btree_v2.is_some()
{
self.flush_dataset(i)?;
}
}
for i in 0..self.datasets.len() {
if self.datasets[i].obj_header_written_addr.is_some() {
self.write_dataset_header_inplace(i)?;
}
}
self.write_superblock(0)?;
self.handle.sync_all()?;
return Ok(());
}
for i in 0..self.datasets.len() {
if self.datasets[i].chunked.is_some()
|| self.datasets[i].fixed_array.is_some()
|| self.datasets[i].btree_v2.is_some()
{
self.flush_dataset(i)?;
}
}
for i in 0..self.datasets.len() {
if self.datasets[i].obj_header_written_addr.is_some() {
let modified = self.datasets[i]
.chunked
.as_ref()
.is_some_and(|c| c.chunks_written > 0);
if !modified {
continue;
}
}
let ds_header = self.build_dataset_header(i);
let encoded = ds_header.encode();
let addr = self.allocator.allocate(encoded.len() as u64);
self.handle.write_at(addr, &encoded)?;
self.datasets[i].obj_header_addr = addr;
}
{
let order = Self::topological_group_order(&self.groups);
for &gi in &order {
let grp_header = self.build_group_header(gi);
let encoded = grp_header.encode();
let addr = self.allocator.allocate(encoded.len() as u64);
self.handle.write_at(addr, &encoded)?;
self.groups[gi].obj_header_addr = addr;
}
}
let root_header = self.build_root_group_header();
let root_encoded = root_header.encode();
let root_addr = self.allocator.allocate(root_encoded.len() as u64);
self.handle.write_at(root_addr, &root_encoded)?;
self.root_group_addr = Some(root_addr);
self.write_superblock(0)?;
self.handle.sync_all()?;
Ok(())
}
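/// Orders groups deepest-first so that child headers are written (and
/// their addresses known) before the link messages of their parents are
/// encoded.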
fn topological_group_order(groups: &[GroupInfo]) -> Vec<usize> {
let mut depths: Vec<usize> = vec![0; groups.len()];
for (i, grp) in groups.iter().enumerate() {
let mut d = 0;
let mut cur = grp.parent;
while let Some(pidx) = cur {
d += 1;
cur = groups[pidx].parent;
}
depths[i] = d;
}
let mut order: Vec<usize> = (0..groups.len()).collect();
order.sort_by(|a, b| depths[*b].cmp(&depths[*a]));
order
}
fn build_dataset_header(&self, index: usize) -> ObjectHeader {
let ds = &self.datasets[index];
let mut header = ObjectHeader::new();
let ds_msg = ds.dataspace.encode(&self.ctx);
header.add_message(MSG_DATASPACE, 0x00, ds_msg);
let dt_msg = ds.datatype.encode(&self.ctx);
header.add_message(MSG_DATATYPE, 0x01, dt_msg);
let is_chunked = ds.chunked.is_some() || ds.fixed_array.is_some() || ds.btree_v2.is_some();
let fv = if is_chunked {
    // alloc_time 3 is HDF5's incremental allocation: chunk space is
    // allocated as chunks are written, so unwritten chunks read back as
    // the (default) fill value.
    FillValueMessage {
        alloc_time: 3,
        fill_write_time: 0,
        fill_defined: 1,
        fill_value: None,
    }
} else {
FillValueMessage::default()
};
let fv_msg = fv.encode();
header.add_message(MSG_FILL_VALUE, 0x00, fv_msg);
let layout = if let Some(ref chunked) = ds.chunked {
let mut layout_dims = chunked.chunk_dims.clone();
layout_dims.push(ds.datatype.element_size() as u64);
DataLayoutMessage::chunked_v4_earray(
layout_dims,
chunked.earray_params.clone(),
chunked.ea_header_addr,
)
} else if let Some(ref fa) = ds.fixed_array {
let mut layout_dims = fa.chunk_dims.clone();
layout_dims.push(ds.datatype.element_size() as u64);
DataLayoutMessage::chunked_v4_farray(
layout_dims,
FixedArrayParams::default_params(),
fa.fa_header_addr,
)
} else if let Some(ref bt2) = ds.btree_v2 {
let mut layout_dims = bt2.chunk_dims.clone();
layout_dims.push(ds.datatype.element_size() as u64);
DataLayoutMessage::chunked_v4_btree_v2(layout_dims, bt2.bt2_header_addr)
} else {
DataLayoutMessage::contiguous(ds.data_addr, ds.data_size)
};
let layout_msg = layout.encode(&self.ctx);
header.add_message(MSG_DATA_LAYOUT, 0x00, layout_msg);
if let Some(ref pipeline) = ds.filter_pipeline {
if !pipeline.filters.is_empty() {
let filter_msg = pipeline.encode();
header.add_message(MSG_FILTER_PIPELINE, 0x00, filter_msg);
}
}
for attr in &ds.attributes {
let attr_msg = attr.encode(&self.ctx);
header.add_message(MSG_ATTRIBUTE, 0x00, attr_msg);
}
header
}
fn build_group_header(&self, group_idx: usize) -> ObjectHeader {
let mut header = ObjectHeader::new();
let link_info = LinkInfoMessage::compact();
let li_msg = link_info.encode(&self.ctx);
header.add_message(MSG_LINK_INFO, 0x00, li_msg);
let group_info = GroupInfoMessage::default();
let gi_msg = group_info.encode();
header.add_message(MSG_GROUP_INFO, 0x00, gi_msg);
let grp = &self.groups[group_idx];
for &ds_idx in &grp.child_datasets {
let ds = &self.datasets[ds_idx];
let leaf_name = ds.name.rsplit('/').next().unwrap_or(&ds.name);
let link = LinkMessage::hard(leaf_name, ds.obj_header_addr);
let link_msg = link.encode(&self.ctx);
header.add_message(MSG_LINK, 0x00, link_msg);
}
for &child_idx in &grp.child_groups {
let child_grp = &self.groups[child_idx];
let leaf_name = child_grp.name.rsplit('/').next().unwrap_or(&child_grp.name);
let link = LinkMessage::hard(leaf_name, child_grp.obj_header_addr);
let link_msg = link.encode(&self.ctx);
header.add_message(MSG_LINK, 0x00, link_msg);
}
header
}
fn build_root_group_header(&self) -> ObjectHeader {
let mut header = ObjectHeader::new();
let link_info = LinkInfoMessage::compact();
let li_msg = link_info.encode(&self.ctx);
header.add_message(MSG_LINK_INFO, 0x00, li_msg);
let group_info = GroupInfoMessage::default();
let gi_msg = group_info.encode();
header.add_message(MSG_GROUP_INFO, 0x00, gi_msg);
let datasets_in_subgroups: std::collections::HashSet<usize> = self
.groups
.iter()
.flat_map(|g| g.child_datasets.iter().copied())
.collect();
for (i, ds) in self.datasets.iter().enumerate() {
if !datasets_in_subgroups.contains(&i) {
let link = LinkMessage::hard(&ds.name, ds.obj_header_addr);
let link_msg = link.encode(&self.ctx);
header.add_message(MSG_LINK, 0x00, link_msg);
}
}
for grp in &self.groups {
if grp.parent.is_none() {
let leaf_name = grp.name.rsplit('/').next().unwrap_or(&grp.name);
let link = LinkMessage::hard(leaf_name, grp.obj_header_addr);
let link_msg = link.encode(&self.ctx);
header.add_message(MSG_LINK, 0x00, link_msg);
}
}
for attr in &self.root_attributes {
let attr_msg = attr.encode(&self.ctx);
header.add_message(MSG_ATTRIBUTE, 0x00, attr_msg);
}
header
}
}
impl Drop for Hdf5Writer {
fn drop(&mut self) {
if !self.closed {
let _ = self.finalize();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::format::messages::datatype::DatatypeMessage;
use crate::io::reader::Hdf5Reader;
#[test]
fn create_empty_file() {
let dir = std::env::temp_dir();
let path = dir.join("test_empty.h5");
let writer = Hdf5Writer::create(&path).unwrap();
writer.close().unwrap();
let reader = Hdf5Reader::open(&path).unwrap();
assert!(reader.dataset_names().is_empty());
std::fs::remove_file(&path).ok();
}
#[test]
fn create_single_dataset() {
let dir = std::env::temp_dir();
let path = dir.join("test_single.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_dataset("data", DatatypeMessage::f64_type(), &[4])
.unwrap();
let values: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0];
let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
writer.write_dataset_raw(idx, &raw).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_names(), vec!["data"]);
assert_eq!(reader.dataset_shape("data").unwrap(), vec![4]);
let readback = reader.read_dataset_raw("data").unwrap();
assert_eq!(readback, raw);
std::fs::remove_file(&path).ok();
}
#[test]
fn create_multiple_datasets() {
let dir = std::env::temp_dir();
let path = dir.join("test_multi.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx0 = writer
.create_dataset("ints", DatatypeMessage::i32_type(), &[3])
.unwrap();
let i_data: Vec<u8> = [10i32, 20, 30]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_dataset_raw(idx0, &i_data).unwrap();
let idx1 = writer
.create_dataset("floats", DatatypeMessage::f32_type(), &[2, 2])
.unwrap();
let f_data: Vec<u8> = [1.0f32, 2.0, 3.0, 4.0]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_dataset_raw(idx1, &f_data).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
let names = reader.dataset_names();
assert!(names.contains(&"ints"));
assert!(names.contains(&"floats"));
assert_eq!(reader.dataset_shape("ints").unwrap(), vec![3]);
assert_eq!(reader.dataset_shape("floats").unwrap(), vec![2, 2]);
assert_eq!(reader.read_dataset_raw("ints").unwrap(), i_data);
assert_eq!(reader.read_dataset_raw("floats").unwrap(), f_data);
std::fs::remove_file(&path).ok();
}
#[test]
fn data_size_mismatch() {
let dir = std::env::temp_dir();
let path = dir.join("test_mismatch.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_dataset("x", DatatypeMessage::u8_type(), &[4])
.unwrap();
let err = writer.write_dataset_raw(idx, &[1, 2, 3]);
assert!(err.is_err());
std::fs::remove_file(&path).ok();
}
#[test]
fn create_chunked_dataset_simple() {
let dir = std::env::temp_dir();
let path = dir.join("test_chunked_simple.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_chunked_dataset(
"data",
DatatypeMessage::f64_type(),
&[0, 4], &[u64::MAX, 4], &[1, 4], )
.unwrap();
for frame in 0..3u64 {
let values: Vec<f64> = (0..4).map(|i| (frame * 4 + i) as f64).collect();
let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
writer.write_chunk(idx, frame, &raw).unwrap();
}
writer.extend_dataset(idx, &[3, 4]).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_names(), vec!["data"]);
assert_eq!(reader.dataset_shape("data").unwrap(), vec![3, 4]);
let raw = reader.read_dataset_raw("data").unwrap();
let values: Vec<f64> = raw
.chunks(8)
.map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 12);
for (i, val) in values.iter().enumerate() {
assert_eq!(*val, i as f64);
}
std::fs::remove_file(&path).ok();
}
#[test]
fn chunked_dataset_many_frames() {
let dir = std::env::temp_dir();
let path = dir.join("test_chunked_many.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_chunked_dataset(
"frames",
DatatypeMessage::i32_type(),
&[0, 2],
&[u64::MAX, 2],
&[1, 2],
)
.unwrap();
let n_frames = 10u64;
for frame in 0..n_frames {
let values = [(frame * 2) as i32, (frame * 2 + 1) as i32];
let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
writer.write_chunk(idx, frame, &raw).unwrap();
}
writer.extend_dataset(idx, &[n_frames, 2]).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_shape("frames").unwrap(), vec![10, 2]);
let raw = reader.read_dataset_raw("frames").unwrap();
let values: Vec<i32> = raw
.chunks(4)
.map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 20);
for (i, val) in values.iter().enumerate() {
assert_eq!(*val, i as i32);
}
std::fs::remove_file(&path).ok();
}
#[test]
fn create_fixed_array_dataset_roundtrip() {
let dir = std::env::temp_dir();
let path = dir.join("test_fixed_array.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_fixed_array_dataset(
"grid",
DatatypeMessage::i32_type(),
&[4, 6], &[2, 3], )
.unwrap();
let c00: Vec<u8> = [0i32, 1, 2, 6, 7, 8]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_fixed_array(idx, &[0, 0], &c00).unwrap();
let c01: Vec<u8> = [3i32, 4, 5, 9, 10, 11]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_fixed_array(idx, &[0, 1], &c01).unwrap();
let c10: Vec<u8> = [12i32, 13, 14, 18, 19, 20]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_fixed_array(idx, &[1, 0], &c10).unwrap();
let c11: Vec<u8> = [15i32, 16, 17, 21, 22, 23]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_fixed_array(idx, &[1, 1], &c11).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_names(), vec!["grid"]);
assert_eq!(reader.dataset_shape("grid").unwrap(), vec![4, 6]);
let raw = reader.read_dataset_raw("grid").unwrap();
let values: Vec<i32> = raw
.chunks(4)
.map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 24);
for (i, val) in values.iter().enumerate() {
assert_eq!(*val, i as i32);
}
std::fs::remove_file(&path).ok();
}
#[test]
fn create_btree_v2_dataset_roundtrip() {
let dir = std::env::temp_dir();
let path = dir.join("test_btree_v2.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_btree_v2_dataset(
"data",
DatatypeMessage::f64_type(),
&[0, 0], &[u64::MAX, u64::MAX], &[2, 3], )
.unwrap();
let c00: Vec<u8> = [0.0f64, 1.0, 2.0, 6.0, 7.0, 8.0]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_btree_v2(idx, &[0, 0], &c00).unwrap();
let c01: Vec<u8> = [3.0f64, 4.0, 5.0, 9.0, 10.0, 11.0]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_btree_v2(idx, &[0, 1], &c01).unwrap();
let c10: Vec<u8> = [12.0f64, 13.0, 14.0, 18.0, 19.0, 20.0]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_btree_v2(idx, &[1, 0], &c10).unwrap();
let c11: Vec<u8> = [15.0f64, 16.0, 17.0, 21.0, 22.0, 23.0]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_chunk_btree_v2(idx, &[1, 1], &c11).unwrap();
writer.extend_dataset(idx, &[4, 6]).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_names(), vec!["data"]);
assert_eq!(reader.dataset_shape("data").unwrap(), vec![4, 6]);
let raw = reader.read_dataset_raw("data").unwrap();
let values: Vec<f64> = raw
.chunks(8)
.map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 24);
for (i, val) in values.iter().enumerate() {
assert_eq!(*val, i as f64);
}
std::fs::remove_file(&path).ok();
}
#[cfg(feature = "parallel")]
#[test]
fn parallel_batch_write_roundtrip() {
let dir = std::env::temp_dir();
let path = dir.join("test_parallel_batch.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let idx = writer
.create_chunked_dataset(
"data",
DatatypeMessage::i32_type(),
&[0, 4],
&[u64::MAX, 4],
&[1, 4],
)
.unwrap();
let chunks_data: Vec<(u64, Vec<u8>)> = (0..8u64)
.map(|frame| {
let values: Vec<i32> = (0..4).map(|i| (frame * 4 + i) as i32).collect();
let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
(frame, raw)
})
.collect();
let batch: Vec<(u64, &[u8])> = chunks_data
.iter()
.map(|(idx, data)| (*idx, data.as_slice()))
.collect();
writer.write_chunks_batch(idx, &batch).unwrap();
writer.extend_dataset(idx, &[8, 4]).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_shape("data").unwrap(), vec![8, 4]);
let raw = reader.read_dataset_raw("data").unwrap();
let values: Vec<i32> = raw
.chunks(4)
.map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 32);
for (i, val) in values.iter().enumerate() {
assert_eq!(*val, i as i32);
}
std::fs::remove_file(&path).ok();
}
#[test]
fn swmr_writer_append_frames() {
use crate::io::swmr::SwmrWriter;
let dir = std::env::temp_dir();
let path = dir.join("test_swmr_append.h5");
let mut swmr = SwmrWriter::create(&path).unwrap();
let idx = swmr
.create_streaming_dataset("detector", DatatypeMessage::u16_type(), &[4, 4])
.unwrap();
swmr.start_swmr().unwrap();
for frame in 0..5u16 {
let data: Vec<u16> = (0..16).map(|i| frame * 16 + i).collect();
let raw: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
swmr.append_frame(idx, &raw).unwrap();
}
swmr.flush().unwrap();
swmr.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
assert_eq!(reader.dataset_shape("detector").unwrap(), vec![5, 4, 4]);
let raw = reader.read_dataset_raw("detector").unwrap();
let values: Vec<u16> = raw
.chunks(2)
.map(|chunk| u16::from_le_bytes(chunk.try_into().unwrap()))
.collect();
assert_eq!(values.len(), 80);
for (i, val) in values.iter().enumerate().take(16) {
assert_eq!(*val, i as u16);
}
for (i, val) in values[64..80].iter().enumerate() {
assert_eq!(*val, 4 * 16 + i as u16);
}
std::fs::remove_file(&path).ok();
}
#[test]
fn group_hierarchy_writer_reader() {
let dir = std::env::temp_dir();
let path = dir.join("test_group_hierarchy.h5");
let mut writer = Hdf5Writer::create(&path).unwrap();
let g0 = writer.create_group("/", "group1").unwrap();
let g1 = writer.create_group("/group1", "sub").unwrap();
assert_eq!(g0, 0);
assert_eq!(g1, 1);
let ds_root = writer
.create_dataset("root_data", DatatypeMessage::f64_type(), &[2])
.unwrap();
let raw_root: Vec<u8> = [1.0f64, 2.0].iter().flat_map(|v| v.to_le_bytes()).collect();
writer.write_dataset_raw(ds_root, &raw_root).unwrap();
let ds_g0 = writer
.create_dataset("group1/data", DatatypeMessage::i32_type(), &[3])
.unwrap();
writer.assign_dataset_to_group("/group1", ds_g0).unwrap();
let raw_g0: Vec<u8> = [10i32, 20, 30]
.iter()
.flat_map(|v| v.to_le_bytes())
.collect();
writer.write_dataset_raw(ds_g0, &raw_g0).unwrap();
let ds_g1 = writer
.create_dataset("group1/sub/values", DatatypeMessage::u8_type(), &[4])
.unwrap();
writer
.assign_dataset_to_group("/group1/sub", ds_g1)
.unwrap();
writer.write_dataset_raw(ds_g1, &[1u8, 2, 3, 4]).unwrap();
writer.close().unwrap();
let mut reader = Hdf5Reader::open(&path).unwrap();
let names = reader.dataset_names();
assert!(names.contains(&"root_data"), "names: {:?}", names);
assert!(names.contains(&"group1/data"), "names: {:?}", names);
assert!(names.contains(&"group1/sub/values"), "names: {:?}", names);
let raw = reader.read_dataset_raw("root_data").unwrap();
let vals: Vec<f64> = raw
.chunks(8)
.map(|c| f64::from_le_bytes(c.try_into().unwrap()))
.collect();
assert_eq!(vals, vec![1.0, 2.0]);
let raw = reader.read_dataset_raw("group1/data").unwrap();
let vals: Vec<i32> = raw
.chunks(4)
.map(|c| i32::from_le_bytes(c.try_into().unwrap()))
.collect();
assert_eq!(vals, vec![10, 20, 30]);
let raw = reader.read_dataset_raw("group1/sub/values").unwrap();
assert_eq!(raw, vec![1, 2, 3, 4]);
std::fs::remove_file(&path).ok();
}
}