1use std::ops::Range;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use tokio::sync::OnceCell;
6
7use crate::chunk_index::{ChunkIndex, ChunkLocation};
8use crate::endian::HDF5Reader;
9use crate::error::{HDF5Error, Result};
10use crate::group::attributes_from_header;
11use crate::messages::attribute::Attribute;
12use crate::messages::data_layout::{ChunkIndexParams, ChunkIndexType, StorageLayout};
13use crate::messages::dataspace::DataspaceMessage;
14use crate::messages::datatype::DataType;
15use crate::messages::fill_value::FillValueMessage;
16use crate::messages::filter_pipeline::FilterPipeline;
17use crate::object_header::{msg_types, ObjectHeader};
18use crate::reader::AsyncFileReader;
19use crate::superblock::Superblock;
20use crate::{btree, extensible_array, fixed_array};
21
/// A single HDF5 dataset: shape/type/layout metadata parsed from its object
/// header, plus a lazily-resolved index of chunk locations.
#[derive(Debug)]
pub struct HDF5Dataset {
    // Link name of the dataset within its parent group.
    name: String,
    // Parsed object header; retained for attribute lookups and the
    // external-storage check.
    header: ObjectHeader,
    // Reader used for metadata structures (B-trees, fixed/extensible arrays).
    reader: Arc<dyn AsyncFileReader>,
    // Reader used for bulk chunk-data byte ranges (see `batch_get_chunks`).
    raw_reader: Arc<dyn AsyncFileReader>,
    superblock: Arc<Superblock>,

    // Dataset extents per axis, from the dataspace message.
    shape: Vec<u64>,
    // Raw dataspace type code (2 == null dataspace, see `is_null_dataspace`).
    dataspace_type: u8,
    dtype: DataType,
    layout: StorageLayout,
    filters: FilterPipeline,
    // Raw fill-value bytes, if the file defines a fill value.
    fill_value: Option<Vec<u8>>,

    // Chunk index, resolved on first use by `chunk_index()`.
    cached_chunk_index: OnceCell<ChunkIndex>,
}
47
48impl HDF5Dataset {
49 pub fn new(
51 name: String,
52 header: ObjectHeader,
53 reader: Arc<dyn AsyncFileReader>,
54 raw_reader: Arc<dyn AsyncFileReader>,
55 superblock: Arc<Superblock>,
56 ) -> Result<Self> {
57 let dataspace = header
59 .find_message(msg_types::DATASPACE)
60 .ok_or_else(|| HDF5Error::General("dataset missing dataspace message".into()))?;
61 let dataspace = DataspaceMessage::parse(&dataspace.data, superblock.size_of_lengths)?;
62
63 let dtype_msg = header
65 .find_message(msg_types::DATATYPE)
66 .ok_or_else(|| HDF5Error::General("dataset missing datatype message".into()))?;
67 let dtype = DataType::parse(&dtype_msg.data)?;
68
69 let layout_msg = header
71 .find_message(msg_types::DATA_LAYOUT)
72 .ok_or_else(|| HDF5Error::General("dataset missing data layout message".into()))?;
73 let layout = StorageLayout::parse(
74 &layout_msg.data,
75 superblock.size_of_offsets,
76 superblock.size_of_lengths,
77 )?;
78
79 let filters = if let Some(filter_msg) = header.find_message(msg_types::FILTER_PIPELINE) {
81 FilterPipeline::parse(&filter_msg.data)?
82 } else {
83 FilterPipeline::empty()
84 };
85
86 let fill_value = if let Some(fv_msg) = header.find_message(msg_types::FILL_VALUE) {
88 FillValueMessage::parse(&fv_msg.data)?.value
89 } else if let Some(fv_msg) = header.find_message(msg_types::FILL_VALUE_OLD) {
90 let mut r = HDF5Reader::new(fv_msg.data.clone());
92 let size = r.read_u32()? as usize;
93 if size > 0 {
94 Some(r.read_bytes(size)?)
95 } else {
96 None
97 }
98 } else {
99 None
100 };
101
102 Ok(Self {
103 name,
104 header,
105 reader,
106 raw_reader,
107 superblock,
108 shape: dataspace.dimensions,
109 dataspace_type: dataspace.dataspace_type,
110 dtype,
111 layout,
112 filters,
113 fill_value,
114 cached_chunk_index: OnceCell::new(),
115 })
116 }
117
118 pub fn name(&self) -> &str {
120 &self.name
121 }
122
123 pub fn shape(&self) -> &[u64] {
125 &self.shape
126 }
127
128 pub fn ndim(&self) -> usize {
130 self.shape.len()
131 }
132
133 pub fn dtype(&self) -> &DataType {
135 &self.dtype
136 }
137
138 pub fn element_size(&self) -> u32 {
140 self.dtype.size()
141 }
142
143 pub fn chunk_shape(&self) -> Option<&[u64]> {
145 match &self.layout {
146 StorageLayout::Chunked { chunk_shape, .. } => Some(chunk_shape),
147 _ => None,
148 }
149 }
150
151 pub fn filters(&self) -> &FilterPipeline {
153 &self.filters
154 }
155
156 pub fn fill_value(&self) -> Option<&[u8]> {
158 self.fill_value.as_deref()
159 }
160
161 pub fn layout(&self) -> &StorageLayout {
163 &self.layout
164 }
165
166 pub fn is_null_dataspace(&self) -> bool {
168 self.dataspace_type == 2
169 }
170
171 pub fn has_external_storage(&self) -> bool {
173 self.header
174 .messages
175 .iter()
176 .any(|m| m.msg_type == msg_types::EXTERNAL_DATA_FILES)
177 }
178
179 pub fn header(&self) -> &ObjectHeader {
181 &self.header
182 }
183
184 pub async fn attributes(&self) -> Vec<Attribute> {
186 attributes_from_header(
187 &self.header,
188 &self.reader,
189 self.superblock.size_of_offsets,
190 self.superblock.size_of_lengths,
191 )
192 .await
193 }
194
195 pub async fn attribute(&self, name: &str) -> Option<Attribute> {
197 self.attributes().await.into_iter().find(|a| a.name == name)
198 }
199
    /// Returns the dataset's chunk index, resolving it on first call and
    /// caching the result for all subsequent calls.
    pub async fn chunk_index(&self) -> Result<&ChunkIndex> {
        // tokio's OnceCell only stores an Ok value; if resolution fails,
        // the error is returned and a later call retries initialization.
        self.cached_chunk_index
            .get_or_try_init(|| self.resolve_chunk_index())
            .await
    }
210
    /// Builds the chunk index appropriate for this dataset's storage layout.
    ///
    /// Compact layouts and unallocated storage yield an empty index;
    /// contiguous layouts yield a single pseudo-chunk covering the whole
    /// dataset; chunked layouts dispatch on the chunk indexing type.
    async fn resolve_chunk_index(&self) -> Result<ChunkIndex> {
        match &self.layout {
            StorageLayout::Compact { .. } => {
                // Compact data is stored inline in the object header, so
                // there are no chunk locations to index.
                Ok(ChunkIndex::new(vec![], vec![], self.shape.clone()))
            }

            StorageLayout::Contiguous { address, size } => {
                // An undefined address means storage was never allocated.
                if HDF5Reader::is_undef_addr(*address, self.superblock.size_of_offsets) {
                    return Ok(ChunkIndex::new(vec![], vec![], self.shape.clone()));
                }
                Ok(ChunkIndex::contiguous(*address, *size, self.shape.clone()))
            }

            StorageLayout::Chunked {
                chunk_shape,
                index_address,
                indexing_type,
                index_params,
                flags,
            } => {
                // No index structure written yet: the dataset holds no chunks.
                if HDF5Reader::is_undef_addr(*index_address, self.superblock.size_of_offsets) {
                    return Ok(ChunkIndex::new(
                        vec![],
                        chunk_shape.clone(),
                        self.shape.clone(),
                    ));
                }

                match indexing_type {
                    ChunkIndexType::BTreeV1 => {
                        self.chunk_index_btree_v1(*index_address, chunk_shape).await
                    }
                    ChunkIndexType::BTreeV2 => {
                        self.chunk_index_btree_v2(*index_address, chunk_shape).await
                    }
                    ChunkIndexType::SingleChunk => {
                        self.chunk_index_single_chunk(*index_address, chunk_shape, index_params)
                    }
                    ChunkIndexType::FixedArray => {
                        self.chunk_index_fixed_array(*index_address, chunk_shape, *flags)
                            .await
                    }
                    ChunkIndexType::ExtensibleArray => {
                        self.chunk_index_extensible_array(*index_address, chunk_shape, *flags)
                            .await
                    }
                    // Any remaining indexing type is not implemented yet.
                    other => Err(HDF5Error::General(format!(
                        "chunk indexing type {other:?} not yet supported"
                    ))),
                }
            }
        }
    }
268
269 async fn chunk_index_btree_v1(
271 &self,
272 btree_address: u64,
273 chunk_shape: &[u64],
274 ) -> Result<ChunkIndex> {
275 let ndims = self.shape.len();
276
277 let entries = btree::v1::read_chunk_btree_v1(
278 &self.reader,
279 btree_address,
280 ndims,
281 self.superblock.size_of_offsets,
282 self.superblock.size_of_lengths,
283 )
284 .await?;
285
286 let locations: Vec<ChunkLocation> = entries
290 .into_iter()
291 .map(|e| {
292 let indices: Vec<u64> = e
293 .offsets
294 .iter()
295 .zip(chunk_shape.iter())
296 .map(|(&offset, &cs)| offset / cs)
297 .collect();
298 ChunkLocation {
299 indices,
300 byte_offset: e.address,
301 byte_length: e.size as u64,
302 filter_mask: e.filter_mask,
303 }
304 })
305 .collect();
306
307 Ok(ChunkIndex::new(
308 locations,
309 chunk_shape.to_vec(),
310 self.shape.clone(),
311 ))
312 }
313
    /// Builds a chunk index from a version-2 B-tree.
    ///
    /// Record type 11 describes filtered chunks (on-disk size and filter
    /// mask are stored per record); record type 10 describes non-filtered
    /// chunks, whose size is implied by chunk shape and element size.
    async fn chunk_index_btree_v2(
        &self,
        btree_address: u64,
        chunk_shape: &[u64],
    ) -> Result<ChunkIndex> {
        let ndims = self.shape.len();

        let header = btree::v2::BTreeV2Header::read(
            &self.reader,
            btree_address,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        // Walk the whole tree once; the raw records are decoded below
        // according to the record type declared in the header.
        let raw_records = btree::v2::collect_all_records(
            &self.reader,
            &header,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        match header.record_type {
            11 => {
                let chunk_records = btree::v2::parse_chunk_records_filtered(
                    &raw_records,
                    ndims,
                    self.superblock.size_of_offsets,
                    self.superblock.size_of_lengths,
                )?;

                let locations: Vec<ChunkLocation> = chunk_records
                    .into_iter()
                    // Skip records whose chunk was never allocated on disk.
                    .filter(|c| {
                        !HDF5Reader::is_undef_addr(c.address, self.superblock.size_of_offsets)
                    })
                    .map(|c| ChunkLocation {
                        indices: c.scaled_offsets,
                        byte_offset: c.address,
                        byte_length: c.chunk_size,
                        filter_mask: c.filter_mask,
                    })
                    .collect();

                Ok(ChunkIndex::new(
                    locations,
                    chunk_shape.to_vec(),
                    self.shape.clone(),
                ))
            }
            10 => {
                let chunk_records = btree::v2::parse_chunk_records_non_filtered(
                    &raw_records,
                    ndims,
                    self.superblock.size_of_offsets,
                )?;

                // Non-filtered chunks all share the same uncompressed size.
                let uncompressed_chunk_size: u64 =
                    chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;

                let locations: Vec<ChunkLocation> = chunk_records
                    .into_iter()
                    .filter(|c| {
                        !HDF5Reader::is_undef_addr(c.address, self.superblock.size_of_offsets)
                    })
                    .map(|c| ChunkLocation {
                        indices: c.scaled_offsets,
                        byte_offset: c.address,
                        byte_length: uncompressed_chunk_size,
                        // Type-10 records carry no filter information.
                        filter_mask: 0,
                    })
                    .collect();

                Ok(ChunkIndex::new(
                    locations,
                    chunk_shape.to_vec(),
                    self.shape.clone(),
                ))
            }
            other => Err(HDF5Error::General(format!(
                "B-tree v2 record type {other} not supported for chunk indexing (expected 10 or 11)"
            ))),
        }
    }
403
    /// Builds a chunk index from a Fixed Array index (used when the number
    /// of chunks is fixed and known up front).
    async fn chunk_index_fixed_array(
        &self,
        fahd_address: u64,
        chunk_shape: &[u64],
        _layout_flags: u8,
    ) -> Result<ChunkIndex> {
        let ndims = self.shape.len();

        let fahd = fixed_array::FixedArrayHeader::read(
            &self.reader,
            fahd_address,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        // Size of an unfiltered chunk — presumably used by the entry
        // reader for entries without an explicit on-disk size; see
        // `read_fixed_array_entries`.
        let uncompressed_chunk_size =
            chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;

        // Fixed Array indexes only appear with version-4 data layouts.
        let layout_version = 4u8;

        let entries = fixed_array::read_fixed_array_entries(
            &self.reader,
            &fahd,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
            uncompressed_chunk_size,
            layout_version,
        )
        .await?;

        // Number of chunks along each axis (ceiling division).
        let grid_shape: Vec<u64> = self
            .shape
            .iter()
            .zip(chunk_shape.iter())
            .map(|(&ds, &cs)| ds.div_ceil(cs))
            .collect();

        let mut locations = Vec::new();
        for (flat_idx, entry) in entries.iter().enumerate() {
            // Entries are dense: unallocated chunk slots carry an
            // undefined address and are skipped.
            if HDF5Reader::is_undef_addr(entry.address, self.superblock.size_of_offsets) {
                continue;
            }

            // Decode the flat slot index into per-axis chunk indices
            // (row-major: the last axis varies fastest).
            let mut indices = vec![0u64; ndims];
            let mut remaining = flat_idx as u64;
            for d in (0..ndims).rev() {
                indices[d] = remaining % grid_shape[d];
                remaining /= grid_shape[d];
            }

            locations.push(ChunkLocation {
                indices,
                byte_offset: entry.address,
                byte_length: entry.chunk_size,
                filter_mask: entry.filter_mask,
            });
        }

        Ok(ChunkIndex::new(
            locations,
            chunk_shape.to_vec(),
            self.shape.clone(),
        ))
    }
477
    /// Builds a chunk index from an Extensible Array index (used when the
    /// dataset can grow along an unlimited dimension).
    async fn chunk_index_extensible_array(
        &self,
        eahd_address: u64,
        chunk_shape: &[u64],
        _layout_flags: u8,
    ) -> Result<ChunkIndex> {
        let ndims = self.shape.len();

        let eahd = extensible_array::ExtensibleArrayHeader::read(
            &self.reader,
            eahd_address,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        // Size of an unfiltered chunk — presumably used by the entry
        // reader for entries without an explicit on-disk size; see
        // `read_extensible_array_entries`.
        let uncompressed_chunk_size =
            chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;

        // Extensible Array indexes only appear with version-4 data layouts.
        let layout_version = 4u8;

        let entries = extensible_array::read_extensible_array_entries(
            &self.reader,
            &eahd,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
            uncompressed_chunk_size,
            layout_version,
        )
        .await?;

        // Number of chunks along each axis (ceiling division).
        let grid_shape: Vec<u64> = self
            .shape
            .iter()
            .zip(chunk_shape.iter())
            .map(|(&ds, &cs)| ds.div_ceil(cs))
            .collect();

        let mut locations = Vec::with_capacity(entries.len());
        for indexed in &entries {
            // Unlike the fixed array, each entry carries its own flat grid
            // index; decode it into per-axis chunk indices (row-major).
            let mut indices = vec![0u64; ndims];
            let mut remaining = indexed.flat_idx;
            for d in (0..ndims).rev() {
                indices[d] = remaining % grid_shape[d];
                remaining /= grid_shape[d];
            }

            locations.push(ChunkLocation {
                indices,
                byte_offset: indexed.entry.address,
                byte_length: indexed.entry.chunk_size,
                filter_mask: indexed.entry.filter_mask,
            });
        }

        Ok(ChunkIndex::new(
            locations,
            chunk_shape.to_vec(),
            self.shape.clone(),
        ))
    }
544
545 pub async fn batch_get_chunks(&self, chunk_indices: &[Vec<u64>]) -> Result<Vec<Option<Bytes>>> {
553 let index = self.chunk_index().await?; let mut ranges: Vec<Range<u64>> = Vec::new();
557 let mut range_map: Vec<Option<usize>> = Vec::with_capacity(chunk_indices.len());
558
559 for indices in chunk_indices {
560 if let Some(loc) = index.get(indices) {
561 range_map.push(Some(ranges.len()));
562 ranges.push(loc.byte_offset..loc.byte_offset + loc.byte_length);
563 } else {
564 range_map.push(None);
565 }
566 }
567
568 if ranges.is_empty() {
569 return Ok(vec![None; chunk_indices.len()]);
570 }
571
572 let fetched = self.raw_reader.get_byte_ranges(ranges).await?;
574
575 let mut results = Vec::with_capacity(chunk_indices.len());
577 for mapping in &range_map {
578 match mapping {
579 Some(idx) => results.push(Some(fetched[*idx].clone())),
580 None => results.push(None),
581 }
582 }
583 Ok(results)
584 }
585
586 pub async fn batch_fetch_ranges(&self, ranges: &[(u64, u64)]) -> Result<Vec<Bytes>> {
592 let byte_ranges: Vec<Range<u64>> = ranges
593 .iter()
594 .map(|&(offset, length)| offset..offset + length)
595 .collect();
596 self.raw_reader.get_byte_ranges(byte_ranges).await
597 }
598
599 fn chunk_index_single_chunk(
601 &self,
602 address: u64,
603 chunk_shape: &[u64],
604 index_params: &ChunkIndexParams,
605 ) -> Result<ChunkIndex> {
606 let ndims = self.shape.len();
607
608 let (byte_length, filter_mask) = match index_params {
609 ChunkIndexParams::SingleChunk {
610 filtered_size,
611 filter_mask,
612 } => (*filtered_size, *filter_mask),
613 _ => {
614 let size = chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;
616 (size, 0u32)
617 }
618 };
619
620 let location = ChunkLocation {
621 indices: vec![0; ndims],
622 byte_offset: address,
623 byte_length,
624 filter_mask,
625 };
626
627 Ok(ChunkIndex::new(
628 vec![location],
629 chunk_shape.to_vec(),
630 self.shape.clone(),
631 ))
632 }
633}