// async_hdf5/dataset.rs
1use std::ops::Range;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use tokio::sync::OnceCell;
6
7use crate::chunk_index::{ChunkIndex, ChunkLocation};
8use crate::endian::HDF5Reader;
9use crate::error::{HDF5Error, Result};
10use crate::group::attributes_from_header;
11use crate::messages::attribute::Attribute;
12use crate::messages::data_layout::{ChunkIndexParams, ChunkIndexType, StorageLayout};
13use crate::messages::dataspace::DataspaceMessage;
14use crate::messages::datatype::DataType;
15use crate::messages::fill_value::FillValueMessage;
16use crate::messages::filter_pipeline::FilterPipeline;
17use crate::object_header::{msg_types, ObjectHeader};
18use crate::reader::AsyncFileReader;
19use crate::superblock::Superblock;
20use crate::{btree, extensible_array, fixed_array};
21
/// An HDF5 dataset — a typed, shaped, chunked (or contiguous) array of data.
///
/// Provides synchronous access to parsed metadata (shape, dtype, chunk shape,
/// filters, fill value) and async access to the chunk index (byte offsets of
/// all chunks in the file).
#[derive(Debug)]
pub struct HDF5Dataset {
    /// Dataset name (link name only, not the full path).
    name: String,
    /// Object header the dataset was parsed from (kept for attribute access
    /// and external-storage checks).
    header: ObjectHeader,
    /// Reader used for metadata I/O.
    reader: Arc<dyn AsyncFileReader>,
    /// Raw (uncached) reader for batch chunk data fetches.
    raw_reader: Arc<dyn AsyncFileReader>,
    /// File superblock; supplies the address/length field widths used when
    /// parsing on-disk structures.
    superblock: Arc<Superblock>,

    // Parsed metadata (cached on construction)
    /// Current dimensions, one entry per axis.
    shape: Vec<u64>,
    /// Raw dataspace type code (2 == null dataspace; see `is_null_dataspace`).
    dataspace_type: u8,
    dtype: DataType,
    layout: StorageLayout,
    filters: FilterPipeline,
    /// Raw fill-value bytes, if recorded (interpretation depends on `dtype`).
    fill_value: Option<Vec<u8>>,

    /// Cached chunk index (lazily resolved on first access).
    cached_chunk_index: OnceCell<ChunkIndex>,
}
47
48impl HDF5Dataset {
49    /// Create a new dataset by parsing metadata messages from its object header.
50    pub fn new(
51        name: String,
52        header: ObjectHeader,
53        reader: Arc<dyn AsyncFileReader>,
54        raw_reader: Arc<dyn AsyncFileReader>,
55        superblock: Arc<Superblock>,
56    ) -> Result<Self> {
57        // Parse dataspace
58        let dataspace = header
59            .find_message(msg_types::DATASPACE)
60            .ok_or_else(|| HDF5Error::General("dataset missing dataspace message".into()))?;
61        let dataspace = DataspaceMessage::parse(&dataspace.data, superblock.size_of_lengths)?;
62
63        // Parse datatype
64        let dtype_msg = header
65            .find_message(msg_types::DATATYPE)
66            .ok_or_else(|| HDF5Error::General("dataset missing datatype message".into()))?;
67        let dtype = DataType::parse(&dtype_msg.data)?;
68
69        // Parse data layout
70        let layout_msg = header
71            .find_message(msg_types::DATA_LAYOUT)
72            .ok_or_else(|| HDF5Error::General("dataset missing data layout message".into()))?;
73        let layout = StorageLayout::parse(
74            &layout_msg.data,
75            superblock.size_of_offsets,
76            superblock.size_of_lengths,
77        )?;
78
79        // Parse filter pipeline (optional — absent means no filters)
80        let filters = if let Some(filter_msg) = header.find_message(msg_types::FILTER_PIPELINE) {
81            FilterPipeline::parse(&filter_msg.data)?
82        } else {
83            FilterPipeline::empty()
84        };
85
86        // Parse fill value (optional)
87        let fill_value = if let Some(fv_msg) = header.find_message(msg_types::FILL_VALUE) {
88            FillValueMessage::parse(&fv_msg.data)?.value
89        } else if let Some(fv_msg) = header.find_message(msg_types::FILL_VALUE_OLD) {
90            // Old fill value message: size(4) + data
91            let mut r = HDF5Reader::new(fv_msg.data.clone());
92            let size = r.read_u32()? as usize;
93            if size > 0 {
94                Some(r.read_bytes(size)?)
95            } else {
96                None
97            }
98        } else {
99            None
100        };
101
102        Ok(Self {
103            name,
104            header,
105            reader,
106            raw_reader,
107            superblock,
108            shape: dataspace.dimensions,
109            dataspace_type: dataspace.dataspace_type,
110            dtype,
111            layout,
112            filters,
113            fill_value,
114            cached_chunk_index: OnceCell::new(),
115        })
116    }
117
    /// The dataset's name (link name only, not the full path).
    pub fn name(&self) -> &str {
        &self.name
    }
122
    /// Dataset shape (current dimensions), one entry per axis.
    pub fn shape(&self) -> &[u64] {
        &self.shape
    }
127
    /// Number of dimensions (rank) of the dataspace.
    pub fn ndim(&self) -> usize {
        self.shape.len()
    }
132
    /// Data type, as parsed from the datatype message.
    pub fn dtype(&self) -> &DataType {
        &self.dtype
    }
137
    /// Element size in bytes, as recorded in the datatype message.
    pub fn element_size(&self) -> u32 {
        self.dtype.size()
    }
142
143    /// Chunk shape (None for contiguous or compact storage).
144    pub fn chunk_shape(&self) -> Option<&[u64]> {
145        match &self.layout {
146            StorageLayout::Chunked { chunk_shape, .. } => Some(chunk_shape),
147            _ => None,
148        }
149    }
150
    /// Filter pipeline (empty when the dataset has no filters).
    pub fn filters(&self) -> &FilterPipeline {
        &self.filters
    }
155
    /// Fill value bytes, if recorded (interpretation depends on dtype).
    pub fn fill_value(&self) -> Option<&[u8]> {
        self.fill_value.as_deref()
    }
160
    /// Storage layout (compact, contiguous, or chunked).
    pub fn layout(&self) -> &StorageLayout {
        &self.layout
    }
165
166    /// Whether this dataset has a null dataspace (no data, type 2).
167    pub fn is_null_dataspace(&self) -> bool {
168        self.dataspace_type == 2
169    }
170
    /// Whether this dataset uses external data files (msg type 0x0007).
    ///
    /// Detected by scanning the object header for an external-data-files
    /// message; the message itself is not parsed here.
    pub fn has_external_storage(&self) -> bool {
        self.header
            .messages
            .iter()
            .any(|m| m.msg_type == msg_types::EXTERNAL_DATA_FILES)
    }
178
    /// Access the raw object header this dataset was built from.
    pub fn header(&self) -> &ObjectHeader {
        &self.header
    }
183
    /// Get all attributes attached to this dataset, resolving vlen data.
    ///
    /// Delegates to `attributes_from_header`, passing this file's
    /// address/length field widths from the superblock.
    pub async fn attributes(&self) -> Vec<Attribute> {
        attributes_from_header(
            &self.header,
            &self.reader,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await
    }
194
195    /// Get a single attribute by name.
196    pub async fn attribute(&self, name: &str) -> Option<Attribute> {
197        self.attributes().await.into_iter().find(|a| a.name == name)
198    }
199
    /// Extract the chunk index, caching the result after first resolution.
    ///
    /// For chunked datasets, traverses the on-disk index structure to
    /// enumerate all chunks.
    /// For contiguous datasets, returns a single-entry index.
    /// For compact datasets, returns an empty index (data is inline in the header).
    ///
    /// Concurrent callers share a single resolution via
    /// `OnceCell::get_or_try_init`; if resolution fails, nothing is cached
    /// and a later call retries.
    pub async fn chunk_index(&self) -> Result<&ChunkIndex> {
        self.cached_chunk_index
            .get_or_try_init(|| self.resolve_chunk_index())
            .await
    }
210
    /// Internal: actually resolve the chunk index (no caching).
    ///
    /// Dispatches first on the storage layout, then (for chunked layouts)
    /// on the chunk indexing type recorded in the data layout message.
    async fn resolve_chunk_index(&self) -> Result<ChunkIndex> {
        match &self.layout {
            StorageLayout::Compact { .. } => {
                // Compact: data is in the object header. No file-level chunks.
                Ok(ChunkIndex::new(vec![], vec![], self.shape.clone()))
            }

            StorageLayout::Contiguous { address, size } => {
                if HDF5Reader::is_undef_addr(*address, self.superblock.size_of_offsets) {
                    // Unallocated (never written) — empty index
                    return Ok(ChunkIndex::new(vec![], vec![], self.shape.clone()));
                }
                // One logical "chunk" spanning the whole contiguous extent.
                Ok(ChunkIndex::contiguous(*address, *size, self.shape.clone()))
            }

            StorageLayout::Chunked {
                chunk_shape,
                index_address,
                indexing_type,
                index_params,
                flags,
            } => {
                if HDF5Reader::is_undef_addr(*index_address, self.superblock.size_of_offsets) {
                    // Unallocated — no chunks written yet
                    return Ok(ChunkIndex::new(
                        vec![],
                        chunk_shape.clone(),
                        self.shape.clone(),
                    ));
                }

                match indexing_type {
                    ChunkIndexType::BTreeV1 => {
                        self.chunk_index_btree_v1(*index_address, chunk_shape).await
                    }
                    ChunkIndexType::BTreeV2 => {
                        self.chunk_index_btree_v2(*index_address, chunk_shape).await
                    }
                    // Single chunk carries its size/mask in the layout
                    // message itself, so no I/O is needed here.
                    ChunkIndexType::SingleChunk => {
                        self.chunk_index_single_chunk(*index_address, chunk_shape, index_params)
                    }
                    ChunkIndexType::FixedArray => {
                        self.chunk_index_fixed_array(*index_address, chunk_shape, *flags)
                            .await
                    }
                    ChunkIndexType::ExtensibleArray => {
                        self.chunk_index_extensible_array(*index_address, chunk_shape, *flags)
                            .await
                    }
                    // Any remaining indexing variants are not handled yet.
                    other => Err(HDF5Error::General(format!(
                        "chunk indexing type {other:?} not yet supported"
                    ))),
                }
            }
        }
    }
268
269    /// Extract chunk index from a B-tree v1 (legacy chunked datasets).
270    async fn chunk_index_btree_v1(
271        &self,
272        btree_address: u64,
273        chunk_shape: &[u64],
274    ) -> Result<ChunkIndex> {
275        let ndims = self.shape.len();
276
277        let entries = btree::v1::read_chunk_btree_v1(
278            &self.reader,
279            btree_address,
280            ndims,
281            self.superblock.size_of_offsets,
282            self.superblock.size_of_lengths,
283        )
284        .await?;
285
286        // Convert B-tree v1 entries to ChunkLocation.
287        // v1 chunk keys store offsets in *elements* (not scaled). We need to
288        // convert to chunk indices by dividing by chunk dimensions.
289        let locations: Vec<ChunkLocation> = entries
290            .into_iter()
291            .map(|e| {
292                let indices: Vec<u64> = e
293                    .offsets
294                    .iter()
295                    .zip(chunk_shape.iter())
296                    .map(|(&offset, &cs)| offset / cs)
297                    .collect();
298                ChunkLocation {
299                    indices,
300                    byte_offset: e.address,
301                    byte_length: e.size as u64,
302                    filter_mask: e.filter_mask,
303                }
304            })
305            .collect();
306
307        Ok(ChunkIndex::new(
308            locations,
309            chunk_shape.to_vec(),
310            self.shape.clone(),
311        ))
312    }
313
    /// Extract chunk index from a B-tree v2 (modern chunked datasets).
    ///
    /// Supports record type 11 (filtered chunks) and type 10 (non-filtered
    /// chunks); any other record type yields an error.
    async fn chunk_index_btree_v2(
        &self,
        btree_address: u64,
        chunk_shape: &[u64],
    ) -> Result<ChunkIndex> {
        let ndims = self.shape.len();

        // Read the tree header (carries the record type used below).
        let header = btree::v2::BTreeV2Header::read(
            &self.reader,
            btree_address,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        // Walk the whole tree, gathering raw (unparsed) records.
        let raw_records = btree::v2::collect_all_records(
            &self.reader,
            &header,
            self.superblock.size_of_offsets,
            self.superblock.size_of_lengths,
        )
        .await?;

        match header.record_type {
            11 => {
                // Filtered chunks (most common for compressed datasets like NISAR)
                let chunk_records = btree::v2::parse_chunk_records_filtered(
                    &raw_records,
                    ndims,
                    self.superblock.size_of_offsets,
                    self.superblock.size_of_lengths,
                )?;

                // Drop unallocated chunks (undefined addresses) before
                // building the index.
                let locations: Vec<ChunkLocation> = chunk_records
                    .into_iter()
                    .filter(|c| {
                        !HDF5Reader::is_undef_addr(c.address, self.superblock.size_of_offsets)
                    })
                    .map(|c| ChunkLocation {
                        indices: c.scaled_offsets,
                        byte_offset: c.address,
                        byte_length: c.chunk_size,
                        filter_mask: c.filter_mask,
                    })
                    .collect();

                Ok(ChunkIndex::new(
                    locations,
                    chunk_shape.to_vec(),
                    self.shape.clone(),
                ))
            }
            10 => {
                // Non-filtered chunks
                let chunk_records = btree::v2::parse_chunk_records_non_filtered(
                    &raw_records,
                    ndims,
                    self.superblock.size_of_offsets,
                )?;

                // For non-filtered, every chunk has the same uncompressed size
                let uncompressed_chunk_size: u64 =
                    chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;

                let locations: Vec<ChunkLocation> = chunk_records
                    .into_iter()
                    .filter(|c| {
                        !HDF5Reader::is_undef_addr(c.address, self.superblock.size_of_offsets)
                    })
                    .map(|c| ChunkLocation {
                        indices: c.scaled_offsets,
                        byte_offset: c.address,
                        byte_length: uncompressed_chunk_size,
                        // Type-10 records carry no filter information.
                        filter_mask: 0,
                    })
                    .collect();

                Ok(ChunkIndex::new(
                    locations,
                    chunk_shape.to_vec(),
                    self.shape.clone(),
                ))
            }
            other => Err(HDF5Error::General(format!(
                "B-tree v2 record type {other} not supported for chunk indexing (expected 10 or 11)"
            ))),
        }
    }
403
404    /// Extract chunk index from a Fixed Array (type 3).
405    async fn chunk_index_fixed_array(
406        &self,
407        fahd_address: u64,
408        chunk_shape: &[u64],
409        _layout_flags: u8,
410    ) -> Result<ChunkIndex> {
411        let ndims = self.shape.len();
412
413        // Read the FAHD header
414        let fahd = fixed_array::FixedArrayHeader::read(
415            &self.reader,
416            fahd_address,
417            self.superblock.size_of_offsets,
418            self.superblock.size_of_lengths,
419        )
420        .await?;
421
422        // Compute uncompressed chunk size for non-filtered entries
423        let uncompressed_chunk_size =
424            chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;
425
426        let layout_version = 4u8;
427
428        // Read all entries from the FADB
429        let entries = fixed_array::read_fixed_array_entries(
430            &self.reader,
431            &fahd,
432            self.superblock.size_of_offsets,
433            self.superblock.size_of_lengths,
434            uncompressed_chunk_size,
435            layout_version,
436        )
437        .await?;
438
439        // Convert to ChunkLocations. The entries are stored in row-major order
440        // of the chunk grid.
441        let grid_shape: Vec<u64> = self
442            .shape
443            .iter()
444            .zip(chunk_shape.iter())
445            .map(|(&ds, &cs)| ds.div_ceil(cs))
446            .collect();
447
448        let mut locations = Vec::new();
449        for (flat_idx, entry) in entries.iter().enumerate() {
450            // Skip undefined addresses (unallocated chunks)
451            if HDF5Reader::is_undef_addr(entry.address, self.superblock.size_of_offsets) {
452                continue;
453            }
454
455            // Convert flat index to multi-dimensional indices (row-major)
456            let mut indices = vec![0u64; ndims];
457            let mut remaining = flat_idx as u64;
458            for d in (0..ndims).rev() {
459                indices[d] = remaining % grid_shape[d];
460                remaining /= grid_shape[d];
461            }
462
463            locations.push(ChunkLocation {
464                indices,
465                byte_offset: entry.address,
466                byte_length: entry.chunk_size,
467                filter_mask: entry.filter_mask,
468            });
469        }
470
471        Ok(ChunkIndex::new(
472            locations,
473            chunk_shape.to_vec(),
474            self.shape.clone(),
475        ))
476    }
477
478    /// Extract chunk index from an Extensible Array (type 4).
479    async fn chunk_index_extensible_array(
480        &self,
481        eahd_address: u64,
482        chunk_shape: &[u64],
483        _layout_flags: u8,
484    ) -> Result<ChunkIndex> {
485        let ndims = self.shape.len();
486
487        // Read the EAHD header
488        let eahd = extensible_array::ExtensibleArrayHeader::read(
489            &self.reader,
490            eahd_address,
491            self.superblock.size_of_offsets,
492            self.superblock.size_of_lengths,
493        )
494        .await?;
495
496        // Compute uncompressed chunk size for non-filtered entries
497        let uncompressed_chunk_size =
498            chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;
499
500        let layout_version = 4u8;
501
502        // Read all entries from the EA structure
503        let entries = extensible_array::read_extensible_array_entries(
504            &self.reader,
505            &eahd,
506            self.superblock.size_of_offsets,
507            self.superblock.size_of_lengths,
508            uncompressed_chunk_size,
509            layout_version,
510        )
511        .await?;
512
513        // Convert to ChunkLocations using flat indices from EA
514        let grid_shape: Vec<u64> = self
515            .shape
516            .iter()
517            .zip(chunk_shape.iter())
518            .map(|(&ds, &cs)| ds.div_ceil(cs))
519            .collect();
520
521        let mut locations = Vec::with_capacity(entries.len());
522        for indexed in &entries {
523            let mut indices = vec![0u64; ndims];
524            let mut remaining = indexed.flat_idx;
525            for d in (0..ndims).rev() {
526                indices[d] = remaining % grid_shape[d];
527                remaining /= grid_shape[d];
528            }
529
530            locations.push(ChunkLocation {
531                indices,
532                byte_offset: indexed.entry.address,
533                byte_length: indexed.entry.chunk_size,
534                filter_mask: indexed.entry.filter_mask,
535            });
536        }
537
538        Ok(ChunkIndex::new(
539            locations,
540            chunk_shape.to_vec(),
541            self.shape.clone(),
542        ))
543    }
544
545    /// Fetch multiple chunks in a single batched I/O call.
546    ///
547    /// Looks up byte ranges from the chunk index and fetches them all via
548    /// `raw_reader.get_byte_ranges()`, bypassing the `BlockCache`.
549    ///
550    /// Returns one entry per input index, in the same order.  Chunks that
551    /// are not present in the index (unallocated) are returned as `None`.
552    pub async fn batch_get_chunks(&self, chunk_indices: &[Vec<u64>]) -> Result<Vec<Option<Bytes>>> {
553        let index = self.chunk_index().await?; // cached after first call
554
555        // Collect byte ranges for all requested chunks
556        let mut ranges: Vec<Range<u64>> = Vec::new();
557        let mut range_map: Vec<Option<usize>> = Vec::with_capacity(chunk_indices.len());
558
559        for indices in chunk_indices {
560            if let Some(loc) = index.get(indices) {
561                range_map.push(Some(ranges.len()));
562                ranges.push(loc.byte_offset..loc.byte_offset + loc.byte_length);
563            } else {
564                range_map.push(None);
565            }
566        }
567
568        if ranges.is_empty() {
569            return Ok(vec![None; chunk_indices.len()]);
570        }
571
572        // Single batched fetch via raw reader (bypasses BlockCache)
573        let fetched = self.raw_reader.get_byte_ranges(ranges).await?;
574
575        // Map results back to input order
576        let mut results = Vec::with_capacity(chunk_indices.len());
577        for mapping in &range_map {
578            match mapping {
579                Some(idx) => results.push(Some(fetched[*idx].clone())),
580                None => results.push(None),
581            }
582        }
583        Ok(results)
584    }
585
586    /// Fetch multiple byte ranges in a single batched I/O call.
587    ///
588    /// Unlike `batch_get_chunks`, this takes pre-resolved `(offset, length)`
589    /// pairs — no chunk index lookup is performed.  Use this when the caller
590    /// has already resolved the chunk index and wants to avoid re-parsing.
591    pub async fn batch_fetch_ranges(&self, ranges: &[(u64, u64)]) -> Result<Vec<Bytes>> {
592        let byte_ranges: Vec<Range<u64>> = ranges
593            .iter()
594            .map(|&(offset, length)| offset..offset + length)
595            .collect();
596        self.raw_reader.get_byte_ranges(byte_ranges).await
597    }
598
599    /// Handle single-chunk datasets (chunk indexing type 1).
600    fn chunk_index_single_chunk(
601        &self,
602        address: u64,
603        chunk_shape: &[u64],
604        index_params: &ChunkIndexParams,
605    ) -> Result<ChunkIndex> {
606        let ndims = self.shape.len();
607
608        let (byte_length, filter_mask) = match index_params {
609            ChunkIndexParams::SingleChunk {
610                filtered_size,
611                filter_mask,
612            } => (*filtered_size, *filter_mask),
613            _ => {
614                // Unfiltered single chunk — compute size from shape and element size
615                let size = chunk_shape.iter().product::<u64>() * self.dtype.size() as u64;
616                (size, 0u32)
617            }
618        };
619
620        let location = ChunkLocation {
621            indices: vec![0; ndims],
622            byte_offset: address,
623            byte_length,
624            filter_mask,
625        };
626
627        Ok(ChunkIndex::new(
628            vec![location],
629            chunk_shape.to_vec(),
630            self.shape.clone(),
631        ))
632    }
633}