use std::path::Path;
use crate::format::btree_v1::BTreeV1Node;
use crate::format::global_heap::{
decode_vlen_reference, vlen_reference_size, GlobalHeapCollection,
};
use crate::format::local_heap::{local_heap_get_string, LocalHeapHeader};
use crate::format::messages::attribute::AttributeMessage;
use crate::format::messages::data_layout::{self, DataLayoutMessage};
use crate::format::messages::dataspace::DataspaceMessage;
use crate::format::messages::datatype::DatatypeMessage;
use crate::format::messages::filter::{self, FilterPipeline};
use crate::format::messages::link::LinkMessage;
use crate::format::messages::link::LinkTarget;
use crate::format::messages::*;
use crate::format::object_header::ObjectHeader;
use crate::format::superblock::{detect_superblock_version, SuperblockV0V1, SuperblockV2V3};
use crate::format::symbol_table::SymbolTableNode;
use crate::format::{FormatContext, UNDEF_ADDR};
use crate::io::file_handle::FileHandle;
#[cfg(feature = "mmap")]
use crate::io::file_handle::MmapFileHandle;
use crate::io::IoResult;
/// Everything needed to read one dataset back from the file: identity
/// (name), type/shape metadata, storage layout, optional filter chain,
/// and the attribute messages found on the dataset's object header.
pub struct DatasetReadInfo {
    /// Path-like dataset name; nested groups join with '/'
    /// (e.g. "group/subgroup/data").
    pub name: String,
    /// Datatype message describing the element type.
    pub datatype: DatatypeMessage,
    /// Dataspace message describing dimensionality and extents.
    pub dataspace: DataspaceMessage,
    /// Data layout message: contiguous, compact, or chunked storage.
    pub layout: DataLayoutMessage,
    /// Filter pipeline (compression etc.); only set when non-empty.
    pub filter_pipeline: Option<FilterPipeline>,
    /// Attribute messages attached to the dataset's object header.
    pub attributes: Vec<AttributeMessage>,
}
/// Location of the root group, which is addressed differently
/// depending on the superblock version.
#[allow(dead_code)]
enum RootGroupInfo {
    /// Superblock v2/v3: the root group is addressed directly by its
    /// object header.
    V2V3 {
        root_group_object_header_address: u64,
    },
    /// Superblock v0/v1: the root group is reached via a symbol table
    /// entry; the B-tree / local-heap pair indexes its children.
    V0V1 {
        root_obj_header_addr: u64,
        btree_addr: u64,
        heap_addr: u64,
    },
}
/// Read-only HDF5 reader supporting superblock versions 0 through 3.
///
/// Datasets and root attributes are discovered eagerly in `open`; the
/// file handle is retained for on-demand chunk and heap reads later.
pub struct Hdf5Reader {
    handle: FileHandle,
    ctx: FormatContext,
    // End-of-file address taken from the superblock.
    _eof: u64,
    #[allow(dead_code)]
    root_group_info: RootGroupInfo,
    // Datasets discovered at open/refresh time, in discovery order.
    datasets: Vec<DatasetReadInfo>,
    // Attributes on the root group header; only populated on the
    // v2/v3 open path (open_v0v1 leaves this empty).
    root_attributes: Vec<AttributeMessage>,
}
impl Hdf5Reader {
/// Open `path` twice: once through the regular parsing reader and once
/// as a memory-mapped handle, returning both for callers that want
/// mmap-backed bulk access alongside parsed metadata.
#[cfg(feature = "mmap")]
pub fn open_mmap(path: &Path) -> IoResult<(Self, MmapFileHandle)> {
    let parsed = Self::open(path)?;
    let mapped = MmapFileHandle::open(path)?;
    Ok((parsed, mapped))
}
/// Open a file that may be written concurrently (SWMR mode).
/// Currently identical to `open`; no extra consistency handling is
/// performed here.
pub fn open_swmr(path: &Path) -> IoResult<Self> {
    Self::open(path)
}
/// Open an HDF5 file, detect its superblock version from the first
/// 1 KiB, and dispatch to the matching parser.
///
/// # Errors
/// `InvalidVersion` for superblock versions other than 0-3, plus any
/// underlying I/O or decode errors.
pub fn open(path: &Path) -> IoResult<Self> {
    let mut handle = FileHandle::open_read(path)?;
    let head = handle.read_at_most(0, 1024)?;
    let version = detect_superblock_version(&head)?;
    if version <= 1 {
        Self::open_v0v1(handle, &head)
    } else if version <= 3 {
        Self::open_v2v3(handle, &head)
    } else {
        Err(crate::io::IoError::Format(
            crate::format::FormatError::InvalidVersion(version),
        ))
    }
}
/// Build the reader from a v2/v3 superblock: decode the root group
/// object header, walk its link messages for datasets, and collect
/// root-level attributes.
fn open_v2v3(mut handle: FileHandle, sb_buf: &[u8]) -> IoResult<Self> {
    let sb = SuperblockV2V3::decode(sb_buf)?;
    let ctx = FormatContext {
        sizeof_addr: sb.sizeof_offsets,
        sizeof_size: sb.sizeof_lengths,
    };
    // 4 KiB is assumed to cover the root object header -- NOTE(review):
    // confirm that larger root headers cannot occur (or are handled by
    // continuation messages).
    let root_buf = handle.read_at_most(sb.root_group_object_header_address, 4096)?;
    let (root_header, _) = ObjectHeader::decode(&root_buf)?;
    let datasets = Self::discover_datasets_from_links(&mut handle, &root_header, &ctx)?;
    // Collect root-group attributes; undecodable attribute messages are
    // silently skipped.
    let mut root_attributes = Vec::new();
    for msg in &root_header.messages {
        if msg.msg_type == MSG_ATTRIBUTE {
            if let Ok((attr, _)) = AttributeMessage::decode(&msg.data, &ctx) {
                root_attributes.push(attr);
            }
        }
    }
    Ok(Self {
        handle,
        ctx,
        _eof: sb.end_of_file_address,
        root_group_info: RootGroupInfo::V2V3 {
            root_group_object_header_address: sb.root_group_object_header_address,
        },
        datasets,
        root_attributes,
    })
}
/// Build the reader from a v0/v1 superblock. The root group's B-tree
/// and local-heap addresses come straight from the symbol table entry
/// when cached there (cache_type == 1); otherwise they are recovered
/// from the symbol table message in the root object header.
fn open_v0v1(mut handle: FileHandle, sb_buf: &[u8]) -> IoResult<Self> {
    let sb = SuperblockV0V1::decode(sb_buf)?;
    let ctx = FormatContext {
        sizeof_addr: sb.sizeof_offsets,
        sizeof_size: sb.sizeof_lengths,
    };
    let ste = &sb.root_symbol_table_entry;
    let btree_addr = ste.btree_addr;
    let heap_addr = ste.heap_addr;
    let (btree_addr, heap_addr) = if ste.cache_type == 1 {
        (btree_addr, heap_addr)
    } else {
        Self::find_stab_in_object_header(&mut handle, &ctx, ste.obj_header_addr)?
    };
    // An empty root group leaves both addresses undefined; report no
    // datasets instead of failing.
    let datasets = if btree_addr != UNDEF_ADDR && heap_addr != UNDEF_ADDR {
        Self::discover_datasets_from_btree(&mut handle, &ctx, btree_addr, heap_addr)?
    } else {
        Vec::new()
    };
    Ok(Self {
        handle,
        ctx,
        _eof: sb.end_of_file_address,
        root_group_info: RootGroupInfo::V0V1 {
            root_obj_header_addr: ste.obj_header_addr,
            btree_addr,
            heap_addr,
        },
        datasets,
        // Root attributes are only gathered on the v2/v3 open path.
        root_attributes: Vec::new(),
    })
}
/// Recover the (B-tree address, local-heap address) pair from the
/// symbol table message of the object header at `obj_header_addr`.
/// Returns a pair of undefined addresses when no symbol table message
/// with enough payload is present.
fn find_stab_in_object_header(
    handle: &mut FileHandle,
    ctx: &FormatContext,
    obj_header_addr: u64,
) -> IoResult<(u64, u64)> {
    let buf = handle.read_at_most(obj_header_addr, 4096)?;
    let (header, _) = ObjectHeader::decode_any(&buf)?;
    let sa = ctx.sizeof_addr as usize;
    // First symbol-table message carrying at least two address fields.
    let stab = header
        .messages
        .iter()
        .find(|m| m.msg_type == MSG_SYMBOL_TABLE && m.data.len() >= 2 * sa);
    match stab {
        Some(msg) => {
            let btree = read_uint(&msg.data, sa);
            let heap = read_uint(&msg.data[sa..], sa);
            Ok((btree, heap))
        }
        None => Ok((UNDEF_ADDR, UNDEF_ADDR)),
    }
}
/// Entry point for dataset discovery on the v2/v3 path: walks the
/// root group's link messages with an empty path prefix.
fn discover_datasets_from_links(
    handle: &mut FileHandle,
    root_header: &ObjectHeader,
    ctx: &FormatContext,
) -> IoResult<Vec<DatasetReadInfo>> {
    let root_prefix = "";
    Self::discover_datasets_recursive(handle, root_header, ctx, root_prefix)
}
/// Depth-first traversal of hard links: each hard link target is first
/// tried as a dataset; if it is not one but its object header carries
/// link messages, it is treated as a sub-group and recursed into with
/// a "prefix/name" path.
fn discover_datasets_recursive(
    handle: &mut FileHandle,
    header: &ObjectHeader,
    ctx: &FormatContext,
    prefix: &str,
) -> IoResult<Vec<DatasetReadInfo>> {
    let mut datasets = Vec::new();
    for msg in &header.messages {
        if msg.msg_type == MSG_LINK {
            let (link, _) = LinkMessage::decode(&msg.data, ctx)?;
            // Only hard links are followed; other link targets are ignored.
            if let LinkTarget::Hard { address } = &link.target {
                let full_name = if prefix.is_empty() {
                    link.name.clone()
                } else {
                    format!("{}/{}", prefix, link.name)
                };
                if let Some(info) =
                    Self::read_dataset_from_object_header(handle, ctx, *address, &full_name)?
                {
                    datasets.push(info);
                } else {
                    // Not a dataset: probe the child header for links and
                    // recurse if it looks like a group. Headers that fail
                    // to decode are skipped silently. NOTE(review): a
                    // cyclic hard-link graph would recurse without bound
                    // -- confirm inputs cannot contain cycles.
                    let child_buf = handle.read_at_most(*address, 8192)?;
                    if let Ok((child_header, _)) = ObjectHeader::decode_any(&child_buf) {
                        let has_links =
                            child_header.messages.iter().any(|m| m.msg_type == MSG_LINK);
                        if has_links {
                            let child_ds = Self::discover_datasets_recursive(
                                handle,
                                &child_header,
                                ctx,
                                &full_name,
                            )?;
                            datasets.extend(child_ds);
                        }
                    }
                }
            }
        }
    }
    Ok(datasets)
}
/// Discover datasets in a v0/v1 file: load the root group's local heap
/// (which stores link names), gather every symbol-table node address
/// from the group B-tree, then try each entry's object header as a
/// dataset.
fn discover_datasets_from_btree(
    handle: &mut FileHandle,
    ctx: &FormatContext,
    btree_addr: u64,
    heap_addr: u64,
) -> IoResult<Vec<DatasetReadInfo>> {
    let sa = ctx.sizeof_addr as usize;
    let ss = ctx.sizeof_size as usize;
    let heap_hdr_buf = handle.read_at_most(heap_addr, 64)?;
    let heap_hdr = LocalHeapHeader::decode(&heap_hdr_buf, sa, ss)?;
    // The heap data segment holds the names referenced by name_offset.
    let heap_data = handle.read_at(heap_hdr.data_addr, heap_hdr.data_size as usize)?;
    let snod_addrs = Self::collect_snod_addresses(handle, btree_addr, sa, ss)?;
    let mut datasets = Vec::new();
    for snod_addr in snod_addrs {
        let snod_buf = handle.read_at_most(snod_addr, 8192)?;
        let snod = SymbolTableNode::decode(&snod_buf, sa, ss)?;
        for entry in &snod.entries {
            let name = local_heap_get_string(&heap_data, entry.name_offset)?;
            if name.is_empty() {
                continue;
            }
            // Entries that are not datasets (e.g. sub-groups) yield None
            // and are skipped -- this path does not recurse into groups.
            if let Some(info) = Self::read_dataset_from_object_header(
                handle,
                ctx,
                entry.obj_header_addr,
                &name,
            )? {
                datasets.push(info);
            }
        }
    }
    Ok(datasets)
}
/// Collect the addresses of every symbol-table (leaf-level) node
/// reachable from a v1 group B-tree.
///
/// Level-0 nodes point directly at symbol table nodes; higher-level
/// nodes point at child B-tree nodes. The tree is walked iteratively
/// in depth-first, left-to-right order (matching the previous
/// recursive implementation), with a visited set so that a cyclic or
/// corrupt tree terminates instead of recursing without bound.
fn collect_snod_addresses(
    handle: &mut FileHandle,
    tree_addr: u64,
    sizeof_addr: usize,
    sizeof_size: usize,
) -> IoResult<Vec<u64>> {
    use std::collections::HashSet;
    let mut addrs = Vec::new();
    let mut pending = vec![tree_addr];
    let mut seen: HashSet<u64> = HashSet::new();
    while let Some(node_addr) = pending.pop() {
        // Skip nodes we have already decoded (cycle guard for corrupt files).
        if !seen.insert(node_addr) {
            continue;
        }
        let buf = handle.read_at_most(node_addr, 8192)?;
        let node = BTreeV1Node::decode(&buf, sizeof_addr, sizeof_size)?;
        if node.level == 0 {
            // Leaf level: children are symbol table node addresses.
            addrs.extend(node.children.iter().copied());
        } else {
            // Internal level: push children reversed so the leftmost
            // child is processed first (preserves DFS ordering).
            pending.extend(node.children.iter().rev().copied());
        }
    }
    Ok(addrs)
}
/// Decode the object header at `addr` and, when it carries datatype,
/// dataspace and data-layout messages, assemble a `DatasetReadInfo`.
/// Returns `Ok(None)` when any of the three is missing, i.e. the
/// object is not a dataset (for example a group).
///
/// Continuation messages are followed so that messages stored in
/// continuation blocks are also considered.
fn read_dataset_from_object_header(
    handle: &mut FileHandle,
    ctx: &FormatContext,
    addr: u64,
    name: &str,
) -> IoResult<Option<DatasetReadInfo>> {
    let buf = handle.read_at_most(addr, 4096)?;
    let (mut header, _) = ObjectHeader::decode_any(&buf)?;
    let sa = ctx.sizeof_addr as usize;
    let ss = ctx.sizeof_size as usize;
    // Gather every continuation (address, length) pair first so the
    // immutable borrow of header.messages ends before we append to it.
    let mut continuations: Vec<(u64, u64)> = Vec::new();
    for msg in &header.messages {
        if msg.msg_type == MSG_OBJ_HEADER_CONTINUATION && msg.data.len() >= sa + ss {
            let cont_addr = read_uint(&msg.data, sa);
            let cont_len = read_uint(&msg.data[sa..], ss);
            continuations.push((cont_addr, cont_len));
        }
    }
    for (cont_addr, cont_len) in continuations {
        if cont_addr == UNDEF_ADDR || cont_len == 0 {
            continue;
        }
        let cont_buf = handle.read_at_most(cont_addr, cont_len as usize)?;
        // Parse the block as a sequence of v1-style message headers:
        // type(2) size(2) flags(1) reserved(3), then the message data.
        // NOTE(review): v2 continuation blocks begin with an "OCHK"
        // signature; confirm that only v1-style blocks reach this loop
        // or that the signature is stripped upstream.
        let mut pos = 0;
        while pos + 8 <= cont_buf.len() {
            let msg_type = u16::from_le_bytes([cont_buf[pos], cont_buf[pos + 1]]);
            let data_size = u16::from_le_bytes([cont_buf[pos + 2], cont_buf[pos + 3]]) as usize;
            let msg_flags = cont_buf[pos + 4];
            pos += 8;
            if pos + data_size > cont_buf.len() {
                break;
            }
            // Type 0 is the NIL (padding) message and is skipped.
            if msg_type != 0 {
                header
                    .messages
                    .push(crate::format::object_header::ObjectHeaderMessage {
                        // Truncating cast -- NOTE(review): confirm no
                        // message types above 255 can occur here.
                        msg_type: msg_type as u8,
                        flags: msg_flags,
                        data: cont_buf[pos..pos + data_size].to_vec(),
                    });
            }
            pos += data_size;
            // Advance to the next 8-byte boundary between messages.
            pos = (pos + 7) & !7;
        }
    }
    // Scan all messages (original plus continuation) for the pieces that
    // make this object a dataset. Messages that fail to decode are
    // skipped; a later duplicate message overwrites an earlier one.
    let mut datatype = None;
    let mut dataspace = None;
    let mut layout = None;
    let mut filter_pipeline = None;
    let mut attributes = Vec::new();
    for msg in &header.messages {
        match msg.msg_type {
            MSG_DATATYPE => {
                if let Ok((dt, _)) = DatatypeMessage::decode(&msg.data, ctx) {
                    datatype = Some(dt);
                }
            }
            MSG_DATASPACE => {
                if let Ok((ds, _)) = DataspaceMessage::decode(&msg.data, ctx) {
                    dataspace = Some(ds);
                }
            }
            MSG_DATA_LAYOUT => {
                if let Ok((dl, _)) = DataLayoutMessage::decode(&msg.data, ctx) {
                    layout = Some(dl);
                }
            }
            MSG_FILTER_PIPELINE => {
                // Only keep a pipeline that actually contains filters.
                if let Ok((fp, _)) = FilterPipeline::decode(&msg.data) {
                    if !fp.filters.is_empty() {
                        filter_pipeline = Some(fp);
                    }
                }
            }
            MSG_ATTRIBUTE => {
                if let Ok((attr, _)) = AttributeMessage::decode(&msg.data, ctx) {
                    attributes.push(attr);
                }
            }
            _ => {}
        }
    }
    if let (Some(dt), Some(ds), Some(dl)) = (datatype, dataspace, layout) {
        Ok(Some(DatasetReadInfo {
            name: name.to_string(),
            datatype: dt,
            dataspace: ds,
            layout: dl,
            filter_pipeline,
            attributes,
        }))
    } else {
        Ok(None)
    }
}
/// Names of every discovered dataset, in discovery order.
pub fn dataset_names(&self) -> Vec<&str> {
    let mut names = Vec::with_capacity(self.datasets.len());
    for ds in &self.datasets {
        names.push(ds.name.as_str());
    }
    names
}
/// Look up a dataset's metadata by exact name; `None` when absent.
pub fn dataset_info(&self, name: &str) -> Option<&DatasetReadInfo> {
    for ds in &self.datasets {
        if ds.name == name {
            return Some(ds);
        }
    }
    None
}
/// Names of all attributes attached to dataset `name`.
///
/// # Errors
/// `NotFound` when no dataset with that name exists.
pub fn dataset_attr_names(&self, name: &str) -> IoResult<Vec<String>> {
    match self.dataset_info(name) {
        Some(info) => Ok(info.attributes.iter().map(|a| a.name.clone()).collect()),
        None => Err(crate::io::IoError::NotFound(name.to_string())),
    }
}
/// Fetch one attribute message from a dataset.
///
/// # Errors
/// `NotFound` with the dataset name for an unknown dataset, or with
/// "dataset:attribute" for an unknown attribute on a known dataset.
pub fn dataset_attr(&self, ds_name: &str, attr_name: &str) -> IoResult<&AttributeMessage> {
    let info = self
        .dataset_info(ds_name)
        .ok_or_else(|| crate::io::IoError::NotFound(ds_name.to_string()))?;
    for attr in &info.attributes {
        if attr.name == attr_name {
            return Ok(attr);
        }
    }
    Err(crate::io::IoError::NotFound(format!("{}:{}", ds_name, attr_name)))
}
/// Names of all attributes on the root group.
pub fn root_attr_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.root_attributes.len());
    for attr in &self.root_attributes {
        names.push(attr.name.clone());
    }
    names
}
/// Look up a root-group attribute by name; `None` when absent.
pub fn root_attr(&self, name: &str) -> Option<&AttributeMessage> {
    for attr in &self.root_attributes {
        if attr.name == name {
            return Some(attr);
        }
    }
    None
}
/// Dimensions of dataset `name`, as stored in its dataspace message.
///
/// # Errors
/// `NotFound` when the dataset does not exist.
pub fn dataset_shape(&self, name: &str) -> IoResult<Vec<u64>> {
    match self.dataset_info(name) {
        Some(info) => Ok(info.dataspace.dims.clone()),
        None => Err(crate::io::IoError::NotFound(name.to_string())),
    }
}
/// Read a dataset's entire raw (element-encoded) contents.
///
/// Contiguous data is read in one shot, compact data is returned from
/// the in-header copy, and chunked-v4 data is reassembled chunk by
/// chunk. A contiguous layout with an undefined address (storage never
/// allocated) yields an empty vector.
///
/// # Errors
/// `NotFound` for an unknown dataset, plus any underlying I/O or
/// decode errors.
pub fn read_dataset_raw(&mut self, name: &str) -> IoResult<Vec<u8>> {
    let info = self
        .dataset_info(name)
        .ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
    // Clone layout/pipeline so the borrow of `info` (and thus &self)
    // ends before the mutable handle reads below.
    let layout = info.layout.clone();
    let pipeline = info.filter_pipeline.clone();
    match &layout {
        DataLayoutMessage::Contiguous { address, size } => {
            if *address == UNDEF_ADDR {
                return Ok(vec![]);
            }
            let data = self.handle.read_at(*address, *size as usize)?;
            Ok(data)
        }
        DataLayoutMessage::Compact { data } => Ok(data.clone()),
        DataLayoutMessage::ChunkedV4 {
            chunk_dims,
            index_address,
            index_type,
            earray_params,
            ..
        } => {
            // The v4 layout message stores a trailing chunk dimension
            // for the element size; drop it to get the spatial chunk
            // shape.
            let real_chunk_dims = &chunk_dims[..chunk_dims.len() - 1];
            self.read_chunked_v4(
                name,
                real_chunk_dims,
                *index_address,
                *index_type,
                earray_params.as_ref(),
                pipeline.as_ref(),
            )
        }
    }
}
pub fn refresh(&mut self) -> IoResult<()> {
let sb_buf = self.handle.read_at_most(0, 256)?;
let sb = SuperblockV2V3::decode(&sb_buf)?;
let ctx = FormatContext {
sizeof_addr: sb.sizeof_offsets,
sizeof_size: sb.sizeof_lengths,
};
let root_buf = self
.handle
.read_at_most(sb.root_group_object_header_address, 4096)?;
let (root_header, _) = ObjectHeader::decode(&root_buf)?;
let datasets = Self::discover_datasets_from_links(&mut self.handle, &root_header, &ctx)?;
self._eof = sb.end_of_file_address;
self.ctx = ctx;
self.datasets = datasets;
Ok(())
}
/// Reassemble a chunked (v4 layout) dataset into one flat buffer.
///
/// Dispatches on the chunk index type:
/// * `SingleChunk` -- the index address points straight at the data.
/// * `FixedArray` / `BTreeV2` -- delegated to the dedicated readers.
/// * `ExtensibleArray` -- the index block and its data blocks are
///   walked inline to build one (address, stored-byte-count) entry per
///   chunk, then each chunk is read (and de-filtered when a pipeline
///   is present) into place.
///
/// `chunk_dims` are the spatial chunk dimensions (the element-size
/// dimension is already stripped by the caller).
fn read_chunked_v4(
    &mut self,
    name: &str,
    chunk_dims: &[u64],
    index_address: u64,
    index_type: data_layout::ChunkIndexType,
    earray_params: Option<&data_layout::EarrayParams>,
    pipeline: Option<&FilterPipeline>,
) -> IoResult<Vec<u8>> {
    use crate::format::chunk_index::extensible_array::{self as ea, *};
    let info = self
        .dataset_info(name)
        .ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
    let dims = info.dataspace.dims.clone();
    let element_size = info.datatype.element_size() as u64;
    match index_type {
        data_layout::ChunkIndexType::SingleChunk => {
            let total_size: u64 = dims.iter().product::<u64>() * element_size;
            if index_address == UNDEF_ADDR || total_size == 0 {
                return Ok(vec![]);
            }
            let data = if let Some(pipeline) = pipeline {
                // The compressed size is unknown here, so over-read 2x
                // the uncompressed size and let the filter stop at the
                // real end. NOTE(review): a chunk whose stored form
                // exceeds 2x would be truncated -- confirm acceptable.
                let raw = self
                    .handle
                    .read_at_most(index_address, total_size as usize * 2)?;
                filter::reverse_filters(pipeline, &raw)?
            } else {
                self.handle.read_at(index_address, total_size as usize)?
            };
            Ok(data)
        }
        data_layout::ChunkIndexType::FixedArray => {
            self.read_chunked_fixed_array(name, chunk_dims, index_address, pipeline)
        }
        data_layout::ChunkIndexType::BTreeV2 => {
            self.read_chunked_btree_v2(name, chunk_dims, index_address, pipeline)
        }
        data_layout::ChunkIndexType::ExtensibleArray => {
            let params = earray_params.ok_or_else(|| {
                crate::io::IoError::InvalidState("missing earray params".into())
            })?;
            if index_address == UNDEF_ADDR {
                return Ok(vec![]);
            }
            let hdr_buf = self.handle.read_at_most(index_address, 256)?;
            let ea_hdr = ExtensibleArrayHeader::decode(&hdr_buf, &self.ctx)?;
            if ea_hdr.idx_blk_addr == UNDEF_ADDR {
                return Ok(vec![]);
            }
            // Chunk count along dim 0; entries beyond this are ignored
            // (the extensible array grows along the first dimension).
            let chunks_dim0 = if chunk_dims[0] > 0 {
                dims[0].div_ceil(chunk_dims[0])
            } else {
                0
            };
            let ndblk_addrs = compute_ndblk_addrs(params.sup_blk_min_data_ptrs);
            let nsblk_addrs = compute_nsblk_addrs(
                params.idx_blk_elmts,
                params.data_blk_min_elmts,
                params.sup_blk_min_data_ptrs,
                params.max_nelmts_bits,
            );
            let chunk_bytes: u64 = chunk_dims.iter().product::<u64>() * element_size;
            let is_filtered = ea_hdr.class_id == ea::EA_CLS_FILT_CHUNK;
            // One (address, stored-byte-count) pair per chunk, in order.
            let mut chunk_entries: Vec<(u64, u64)> = Vec::new();
            let min_elmts = params.data_blk_min_elmts as usize;
            if is_filtered {
                // Filtered elements carry address + size + 4-byte filter
                // mask; the size field width is whatever remains of the
                // raw element.
                let chunk_size_len = ea_hdr.raw_elmt_size - self.ctx.sizeof_addr - 4;
                let iblk_buf = self.handle.read_at_most(ea_hdr.idx_blk_addr, 65536)?;
                let fiblk = ea::FilteredIndexBlock::decode(
                    &iblk_buf,
                    &self.ctx,
                    params.idx_blk_elmts as usize,
                    ndblk_addrs,
                    nsblk_addrs,
                    chunk_size_len,
                )?;
                // Elements stored directly in the index block come first.
                for e in &fiblk.elements {
                    chunk_entries.push((e.addr, e.nbytes));
                }
                // Then the data blocks; capacity starts at
                // data_blk_min_elmts and doubles every two blocks --
                // NOTE(review): assumed extensible-array growth pattern,
                // confirm against the spec.
                let mut dblk_nelmts = min_elmts;
                let mut pair_count = 0usize;
                for &dblk_addr in &fiblk.dblk_addrs {
                    if dblk_addr == UNDEF_ADDR {
                        // Unallocated block: all of its chunks are absent.
                        chunk_entries.extend(std::iter::repeat_n((UNDEF_ADDR, 0), dblk_nelmts));
                    } else {
                        let dblk_buf = self.handle.read_at_most(dblk_addr, 65536)?;
                        let dblk = ea::FilteredDataBlock::decode(
                            &dblk_buf,
                            &self.ctx,
                            params.max_nelmts_bits,
                            dblk_nelmts,
                            chunk_size_len,
                        )?;
                        for e in &dblk.elements {
                            chunk_entries.push((e.addr, e.nbytes));
                        }
                    }
                    if chunk_entries.len() >= chunks_dim0 as usize {
                        break;
                    }
                    pair_count += 1;
                    if pair_count >= 2 {
                        pair_count = 0;
                        dblk_nelmts *= 2;
                    }
                }
            } else {
                // Unfiltered: elements are bare chunk addresses; every
                // chunk occupies exactly chunk_bytes on disk.
                let iblk_buf = self.handle.read_at_most(ea_hdr.idx_blk_addr, 8192)?;
                let iblk = ExtensibleArrayIndexBlock::decode(
                    &iblk_buf,
                    &self.ctx,
                    params.idx_blk_elmts as usize,
                    ndblk_addrs,
                    nsblk_addrs,
                )?;
                for &addr in &iblk.elements {
                    chunk_entries.push((addr, chunk_bytes));
                }
                let mut dblk_nelmts = min_elmts;
                let mut pair_count = 0usize;
                for &dblk_addr in &iblk.dblk_addrs {
                    if dblk_addr == UNDEF_ADDR {
                        chunk_entries.extend(std::iter::repeat_n((UNDEF_ADDR, 0), dblk_nelmts));
                    } else {
                        let dblk_buf = self.handle.read_at_most(dblk_addr, 65536)?;
                        let dblk = ExtensibleArrayDataBlock::decode(
                            &dblk_buf,
                            &self.ctx,
                            params.max_nelmts_bits,
                            dblk_nelmts,
                        )?;
                        for &addr in &dblk.elements {
                            chunk_entries.push((addr, chunk_bytes));
                        }
                    }
                    if chunk_entries.len() >= chunks_dim0 as usize {
                        break;
                    }
                    pair_count += 1;
                    if pair_count >= 2 {
                        pair_count = 0;
                        dblk_nelmts *= 2;
                    }
                }
            }
            let total_size: u64 = dims.iter().product::<u64>() * element_size;
            let mut output = vec![0u8; total_size as usize];
            let n_chunks = std::cmp::min(chunks_dim0 as usize, chunk_entries.len());
            if let Some(pl) = pipeline {
                // Read raw chunks serially, then decompress (in parallel
                // when the `parallel` feature is enabled). Filter
                // failures fall back to the raw bytes instead of erroring.
                let mut raw_chunks: Vec<Option<Vec<u8>>> = Vec::with_capacity(n_chunks);
                for &(addr, nbytes) in &chunk_entries[..n_chunks] {
                    if addr == UNDEF_ADDR {
                        raw_chunks.push(None);
                    } else {
                        raw_chunks.push(Some(self.handle.read_at(addr, nbytes as usize)?));
                    }
                }
                #[cfg(feature = "parallel")]
                let decompressed: Vec<Option<Vec<u8>>> = {
                    use rayon::prelude::*;
                    raw_chunks
                        .into_par_iter()
                        .map(|raw| raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r)))
                        .collect()
                };
                #[cfg(not(feature = "parallel"))]
                let decompressed: Vec<Option<Vec<u8>>> = raw_chunks
                    .into_iter()
                    .map(|raw| raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r)))
                    .collect();
                // Chunk i lands at byte offset i * chunk_bytes (chunks
                // are consecutive along dim 0), clipped at total_size
                // for a final partial chunk. Absent chunks stay zeroed.
                for (i, chunk_data) in decompressed.iter().enumerate() {
                    if let Some(data) = chunk_data {
                        let offset = i as u64 * chunk_bytes;
                        let end = std::cmp::min(offset + chunk_bytes, total_size);
                        let copy_len = (end - offset) as usize;
                        output[offset as usize..offset as usize + copy_len]
                            .copy_from_slice(&data[..copy_len]);
                    }
                }
            } else {
                for (i, &(addr, nbytes)) in chunk_entries[..n_chunks].iter().enumerate() {
                    if addr == UNDEF_ADDR {
                        // Missing chunk: leave the zero-filled gap.
                        continue;
                    }
                    let chunk_data = self.handle.read_at(addr, nbytes as usize)?;
                    let offset = i as u64 * chunk_bytes;
                    let end = std::cmp::min(offset + chunk_bytes, total_size);
                    let copy_len = (end - offset) as usize;
                    output[offset as usize..offset as usize + copy_len]
                        .copy_from_slice(&chunk_data[..copy_len]);
                }
            }
            Ok(output)
        }
        _ => Err(crate::io::IoError::InvalidState(format!(
            "unsupported chunk index type: {:?}",
            index_type
        ))),
    }
}
/// Reassemble a chunked dataset whose chunk index is a Fixed Array.
///
/// The fixed-array data block stores one chunk address per chunk in
/// row-major chunk order; missing chunks (undefined address) leave
/// zero-filled gaps. NOTE(review): the data block is decoded with
/// `decode_unfiltered` even when a filter pipeline is present --
/// confirm filtered fixed-array element records cannot reach this
/// path.
fn read_chunked_fixed_array(
    &mut self,
    name: &str,
    chunk_dims: &[u64],
    index_address: u64,
    pipeline: Option<&FilterPipeline>,
) -> IoResult<Vec<u8>> {
    use crate::format::chunk_index::fixed_array::*;
    let info = self
        .dataset_info(name)
        .ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
    let dims = info.dataspace.dims.clone();
    let element_size = info.datatype.element_size() as u64;
    let ndims = dims.len();
    if index_address == UNDEF_ADDR {
        return Ok(vec![]);
    }
    let hdr_buf = self.handle.read_at_most(index_address, 256)?;
    let fa_hdr = FixedArrayHeader::decode(&hdr_buf, &self.ctx)?;
    if fa_hdr.data_blk_addr == UNDEF_ADDR {
        return Ok(vec![]);
    }
    let dblk_buf = self.handle.read_at_most(fa_hdr.data_blk_addr, 65536)?;
    let fa_dblk = FixedArrayDataBlock::decode_unfiltered(
        &dblk_buf,
        &self.ctx,
        fa_hdr.num_elmts as usize,
    )?;
    let chunk_bytes: u64 = chunk_dims.iter().product::<u64>() * element_size;
    let total_size: u64 = dims.iter().product::<u64>() * element_size;
    let mut output = vec![0u8; total_size as usize];
    // Chunk-grid extent per dimension; used to turn a linear chunk
    // index back into chunk coordinates.
    let chunks_per_dim: Vec<u64> = (0..ndims)
        .map(|d| dims[d].div_ceil(chunk_dims[d]))
        .collect();
    let n_chunks = std::cmp::min(fa_hdr.num_elmts as usize, fa_dblk.elements.len());
    // Phase 1: serial raw reads. With a pipeline the stored size is not
    // recorded here, so over-read 2x the uncompressed chunk size --
    // NOTE(review): confirm that bound holds for all filters in use.
    let mut raw_chunks: Vec<(usize, Option<Vec<u8>>)> = Vec::with_capacity(n_chunks);
    for linear_idx in 0..n_chunks {
        let addr = fa_dblk.elements[linear_idx];
        if addr == UNDEF_ADDR {
            raw_chunks.push((linear_idx, None));
        } else if pipeline.is_some() {
            raw_chunks.push((
                linear_idx,
                Some(self.handle.read_at_most(addr, chunk_bytes as usize * 2)?),
            ));
        } else {
            raw_chunks.push((
                linear_idx,
                Some(self.handle.read_at(addr, chunk_bytes as usize)?),
            ));
        }
    }
    // Phase 2: reverse the filter pipeline (parallel when the
    // `parallel` feature is enabled); filter failures fall back to the
    // raw bytes.
    let decompressed: Vec<(usize, Option<Vec<u8>>)> = if let Some(pl) = pipeline {
        #[cfg(feature = "parallel")]
        {
            use rayon::prelude::*;
            raw_chunks
                .into_par_iter()
                .map(|(idx, raw)| {
                    (
                        idx,
                        raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r)),
                    )
                })
                .collect()
        }
        #[cfg(not(feature = "parallel"))]
        {
            raw_chunks
                .into_iter()
                .map(|(idx, raw)| {
                    (
                        idx,
                        raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r)),
                    )
                })
                .collect()
        }
    } else {
        raw_chunks
    };
    // Phase 3: decompose each linear index into row-major chunk
    // coordinates and scatter the chunk into the output.
    for (linear_idx, chunk_data) in &decompressed {
        let Some(data) = chunk_data else { continue };
        let mut remaining = *linear_idx as u64;
        let mut coords = vec![0u64; ndims];
        for d in (0..ndims).rev() {
            coords[d] = remaining % chunks_per_dim[d];
            remaining /= chunks_per_dim[d];
        }
        self.copy_chunk_to_output(data, &mut output, &dims, chunk_dims, &coords, element_size);
    }
    Ok(output)
}
/// Reassemble a chunked dataset whose chunk index is a version-2
/// B-tree.
///
/// Supported subset: depth-0 trees (all records live in the root leaf)
/// holding unfiltered chunk records. Deeper trees and filtered record
/// types return `InvalidState`.
fn read_chunked_btree_v2(
    &mut self,
    name: &str,
    chunk_dims: &[u64],
    index_address: u64,
    pipeline: Option<&FilterPipeline>,
) -> IoResult<Vec<u8>> {
    use crate::format::chunk_index::btree_v2::*;
    let info = self
        .dataset_info(name)
        .ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
    let dims = info.dataspace.dims.clone();
    let element_size = info.datatype.element_size() as u64;
    let ndims = dims.len();
    if index_address == UNDEF_ADDR {
        return Ok(vec![]);
    }
    let hdr_buf = self.handle.read_at_most(index_address, 256)?;
    let bt2_hdr = Bt2Header::decode(&hdr_buf, &self.ctx)?;
    if bt2_hdr.root_node_addr == UNDEF_ADDR || bt2_hdr.total_num_records == 0 {
        return Ok(vec![]);
    }
    if bt2_hdr.depth != 0 {
        return Err(crate::io::IoError::InvalidState(
            "B-tree v2 depth > 0 not yet supported for reading".into(),
        ));
    }
    let leaf_buf = self.handle.read_at_most(bt2_hdr.root_node_addr, 65536)?;
    let leaf =
        Bt2LeafNode::decode(&leaf_buf, bt2_hdr.num_records_in_root, bt2_hdr.record_size)?;
    let records = if bt2_hdr.record_type == BT2_TYPE_CHUNK_UNFILT {
        Bt2ChunkIndex::decode_unfiltered_records(
            &leaf.record_data,
            bt2_hdr.num_records_in_root as usize,
            ndims,
            &self.ctx,
        )?
    } else {
        return Err(crate::io::IoError::InvalidState(
            "filtered B-tree v2 chunk reading not yet supported".into(),
        ));
    };
    let chunk_bytes: u64 = chunk_dims.iter().product::<u64>() * element_size;
    let total_size: u64 = dims.iter().product::<u64>() * element_size;
    let mut output = vec![0u8; total_size as usize];
    // Phase 1: serial raw reads. With a pipeline the stored size is not
    // in the unfiltered record, so over-read 2x the uncompressed chunk
    // size -- NOTE(review): confirm that bound holds in practice.
    let mut raw_chunks: Vec<(usize, Option<Vec<u8>>)> = Vec::with_capacity(records.len());
    for (i, rec) in records.iter().enumerate() {
        if rec.chunk_address == UNDEF_ADDR {
            raw_chunks.push((i, None));
        } else if pipeline.is_some() {
            raw_chunks.push((
                i,
                Some(
                    self.handle
                        .read_at_most(rec.chunk_address, chunk_bytes as usize * 2)?,
                ),
            ));
        } else {
            raw_chunks.push((
                i,
                Some(
                    self.handle
                        .read_at(rec.chunk_address, chunk_bytes as usize)?,
                ),
            ));
        }
    }
    // Phase 2: de-filter (parallel when the `parallel` feature is
    // enabled); filter failures fall back to the raw bytes.
    let decompressed: Vec<(usize, Option<Vec<u8>>)> = if let Some(pl) = pipeline {
        #[cfg(feature = "parallel")]
        {
            use rayon::prelude::*;
            raw_chunks
                .into_par_iter()
                .map(|(i, raw)| (i, raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r))))
                .collect()
        }
        #[cfg(not(feature = "parallel"))]
        {
            raw_chunks
                .into_iter()
                .map(|(i, raw)| (i, raw.map(|r| filter::reverse_filters(pl, &r).unwrap_or(r))))
                .collect()
        }
    } else {
        raw_chunks
    };
    // Phase 3: scatter each chunk at its record's scaled (chunk-grid)
    // offsets; missing chunks stay zero-filled.
    for (i, chunk_data) in &decompressed {
        let Some(data) = chunk_data else { continue };
        self.copy_chunk_to_output(
            data,
            &mut output,
            &dims,
            chunk_dims,
            &records[*i].scaled_offsets,
            element_size,
        );
    }
    Ok(output)
}
/// Scatter one decoded chunk into the full-dataset output buffer.
///
/// `chunk_coords` are chunk-grid coordinates (scaled offsets). Edge
/// chunks are clipped to the dataset extent, and any source or
/// destination span that would fall out of range is skipped rather
/// than panicking. Rank-0 datasets are a no-op; rank-1 chunks copy as
/// a single contiguous run; higher ranks copy element by element.
fn copy_chunk_to_output(
    &self,
    chunk_data: &[u8],
    output: &mut [u8],
    dims: &[u64],
    chunk_dims: &[u64],
    chunk_coords: &[u64],
    element_size: u64,
) {
    let rank = dims.len();
    if rank == 0 {
        return;
    }
    if rank == 1 {
        // Fast path: one contiguous run, clipped at the dataset edge.
        let first_elem = chunk_coords[0] * chunk_dims[0];
        let elems = std::cmp::min(chunk_dims[0], dims[0] - first_elem);
        let nbytes = (elems * element_size) as usize;
        let dst = (first_elem * element_size) as usize;
        if dst + nbytes <= output.len() && nbytes <= chunk_data.len() {
            output[dst..dst + nbytes].copy_from_slice(&chunk_data[..nbytes]);
        }
        return;
    }
    // General path: visit every element of the chunk, map its local
    // coordinates to a global linear index, and copy one element at a
    // time.
    let es = element_size as usize;
    let total_chunk_elems: u64 = chunk_dims.iter().product();
    let mut local = vec![0u64; rank];
    for idx in 0..total_chunk_elems {
        // Decompose idx into per-dimension local coordinates (row-major).
        let mut rem = idx;
        for d in (0..rank).rev() {
            local[d] = rem % chunk_dims[d];
            rem /= chunk_dims[d];
        }
        // Compose the global linear index; elements past the dataset
        // extent (partial edge chunks) are skipped.
        let mut linear = 0u64;
        let mut stride = 1u64;
        let mut in_bounds = true;
        for d in (0..rank).rev() {
            let g = chunk_coords[d] * chunk_dims[d] + local[d];
            if g >= dims[d] {
                in_bounds = false;
                break;
            }
            linear += g * stride;
            stride *= dims[d];
        }
        if !in_bounds {
            continue;
        }
        let src = (idx * element_size) as usize;
        let dst = (linear * element_size) as usize;
        if src + es <= chunk_data.len() && dst + es <= output.len() {
            output[dst..dst + es].copy_from_slice(&chunk_data[src..src + es]);
        }
    }
}
/// Read a dataset of variable-length strings.
///
/// Each element of the raw data is a global-heap reference (collection
/// address + object index); the referenced heap object's bytes are
/// decoded as lossy UTF-8. Null references and missing heap objects
/// yield empty strings.
///
/// Heap collections are cached by address and decoded at most once.
/// The previous implementation cloned the entire cached collection on
/// every element; the cache is now populated once per address and the
/// stored collection is borrowed thereafter.
///
/// # Errors
/// `NotFound` for an unknown dataset, plus underlying I/O and decode
/// errors.
pub fn read_vlen_strings(&mut self, name: &str) -> IoResult<Vec<String>> {
    let info = self
        .dataset_info(name)
        .ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
    let dims = info.dataspace.dims.clone();
    let layout = info.layout.clone();
    let total_elements: u64 = dims.iter().product();
    let raw = match &layout {
        DataLayoutMessage::Contiguous { address, size } => {
            if *address == UNDEF_ADDR {
                return Ok(vec![]);
            }
            self.handle.read_at(*address, *size as usize)?
        }
        DataLayoutMessage::Compact { data } => data.clone(),
        // Chunked layouts: reassemble the full reference array first.
        _ => self.read_dataset_raw(name)?,
    };
    let ref_size = vlen_reference_size(&self.ctx);
    let mut strings = Vec::with_capacity(total_elements as usize);
    let mut heap_cache: std::collections::HashMap<u64, GlobalHeapCollection> =
        std::collections::HashMap::new();
    for i in 0..total_elements as usize {
        let offset = i * ref_size;
        // Stop at a short/truncated reference array.
        if offset + ref_size > raw.len() {
            break;
        }
        let (collection_addr, obj_index) = decode_vlen_reference(&raw[offset..], &self.ctx)?;
        if collection_addr == UNDEF_ADDR || collection_addr == 0 {
            strings.push(String::new());
            continue;
        }
        // Decode each heap collection exactly once; later references
        // borrow the cached value instead of cloning it.
        if !heap_cache.contains_key(&collection_addr) {
            let heap_buf = self.handle.read_at_most(collection_addr, 65536)?;
            let (coll, _) = GlobalHeapCollection::decode(&heap_buf, &self.ctx)?;
            heap_cache.insert(collection_addr, coll);
        }
        let collection = &heap_cache[&collection_addr];
        // obj_index is truncated to u16 to match the heap API --
        // NOTE(review): confirm indices never exceed u16::MAX.
        if let Some(data) = collection.get_object(obj_index as u16) {
            strings.push(String::from_utf8_lossy(data).to_string());
        } else {
            strings.push(String::new());
        }
    }
    Ok(strings)
}
/// Build the per-chunk (address, stored-byte-count) list from an
/// extensible-array chunk index without reading any chunk data.
///
/// Mirrors the inline ExtensibleArray walk in `read_chunked_v4`; used
/// by `read_slice` so individual dim-0 rows can be fetched without
/// materializing the whole dataset. Returns an empty list when the
/// index (or its index block) was never allocated.
fn collect_ea_chunk_entries(
    &mut self,
    index_address: u64,
    params: &data_layout::EarrayParams,
    dims: &[u64],
    chunk_dims: &[u64],
    element_size: u64,
) -> IoResult<Vec<(u64, u64)>> {
    use crate::format::chunk_index::extensible_array::{self as ea, *};
    if index_address == UNDEF_ADDR {
        return Ok(vec![]);
    }
    let hdr_buf = self.handle.read_at_most(index_address, 256)?;
    let ea_hdr = ExtensibleArrayHeader::decode(&hdr_buf, &self.ctx)?;
    if ea_hdr.idx_blk_addr == UNDEF_ADDR {
        return Ok(vec![]);
    }
    // Chunk count along dim 0; collection stops once this many entries
    // have been gathered.
    let chunks_dim0 = if chunk_dims[0] > 0 {
        dims[0].div_ceil(chunk_dims[0])
    } else {
        0
    };
    let ndblk_addrs = compute_ndblk_addrs(params.sup_blk_min_data_ptrs);
    let nsblk_addrs = compute_nsblk_addrs(
        params.idx_blk_elmts,
        params.data_blk_min_elmts,
        params.sup_blk_min_data_ptrs,
        params.max_nelmts_bits,
    );
    let chunk_bytes = chunk_dims.iter().product::<u64>() * element_size;
    let is_filtered = ea_hdr.class_id == ea::EA_CLS_FILT_CHUNK;
    let min_elmts = params.data_blk_min_elmts as usize;
    let mut entries: Vec<(u64, u64)> = Vec::new();
    if is_filtered {
        // Filtered element = address + size + 4-byte filter mask; the
        // size field width is the remainder of the raw element size.
        let chunk_size_len = ea_hdr.raw_elmt_size - self.ctx.sizeof_addr - 4;
        let iblk_buf = self.handle.read_at_most(ea_hdr.idx_blk_addr, 65536)?;
        let fiblk = ea::FilteredIndexBlock::decode(
            &iblk_buf,
            &self.ctx,
            params.idx_blk_elmts as usize,
            ndblk_addrs,
            nsblk_addrs,
            chunk_size_len,
        )?;
        // Elements embedded in the index block come first.
        for e in &fiblk.elements {
            entries.push((e.addr, e.nbytes));
        }
        // Data-block capacity starts at data_blk_min_elmts and doubles
        // every two blocks -- NOTE(review): assumed extensible-array
        // growth pattern, confirm against the spec.
        let mut nelmts = min_elmts;
        let mut pair = 0usize;
        for &dblk_addr in &fiblk.dblk_addrs {
            if dblk_addr == UNDEF_ADDR {
                // Unallocated block: all of its chunks are absent.
                entries.extend(std::iter::repeat_n((UNDEF_ADDR, 0), nelmts));
            } else {
                let buf = self.handle.read_at_most(dblk_addr, 65536)?;
                let dblk = ea::FilteredDataBlock::decode(
                    &buf,
                    &self.ctx,
                    params.max_nelmts_bits,
                    nelmts,
                    chunk_size_len,
                )?;
                for e in &dblk.elements {
                    entries.push((e.addr, e.nbytes));
                }
            }
            if entries.len() >= chunks_dim0 as usize {
                break;
            }
            pair += 1;
            if pair >= 2 {
                pair = 0;
                nelmts *= 2;
            }
        }
    } else {
        // Unfiltered: bare addresses; the stored size of every chunk is
        // exactly chunk_bytes.
        let iblk_buf = self.handle.read_at_most(ea_hdr.idx_blk_addr, 8192)?;
        let iblk = ExtensibleArrayIndexBlock::decode(
            &iblk_buf,
            &self.ctx,
            params.idx_blk_elmts as usize,
            ndblk_addrs,
            nsblk_addrs,
        )?;
        for &addr in &iblk.elements {
            entries.push((addr, chunk_bytes));
        }
        let mut nelmts = min_elmts;
        let mut pair = 0usize;
        for &dblk_addr in &iblk.dblk_addrs {
            if dblk_addr == UNDEF_ADDR {
                entries.extend(std::iter::repeat_n((UNDEF_ADDR, 0), nelmts));
            } else {
                let buf = self.handle.read_at_most(dblk_addr, 65536)?;
                let dblk = ExtensibleArrayDataBlock::decode(
                    &buf,
                    &self.ctx,
                    params.max_nelmts_bits,
                    nelmts,
                )?;
                for &addr in &dblk.elements {
                    entries.push((addr, chunk_bytes));
                }
            }
            if entries.len() >= chunks_dim0 as usize {
                break;
            }
            pair += 1;
            if pair >= 2 {
                pair = 0;
                nelmts *= 2;
            }
        }
    }
    Ok(entries)
}
pub fn read_slice(&mut self, name: &str, starts: &[u64], counts: &[u64]) -> IoResult<Vec<u8>> {
let info = self
.dataset_info(name)
.ok_or_else(|| crate::io::IoError::NotFound(name.to_string()))?;
let dims = info.dataspace.dims.clone();
let element_size = info.datatype.element_size() as u64;
let layout = info.layout.clone();
let pipeline = info.filter_pipeline.clone();
let ndims = dims.len();
if starts.len() != ndims || counts.len() != ndims {
return Err(crate::io::IoError::InvalidState(
"starts/counts length must match dataset rank".into(),
));
}
for d in 0..ndims {
if starts[d] + counts[d] > dims[d] {
return Err(crate::io::IoError::InvalidState(format!(
"slice out of bounds: dim {} start {} + count {} > {}",
d, starts[d], counts[d], dims[d]
)));
}
}
let out_elems: u64 = counts.iter().product();
let out_bytes = (out_elems * element_size) as usize;
match &layout {
DataLayoutMessage::Contiguous { address, .. } => {
if *address == UNDEF_ADDR {
return Ok(vec![0u8; out_bytes]);
}
let strides = compute_strides(&dims, element_size);
if ndims == 1 {
let offset = *address + starts[0] * element_size;
return self
.handle
.read_at(offset, (counts[0] * element_size) as usize)
.map_err(Into::into);
}
let mut output = vec![0u8; out_bytes];
let row_bytes = (counts[ndims - 1] * element_size) as usize;
let mut coords = vec![0u64; ndims - 1];
let n_rows: u64 = counts[..ndims - 1].iter().product();
for row in 0..n_rows {
let mut file_offset = *address + starts[ndims - 1] * element_size;
for d in 0..ndims - 1 {
file_offset += (starts[d] + coords[d]) * strides[d];
}
let out_offset = row as usize * row_bytes;
let data = self.handle.read_at(file_offset, row_bytes)?;
output[out_offset..out_offset + row_bytes].copy_from_slice(&data);
for d in (0..ndims - 1).rev() {
coords[d] += 1;
if coords[d] < counts[d] {
break;
}
coords[d] = 0;
}
}
Ok(output)
}
DataLayoutMessage::Compact { data } => {
let strides = compute_strides(&dims, element_size);
let mut output = vec![0u8; out_bytes];
let row_bytes = (counts[ndims - 1] * element_size) as usize;
let n_rows: u64 = if ndims > 1 {
counts[..ndims - 1].iter().product()
} else {
1
};
let mut coords = vec![0u64; ndims.saturating_sub(1)];
for row in 0..n_rows {
let mut src_offset = (starts[ndims - 1] * element_size) as usize;
for d in 0..ndims.saturating_sub(1) {
src_offset += ((starts[d] + coords[d]) * strides[d]) as usize;
}
let out_offset = row as usize * row_bytes;
output[out_offset..out_offset + row_bytes]
.copy_from_slice(&data[src_offset..src_offset + row_bytes]);
for d in (0..ndims.saturating_sub(1)).rev() {
coords[d] += 1;
if coords[d] < counts[d] {
break;
}
coords[d] = 0;
}
}
Ok(output)
}
DataLayoutMessage::ChunkedV4 {
chunk_dims: layout_chunk_dims,
index_address,
index_type,
earray_params,
..
} => {
let real_chunk_dims = &layout_chunk_dims[..layout_chunk_dims.len() - 1];
let fp = pipeline.clone();
let can_optimize = ndims >= 2
&& real_chunk_dims[0] == 1
&& *index_type == data_layout::ChunkIndexType::ExtensibleArray;
if can_optimize {
let all_entries = self.collect_ea_chunk_entries(
*index_address,
earray_params.as_ref().unwrap(),
&dims,
real_chunk_dims,
element_size,
)?;
let mut output = vec![0u8; out_bytes];
let out_strides = compute_strides(counts, element_size);
let chunk_inner_dims = &real_chunk_dims[1..];
let chunk_strides = compute_strides(chunk_inner_dims, element_size);
let inner_starts = &starts[1..];
let inner_counts = &counts[1..];
let inner_ndims = inner_starts.len();
let row_bytes = (inner_counts[inner_ndims - 1] * element_size) as usize;
let n_inner_rows: u64 = if inner_ndims > 1 {
inner_counts[..inner_ndims - 1].iter().product()
} else {
1
};
for fi in 0..counts[0] {
let gi = starts[0] + fi;
if (gi as usize) >= all_entries.len() {
break;
}
let (addr, nbytes) = all_entries[gi as usize];
if addr == UNDEF_ADDR {
continue;
}
let chunk_data = if let Some(ref pl) = fp {
let raw = self.handle.read_at(addr, nbytes as usize)?;
filter::reverse_filters(pl, &raw)?
} else {
self.handle.read_at(addr, nbytes as usize)?
};
let mut ic = vec![0u64; inner_ndims.saturating_sub(1)];
for _irow in 0..n_inner_rows {
let mut src_off =
(inner_starts[inner_ndims - 1] * element_size) as usize;
let mut dst_off = (fi * out_strides[0]) as usize;
for d in 0..inner_ndims.saturating_sub(1) {
src_off += ((inner_starts[d] + ic[d]) * chunk_strides[d]) as usize;
dst_off += (ic[d] * out_strides[d + 1]) as usize;
}
if src_off + row_bytes <= chunk_data.len()
&& dst_off + row_bytes <= output.len()
{
output[dst_off..dst_off + row_bytes]
.copy_from_slice(&chunk_data[src_off..src_off + row_bytes]);
}
for d in (0..inner_ndims.saturating_sub(1)).rev() {
ic[d] += 1;
if ic[d] < inner_counts[d] {
break;
}
ic[d] = 0;
}
}
}
Ok(output)
} else {
let full = self.read_dataset_raw(name)?;
let mut output = vec![0u8; out_bytes];
let src_strides = compute_strides(&dims, element_size);
let row_bytes = (counts[ndims - 1] * element_size) as usize;
let n_rows: u64 = if ndims > 1 {
counts[..ndims - 1].iter().product()
} else {
1
};
if ndims == 1 {
let src_off = (starts[0] * element_size) as usize;
output[..row_bytes].copy_from_slice(&full[src_off..src_off + row_bytes]);
} else {
let out_strides = compute_strides(counts, element_size);
let mut coords = vec![0u64; ndims - 1];
for _row in 0..n_rows {
let mut src_off = (starts[ndims - 1] * element_size) as usize;
let mut out_off = 0usize;
for d in 0..ndims - 1 {
src_off += ((starts[d] + coords[d]) * src_strides[d]) as usize;
out_off += (coords[d] * out_strides[d]) as usize;
}
output[out_off..out_off + row_bytes]
.copy_from_slice(&full[src_off..src_off + row_bytes]);
for d in (0..ndims - 1).rev() {
coords[d] += 1;
if coords[d] < counts[d] {
break;
}
coords[d] = 0;
}
}
}
Ok(output)
}
}
}
}
}
/// Computes row-major (C-order) byte strides for an array of shape `dims`
/// whose elements are `element_size` bytes each.
///
/// The last dimension is contiguous (stride == `element_size`); each earlier
/// stride is the next stride multiplied by the next dimension's extent.
/// Returns an empty vector for a zero-dimensional shape.
fn compute_strides(dims: &[u64], element_size: u64) -> Vec<u64> {
    let n = dims.len();
    if n == 0 {
        return Vec::new();
    }
    // Seed every slot with the innermost stride, then fill outer strides
    // right-to-left: stride[d-1] = stride[d] * dims[d].
    let mut strides = vec![element_size; n];
    for d in (1..n).rev() {
        strides[d - 1] = strides[d] * dims[d];
    }
    strides
}
/// Reads an `n`-byte little-endian unsigned integer from the front of `buf`
/// and zero-extends it to `u64`.
///
/// Precondition: `n <= 8` and `buf.len() >= n`; used for HDF5 fields whose
/// width (size-of-offsets / size-of-lengths) is configured by the superblock.
fn read_uint(buf: &[u8], n: usize) -> u64 {
    // Little-endian: byte i contributes to bits [8*i, 8*i + 8).
    buf[..n]
        .iter()
        .enumerate()
        .fold(0u64, |acc, (i, &b)| acc | (u64::from(b) << (8 * i)))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    // Appends the low `n` bytes of `value` in little-endian order. Used to
    // emit HDF5 offset/length fields whose byte width is configurable
    // (`sa` / `ss` in build_v0_file below).
    fn write_le(buf: &mut Vec<u8>, value: u64, n: usize) {
        buf.extend_from_slice(&value.to_le_bytes()[..n]);
    }
    // Hand-assembles a minimal HDF5 file with a version-0 superblock that
    // contains exactly one contiguous dataset named `dataset_name` with
    // shape `dims` and raw payload `data`.
    //
    // Regions are laid out back-to-back, each padded to an 8-byte boundary:
    //   superblock -> root object header -> local heap header -> local heap
    //   data -> v1 B-tree node -> symbol table node ("SNOD") -> dataset
    //   object header -> raw dataset bytes.
    // All region sizes/addresses are computed up front so the builder can
    // assert the running buffer length at every region boundary.
    fn build_v0_file(dataset_name: &str, dims: &[u64], data: &[u8]) -> Vec<u8> {
        // sa = size of addresses (offsets), ss = size of sizes (lengths).
        let sa: usize = 8; let ss: usize = 8; let ndims = dims.len();
        // Element size is inferred from the payload; assumes `data` covers
        // the full dataset exactly.
        let element_size = data.len() as u64 / dims.iter().product::<u64>();
        let mut file = Vec::new();
        // ---- phase 1: compute every region's size and address ----
        // Superblock: 8-byte signature + 8 version/size bytes + 4 reserved
        // bytes + 4 addresses + the root symbol-table entry
        // (name offset + ohdr addr + cache type + reserved + 16-byte scratch).
        let sb_size = 8 + 8 + 4 + 4 * sa + (ss + sa + 4 + 4 + 16); let sb_size_aligned = (sb_size + 7) & !7;
        let root_ohdr_addr = sb_size_aligned as u64;
        // Root object header carries one Symbol Table message
        // (B-tree address + local heap address). Wire size = 8-byte message
        // header + data, rounded up to 8.
        let stab_msg_data_size = 2 * sa; let stab_msg_wire = 8 + stab_msg_data_size;
        let stab_msg_wire_aligned = (stab_msg_wire + 7) & !7;
        let root_ohdr_data_size = stab_msg_wire_aligned;
        // 16 = v1 object header prefix (version, nmsgs, ref count, hdr size, pad).
        let root_ohdr_total = 16 + root_ohdr_data_size; let root_ohdr_total_aligned = (root_ohdr_total + 7) & !7;
        let heap_hdr_addr = root_ohdr_addr + root_ohdr_total_aligned as u64;
        // Local heap header: "HEAP" + version + 3 reserved + data size +
        // free-list head offset + data address.
        let heap_hdr_size = 4 + 1 + 3 + ss + ss + sa;
        let heap_hdr_size_aligned = (heap_hdr_size + 7) & !7;
        let heap_data_addr = heap_hdr_addr + heap_hdr_size_aligned as u64;
        let name_bytes = dataset_name.as_bytes();
        // Heap data: leading NUL (offset 0 is the empty string) + the
        // dataset name + its NUL terminator.
        let heap_data_content_size = 1 + name_bytes.len() + 1; let heap_data_size = (heap_data_content_size + 7) & !7;
        let btree_addr = heap_data_addr + heap_data_size as u64;
        // v1 B-tree node: "TREE" + node type + level + entries-used +
        // left/right sibling addresses + 2 keys + 1 child address.
        let btree_size = 4 + 1 + 1 + 2 + 2 * sa + 2 * ss + sa;
        let btree_size_aligned = (btree_size + 7) & !7;
        let snod_addr = btree_addr + btree_size_aligned as u64;
        // One symbol table entry: name offset + ohdr addr + cache type +
        // reserved + 16-byte scratch.
        let entry_size = ss + sa + 4 + 4 + 16;
        // SNOD prefix ("SNOD" + version + reserved + entry count) is 8 bytes.
        let snod_size = 8 + entry_size;
        let snod_size_aligned = (snod_size + 7) & !7;
        let ds_ohdr_addr = snod_addr + snod_size_aligned as u64;
        // Dataspace message body: 8 fixed bytes + one extent per dimension.
        let ds_msg_data_size = 8 + ndims * ss;
        let ds_msg_wire = 8 + ds_msg_data_size;
        let ds_msg_wire_aligned = (ds_msg_wire + 7) & !7;
        // Datatype message body (fixed-point type, v1): 12 bytes.
        let dt_msg_data_size = 12;
        let dt_msg_wire = 8 + dt_msg_data_size;
        let dt_msg_wire_aligned = (dt_msg_wire + 7) & !7;
        // Data layout message body (v3, contiguous): version + class +
        // data address + data size.
        let dl_msg_data_size = 2 + sa + ss;
        let dl_msg_wire = 8 + dl_msg_data_size;
        let dl_msg_wire_aligned = (dl_msg_wire + 7) & !7;
        let ds_ohdr_data_size = ds_msg_wire_aligned + dt_msg_wire_aligned + dl_msg_wire_aligned;
        let ds_ohdr_total = 16 + ds_ohdr_data_size; let ds_ohdr_total_aligned = (ds_ohdr_total + 7) & !7;
        let raw_data_addr = ds_ohdr_addr + ds_ohdr_total_aligned as u64;
        let raw_data_size = data.len();
        let eof = raw_data_addr + raw_data_size as u64;
        // ---- phase 2: emit bytes, asserting each region boundary ----
        // HDF5 format signature.
        let sig: [u8; 8] = [0x89, 0x48, 0x44, 0x46, 0x0d, 0x0a, 0x1a, 0x0a];
        file.extend_from_slice(&sig);
        // Superblock v0: version bytes, sizeof-offsets=sa, sizeof-lengths=ss,
        // group leaf-k=4, group internal-k=32, consistency flags, then the
        // base address.
        file.push(0); file.push(0); file.push(0); file.push(0); file.push(0); file.push(sa as u8); file.push(ss as u8); file.push(0); file.extend_from_slice(&4u16.to_le_bytes()); file.extend_from_slice(&32u16.to_le_bytes()); file.extend_from_slice(&0u32.to_le_bytes()); write_le(&mut file, 0, sa);
        // Free-space address (undefined), end-of-file address, driver-info
        // address (undefined).
        write_le(&mut file, UNDEF_ADDR, sa);
        write_le(&mut file, eof, sa);
        write_le(&mut file, UNDEF_ADDR, sa);
        // Root symbol-table entry: name offset 0, root ohdr addr,
        // cache type 1 (cached group), reserved, then the scratch area
        // holding the group's B-tree and heap addresses.
        write_le(&mut file, 0, ss); write_le(&mut file, root_ohdr_addr, sa); file.extend_from_slice(&1u32.to_le_bytes()); file.extend_from_slice(&0u32.to_le_bytes()); write_le(&mut file, btree_addr, sa);
        write_le(&mut file, heap_hdr_addr, sa);
        // Pad superblock to its aligned size.
        while file.len() < sb_size_aligned {
            file.push(0);
        }
        assert_eq!(file.len(), root_ohdr_addr as usize);
        // Root object header (v1): version 1, reserved, 1 message,
        // ref count 1, header data size, 4 pad bytes.
        file.push(1); file.push(0); file.extend_from_slice(&1u16.to_le_bytes()); file.extend_from_slice(&1u32.to_le_bytes()); file.extend_from_slice(&(root_ohdr_data_size as u32).to_le_bytes());
        // Symbol Table message (type 0x0011): B-tree and heap addresses.
        file.extend_from_slice(&[0u8; 4]); file.extend_from_slice(&0x0011u16.to_le_bytes()); file.extend_from_slice(&(stab_msg_data_size as u16).to_le_bytes()); file.push(0); file.extend_from_slice(&[0u8; 3]); write_le(&mut file, btree_addr, sa);
        write_le(&mut file, heap_hdr_addr, sa);
        while file.len() < (root_ohdr_addr as usize + root_ohdr_total_aligned) {
            file.push(0);
        }
        assert_eq!(file.len(), heap_hdr_addr as usize);
        // Local heap header: signature, version, data size, free-list head
        // (u64::MAX = none), data segment address.
        file.extend_from_slice(b"HEAP");
        file.push(0); file.extend_from_slice(&[0u8; 3]); write_le(&mut file, heap_data_size as u64, ss); write_le(&mut file, u64::MAX, ss); write_le(&mut file, heap_data_addr, sa); while file.len() < (heap_hdr_addr as usize + heap_hdr_size_aligned) {
            file.push(0);
        }
        assert_eq!(file.len(), heap_data_addr as usize);
        // Heap data: NUL at offset 0, then the dataset name (NUL-terminated)
        // at offset 1 — the SNOD entry below references offset 1.
        file.push(0); file.extend_from_slice(name_bytes); file.push(0); while file.len() < (heap_data_addr as usize + heap_data_size) {
            file.push(0);
        }
        assert_eq!(file.len(), btree_addr as usize);
        // v1 B-tree leaf (type 0 = group nodes, level 0, 1 entry): no
        // siblings, key 0, one child (the SNOD), key 1.
        file.extend_from_slice(b"TREE");
        file.push(0); file.push(0); file.extend_from_slice(&1u16.to_le_bytes()); write_le(&mut file, UNDEF_ADDR, sa); write_le(&mut file, UNDEF_ADDR, sa); write_le(&mut file, 0, ss);
        write_le(&mut file, snod_addr, sa);
        write_le(&mut file, 1, ss);
        while file.len() < (btree_addr as usize + btree_size_aligned) {
            file.push(0);
        }
        assert_eq!(file.len(), snod_addr as usize);
        // Symbol table node with one entry: name at heap offset 1, dataset
        // object header address, cache type 0, reserved, 16-byte scratch.
        file.extend_from_slice(b"SNOD");
        file.push(1); file.push(0); file.extend_from_slice(&1u16.to_le_bytes()); write_le(&mut file, 1, ss); write_le(&mut file, ds_ohdr_addr, sa); file.extend_from_slice(&0u32.to_le_bytes()); file.extend_from_slice(&0u32.to_le_bytes()); file.extend_from_slice(&[0u8; 16]); while file.len() < (snod_addr as usize + snod_size_aligned) {
            file.push(0);
        }
        assert_eq!(file.len(), ds_ohdr_addr as usize);
        // Dataset object header (v1) with 3 messages: dataspace, datatype,
        // data layout.
        file.push(1); file.push(0); file.extend_from_slice(&3u16.to_le_bytes()); file.extend_from_slice(&1u32.to_le_bytes()); file.extend_from_slice(&(ds_ohdr_data_size as u32).to_le_bytes());
        file.extend_from_slice(&[0u8; 4]);
        // Dataspace message (type 0x0001): version 1, rank, flags, then one
        // extent per dimension.
        file.extend_from_slice(&0x0001u16.to_le_bytes());
        file.extend_from_slice(&(ds_msg_data_size as u16).to_le_bytes());
        file.push(0); file.extend_from_slice(&[0u8; 3]); file.push(1); file.push(ndims as u8);
        file.push(0); file.push(0); file.extend_from_slice(&[0u8; 4]); for &d in dims {
            write_le(&mut file, d, ss);
        }
        // Pad to the start of the datatype message.
        let target = ds_ohdr_addr as usize + 16 + ds_msg_wire_aligned;
        while file.len() < target {
            file.push(0);
        }
        // Datatype message (type 0x0003): class/version 0x10 (v1, fixed-
        // point), bit field 0x08 (signed), element size, bit offset 0,
        // precision in bits.
        file.extend_from_slice(&0x0003u16.to_le_bytes());
        file.extend_from_slice(&(dt_msg_data_size as u16).to_le_bytes());
        file.push(0); file.extend_from_slice(&[0u8; 3]); file.push(0x10); file.push(0x08); file.push(0); file.push(0); file.extend_from_slice(&(element_size as u32).to_le_bytes()); file.extend_from_slice(&0u16.to_le_bytes()); file.extend_from_slice(&((element_size * 8) as u16).to_le_bytes()); let target = ds_ohdr_addr as usize + 16 + ds_msg_wire_aligned + dt_msg_wire_aligned;
        while file.len() < target {
            file.push(0);
        }
        // Data layout message (type 0x0008): version 3, class 1
        // (contiguous), raw data address and size.
        file.extend_from_slice(&0x0008u16.to_le_bytes());
        file.extend_from_slice(&(dl_msg_data_size as u16).to_le_bytes());
        file.push(0); file.extend_from_slice(&[0u8; 3]); file.push(3); file.push(1); write_le(&mut file, raw_data_addr, sa); write_le(&mut file, raw_data_size as u64, ss); let target = ds_ohdr_addr as usize + ds_ohdr_total_aligned;
        while file.len() < target {
            file.push(0);
        }
        assert_eq!(file.len(), raw_data_addr as usize);
        // Finally, the contiguous raw dataset bytes.
        file.extend_from_slice(data);
        assert_eq!(file.len(), eof as usize);
        file
    }
    // End-to-end: a hand-built v0 file with one 2-D i32 dataset is opened,
    // enumerated, and read back byte-for-byte.
    #[test]
    fn test_read_v0_file_with_one_dataset() {
        let dims = [3u64, 4];
        let values: Vec<i32> = (0..12).collect();
        let raw_data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let file_bytes = build_v0_file("my_dataset", &dims, &raw_data);
        let path = std::env::temp_dir().join("hdf5_test_v0_reader.h5");
        {
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(&file_bytes).unwrap();
            f.sync_all().unwrap();
        }
        let mut reader = Hdf5Reader::open(&path).unwrap();
        let names = reader.dataset_names();
        assert_eq!(names, vec!["my_dataset"]);
        let shape = reader.dataset_shape("my_dataset").unwrap();
        assert_eq!(shape, vec![3, 4]);
        let data = reader.read_dataset_raw("my_dataset").unwrap();
        assert_eq!(data, raw_data);
        let read_values: Vec<i32> = data
            .chunks_exact(4)
            .map(|c| i32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect();
        assert_eq!(read_values, values);
        std::fs::remove_file(&path).ok();
    }
    // Same round-trip for the rank-1 case (exercises the ndims == 1 paths).
    #[test]
    fn test_read_v0_file_1d_dataset() {
        let dims = [5u64];
        let values: Vec<i32> = vec![100, 200, 300, 400, 500];
        let raw_data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let file_bytes = build_v0_file("data_1d", &dims, &raw_data);
        let path = std::env::temp_dir().join("hdf5_test_v0_1d.h5");
        {
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(&file_bytes).unwrap();
        }
        let mut reader = Hdf5Reader::open(&path).unwrap();
        assert_eq!(reader.dataset_names(), vec!["data_1d"]);
        assert_eq!(reader.dataset_shape("data_1d").unwrap(), vec![5]);
        let data = reader.read_dataset_raw("data_1d").unwrap();
        let read_values: Vec<i32> = data
            .chunks_exact(4)
            .map(|c| i32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect();
        assert_eq!(read_values, values);
        std::fs::remove_file(&path).ok();
    }
    // Regression: files produced by this crate's own writer (v2/v3-style
    // superblock) must still open and read correctly alongside the v0 path.
    #[test]
    fn test_detect_v2v3_still_works() {
        let path = std::env::temp_dir().join("hdf5_test_detect_v3.h5");
        {
            use crate::io::writer::Hdf5Writer;
            let mut writer = Hdf5Writer::create(&path).unwrap();
            let datatype = crate::format::messages::datatype::DatatypeMessage::i32_type();
            let idx = writer.create_dataset("test", datatype, &[4]).unwrap();
            let data = [1i32, 2, 3, 4];
            let raw: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
            writer.write_dataset_raw(idx, &raw).unwrap();
            writer.close().unwrap();
        }
        let mut reader = Hdf5Reader::open(&path).unwrap();
        assert_eq!(reader.dataset_names(), vec!["test"]);
        let shape = reader.dataset_shape("test").unwrap();
        assert_eq!(shape, vec![4]);
        let data = reader.read_dataset_raw("test").unwrap();
        let vals: Vec<i32> = data
            .chunks_exact(4)
            .map(|c| i32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect();
        assert_eq!(vals, vec![1, 2, 3, 4]);
        std::fs::remove_file(&path).ok();
    }
}
#[cfg(test)]
mod h5py_debug_tests {
    use super::*;
    // Manual debugging aid rather than a real assertion test: walks the
    // low-level structures of an externally produced HDF5 file (presumably
    // written by h5py — TODO confirm how the fixture is generated) and
    // eprintln!s what it finds at each layer, then opens it with the
    // high-level reader. Returns early (no-op) when the fixture file is
    // absent, so it is harmless in CI.
    #[test]
    fn debug_read_h5py() {
        let path = std::path::Path::new("/tmp/test_h5py_default.h5");
        if !path.exists() {
            return;
        }
        // Step 1: superblock — detect version, then decode as v0/v1 to get
        // the address/length widths and the root symbol-table entry.
        let mut handle = FileHandle::open_read(path).unwrap();
        let sb_buf = handle.read_at_most(0, 1024).unwrap();
        let version = detect_superblock_version(&sb_buf).unwrap();
        eprintln!("Superblock version: {}", version);
        let sb = SuperblockV0V1::decode(&sb_buf).unwrap();
        eprintln!(
            "sizeof_addr={}, sizeof_size={}",
            sb.sizeof_offsets, sb.sizeof_lengths
        );
        eprintln!(
            "STE: obj_header={}, cache_type={}, btree={}, heap={}",
            sb.root_symbol_table_entry.obj_header_addr,
            sb.root_symbol_table_entry.cache_type,
            sb.root_symbol_table_entry.btree_addr,
            sb.root_symbol_table_entry.heap_addr
        );
        let ctx = FormatContext {
            sizeof_addr: sb.sizeof_offsets,
            sizeof_size: sb.sizeof_lengths,
        };
        // Step 2: root group's local heap — header first, then the data
        // segment that holds the NUL-terminated link names.
        let heap_buf = handle
            .read_at_most(sb.root_symbol_table_entry.heap_addr, 128)
            .unwrap();
        let heap_hdr = LocalHeapHeader::decode(
            &heap_buf,
            ctx.sizeof_addr as usize,
            ctx.sizeof_size as usize,
        )
        .unwrap();
        eprintln!(
            "Heap data_addr={}, data_size={}",
            heap_hdr.data_addr, heap_hdr.data_size
        );
        let heap_data = handle
            .read_at(heap_hdr.data_addr, heap_hdr.data_size as usize)
            .unwrap();
        eprintln!(
            "Heap data bytes: {:?}",
            &heap_data[..std::cmp::min(64, heap_data.len())]
        );
        // Step 3: root group's v1 B-tree node; its children point at
        // symbol table nodes (SNODs).
        let btree_buf = handle
            .read_at_most(sb.root_symbol_table_entry.btree_addr, 8192)
            .unwrap();
        let btree = BTreeV1Node::decode(
            &btree_buf,
            ctx.sizeof_addr as usize,
            ctx.sizeof_size as usize,
        )
        .unwrap();
        eprintln!(
            "BTree: type={}, level={}, entries={}, children={:?}",
            btree.node_type, btree.level, btree.entries_used, btree.children
        );
        // Step 4: dump every SNOD entry, resolving each name via its
        // offset into the local heap data.
        for &child in &btree.children {
            let snod_buf = handle.read_at_most(child, 8192).unwrap();
            let snod = SymbolTableNode::decode(
                &snod_buf,
                ctx.sizeof_addr as usize,
                ctx.sizeof_size as usize,
            )
            .unwrap();
            eprintln!("SNOD at {}: {} entries", child, snod.entries.len());
            for entry in &snod.entries {
                let name = local_heap_get_string(&heap_data, entry.name_offset).unwrap();
                eprintln!(
                    " entry: name='{}' (offset={}), obj_header={}, cache_type={}",
                    name, entry.name_offset, entry.obj_header_addr, entry.cache_type
                );
            }
        }
        // Step 5: sanity-check that the high-level reader agrees with the
        // manual walk above.
        let reader = Hdf5Reader::open(path).unwrap();
        eprintln!("Datasets found: {:?}", reader.dataset_names());
    }
}