//! `async_hdf5/file.rs` — opening HDF5 files and reading object headers.
use std::collections::HashSet;
use std::sync::Arc;

use bytes::Bytes;

use crate::endian::HDF5Reader;
use crate::error::Result;
use crate::group::HDF5Group;
use crate::object_header::ObjectHeader;
use crate::reader::{AsyncFileReader, BlockCache};
use crate::superblock::Superblock;

/// An opened HDF5 file.
///
/// This is the main entry point for reading HDF5 metadata. It holds a
/// reference to the async reader and the parsed superblock.
///
/// # Example
///
/// ```ignore
/// use async_hdf5::HDF5File;
/// use async_hdf5::reader::ObjectReader;
///
/// let reader = ObjectReader::new(store, path);
/// let file = HDF5File::open(reader).await?;
/// let root = file.root_group_header().await?;
/// ```
#[derive(Debug)]
pub struct HDF5File {
    /// Reader used for metadata traversal. The `open`/`open_with_*`
    /// constructors place a `BlockCache`-wrapped reader here; `open_raw`
    /// stores the caller's reader unchanged.
    reader: Arc<dyn AsyncFileReader>,
    /// Raw (uncached) reader for direct byte-range fetches (e.g., batch chunk data).
    raw_reader: Arc<dyn AsyncFileReader>,
    /// Superblock parsed from the start of the file at open time.
    superblock: Superblock,
}
34
35impl HDF5File {
36    /// Open an HDF5 file by parsing its superblock.
37    ///
38    /// Wraps the given reader in a `BlockCache` (default 8 MiB blocks) to
39    /// coalesce the many small metadata reads into a few large requests.
40    pub async fn open(reader: impl AsyncFileReader) -> Result<Self> {
41        Self::open_with_block_size(reader, 8 * 1024 * 1024).await
42    }
43
44    /// Open with a configurable block cache size.
45    pub async fn open_with_block_size(
46        reader: impl AsyncFileReader,
47        block_size: u64,
48    ) -> Result<Self> {
49        Self::open_with_options(reader, block_size, None).await
50    }
51
52    /// Open with configurable block cache size and optional pre-warming.
53    ///
54    /// If `pre_warm_threshold` is `Some(n)`, the file size is queried via
55    /// `AsyncFileReader::file_size()` and up to `n` bytes of cache blocks
56    /// are fetched in parallel before returning.  For files smaller than `n`,
57    /// every block is fetched.  For larger files, the first `n` bytes worth
58    /// of blocks are fetched (HDF5 metadata is typically concentrated near
59    /// the start of the file).  This eliminates sequential cache misses
60    /// during B-tree / object-header traversal.
61    pub async fn open_with_options(
62        reader: impl AsyncFileReader,
63        block_size: u64,
64        pre_warm_threshold: Option<u64>,
65    ) -> Result<Self> {
66        let raw: Arc<dyn AsyncFileReader> = Arc::new(reader);
67        let cached = BlockCache::new(raw.clone()).with_block_size(block_size);
68
69        // Pre-warm: fetch blocks in parallel up to the threshold.
70        if let Some(threshold) = pre_warm_threshold {
71            if let Some(size) = raw.file_size().await? {
72                cached.pre_warm(size, threshold).await?;
73            }
74        }
75
76        let initial_bytes = cached.get_bytes(0..block_size.min(64 * 1024)).await?;
77        let (superblock, _offset) = Superblock::parse(&initial_bytes)?;
78
79        Ok(Self {
80            reader: Arc::new(cached),
81            raw_reader: raw,
82            superblock,
83        })
84    }
85
86    /// Open with an already-configured reader (e.g., a pre-built `BlockCache`).
87    ///
88    /// Note: `raw_reader` is set to the same reader. If you need batch chunk
89    /// fetches to bypass a cache layer, use `open` or `open_with_block_size`.
90    pub async fn open_raw(reader: Arc<dyn AsyncFileReader>) -> Result<Self> {
91        let initial_bytes = reader.get_bytes(0..64 * 1024).await?;
92        let (superblock, _offset) = Superblock::parse(&initial_bytes)?;
93
94        Ok(Self {
95            raw_reader: reader.clone(),
96            reader,
97            superblock,
98        })
99    }
100
101    /// Access the parsed superblock.
102    pub fn superblock(&self) -> &Superblock {
103        &self.superblock
104    }
105
106    /// Access the underlying async reader.
107    pub fn reader(&self) -> &Arc<dyn AsyncFileReader> {
108        &self.reader
109    }
110
111    /// Read and parse the object header at the given file address.
112    pub async fn read_object_header(&self, address: u64) -> Result<ObjectHeader> {
113        read_object_header(
114            &self.reader,
115            address,
116            self.superblock.size_of_offsets,
117            self.superblock.size_of_lengths,
118        )
119        .await
120    }
121
122    /// Read the root group's object header.
123    pub async fn root_group_header(&self) -> Result<ObjectHeader> {
124        self.read_object_header(self.superblock.root_group_address)
125            .await
126    }
127
128    /// Access the raw (uncached) reader for direct byte-range fetches.
129    pub fn raw_reader(&self) -> &Arc<dyn AsyncFileReader> {
130        &self.raw_reader
131    }
132
133    /// Get the root group as an `HDF5Group` for navigation.
134    pub async fn root_group(&self) -> Result<HDF5Group> {
135        let header = self.root_group_header().await?;
136        Ok(HDF5Group::new(
137            "/".to_string(),
138            header,
139            Arc::clone(&self.reader),
140            Arc::clone(&self.raw_reader),
141            Arc::new(self.superblock.clone()),
142        ))
143    }
144}
145
146/// Read and parse an object header, following any continuation messages.
147pub(crate) async fn read_object_header(
148    reader: &Arc<dyn AsyncFileReader>,
149    address: u64,
150    size_of_offsets: u8,
151    size_of_lengths: u8,
152) -> Result<ObjectHeader> {
153    // Initial fetch — 4 KB is usually enough for one object header.
154    let initial_size = 4096u64;
155    let end = address.checked_add(initial_size).ok_or_else(|| {
156        crate::error::HDF5Error::General(format!(
157            "object header address {address:#x} overflows when computing fetch range"
158        ))
159    })?;
160    let data = reader.get_bytes(address..end).await?;
161
162    // Check if the first chunk is larger than our initial fetch and re-fetch if needed.
163    let needed = peek_object_header_size(&data)?;
164    let data = if needed > initial_size {
165        let end = address.checked_add(needed).ok_or_else(|| {
166            crate::error::HDF5Error::General(format!(
167                "object header address {address:#x} overflows when computing fetch range"
168            ))
169        })?;
170        reader.get_bytes(address..end).await?
171    } else {
172        data
173    };
174
175    let mut header = ObjectHeader::parse(&data, size_of_offsets, size_of_lengths)?;
176
177    // Follow continuation messages iteratively — continuation chunks can
178    // themselves contain further continuation messages (nested chains).
179    let mut pending = header.continuation_addresses(size_of_offsets, size_of_lengths)?;
180    while let Some((cont_addr, cont_len)) = pending.pop() {
181        let cont_end = cont_addr.checked_add(cont_len).ok_or_else(|| {
182            crate::error::HDF5Error::General(format!(
183                "continuation address {cont_addr:#x} + length {cont_len:#x} overflows"
184            ))
185        })?;
186        let cont_data = reader.get_bytes(cont_addr..cont_end).await?;
187        let new_messages = parse_continuation_chunk(
188            &cont_data,
189            size_of_offsets,
190            size_of_lengths,
191            header.version,
192            header.track_creation_order,
193        )?;
194
195        // Check new messages for further continuations before adding them
196        for msg in &new_messages {
197            if msg.msg_type == crate::object_header::msg_types::HEADER_CONTINUATION {
198                let mut r =
199                    HDF5Reader::with_sizes(msg.data.clone(), size_of_offsets, size_of_lengths);
200                let address = r.read_offset()?;
201                let length = r.read_length()?;
202                pending.push((address, length));
203            }
204        }
205
206        header.messages.extend(new_messages);
207    }
208
209    Ok(header)
210}
211
212/// Peek at an object header's prefix to determine the total size of the first chunk.
213///
214/// For v2 headers: reads the flags and chunk0_size to compute prefix + chunk0_size.
215/// For v1 headers: reads the header_size field to compute 16 + header_size.
216fn peek_object_header_size(data: &Bytes) -> Result<u64> {
217    if data.len() < 6 {
218        return Err(crate::error::HDF5Error::General(
219            "object header data too short".into(),
220        ));
221    }
222
223    if data.len() >= 4 && data[0..4] == [b'O', b'H', b'D', b'R'] {
224        // v2 header
225        let flags = data[5];
226        let mut offset = 6usize;
227
228        // Optional timestamps (flags bit 5)
229        if flags & 0x20 != 0 {
230            offset += 16;
231        }
232
233        // Optional attribute phase change values (flags bit 4)
234        if flags & 0x10 != 0 {
235            offset += 4;
236        }
237
238        // Chunk size field width
239        let chunk_size_width = 1usize << (flags & 0x03);
240        if data.len() < offset + chunk_size_width {
241            return Err(crate::error::HDF5Error::General(
242                "object header too short for chunk size field".into(),
243            ));
244        }
245
246        let chunk0_size = match chunk_size_width {
247            1 => data[offset] as u64,
248            2 => u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()) as u64,
249            4 => u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as u64,
250            8 => u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()),
251            _ => unreachable!(),
252        };
253
254        // Total = header prefix + chunk0_size (which includes messages + checksum)
255        Ok((offset + chunk_size_width) as u64 + chunk0_size)
256    } else {
257        // v1 header: version(1) + reserved(1) + num_messages(2) + ref_count(4) +
258        //            header_size(4) + reserved(4) = 16 bytes prefix
259        if data.len() < 12 {
260            return Err(crate::error::HDF5Error::General(
261                "v1 object header too short".into(),
262            ));
263        }
264        let header_size = u32::from_le_bytes(data[8..12].try_into().unwrap()) as u64;
265        Ok(16 + header_size)
266    }
267}
268
/// Parse messages from a continuation chunk (OCHK in v2, raw messages in v1).
///
/// The chunk format follows the version of the *object header* that owns
/// it, hence the `header_version` parameter. Returns the messages found, in
/// file order; continuation messages inside the returned list are NOT
/// followed here — the caller is responsible for recursing.
fn parse_continuation_chunk(
    data: &Bytes,
    size_of_offsets: u8,
    size_of_lengths: u8,
    header_version: u8,
    track_creation_order: bool,
) -> Result<Vec<crate::object_header::HeaderMessage>> {
    use crate::object_header::msg_types;

    let mut r = HDF5Reader::with_sizes(data.clone(), size_of_offsets, size_of_lengths);
    let mut messages = Vec::new();

    if header_version == 2 {
        // v2 continuation: starts with OCHK signature
        r.read_signature(b"OCHK")?;

        // Message headers here are: type(1) + size(2) + flags(1)
        // [+ creation_order(2) when tracked], same as the primary chunk.
        let end = data.len() as u64 - 4; // minus checksum
        while r.position() < end {
            let msg_type = r.read_u8()? as u16;
            let msg_size = r.read_u16()? as usize;
            let flags = r.read_u8()?;

            // Skip creation order field if tracked (same format as primary chunk)
            if track_creation_order {
                let _creation_order = r.read_u16()?;
            }

            // NIL message (type 0) signals start of gap/padding to end of chunk.
            if msg_type == msg_types::NIL {
                break;
            }

            // Zero-sized messages are legal; avoid slicing an empty range.
            let msg_data = if msg_size > 0 {
                let d = r.slice_from_position(msg_size)?;
                r.skip(msg_size as u64);
                d
            } else {
                Bytes::new()
            };

            messages.push(crate::object_header::HeaderMessage {
                msg_type,
                data: msg_data,
                flags,
            });
        }
    } else {
        // v1 continuation: raw messages with 8-byte alignment
        // Each v1 message header is 8 bytes: type(2) + size(2) + flags(1) + reserved(3).
        let end = data.len() as u64;
        while r.position() + 8 <= end {
            let msg_type = r.read_u16()?;
            let msg_size = r.read_u16()? as usize;
            let flags = r.read_u8()?;
            r.skip(3); // reserved

            // Zero-length NIL is padding; realign and keep scanning for
            // further messages rather than stopping (unlike the v2 path).
            if msg_size == 0 && msg_type == msg_types::NIL {
                r.skip_to_alignment(8);
                continue;
            }

            let msg_data = if msg_size > 0 {
                let d = r.slice_from_position(msg_size)?;
                r.skip(msg_size as u64);
                d
            } else {
                Bytes::new()
            };

            // v1 message bodies are padded to an 8-byte boundary.
            r.skip_to_alignment(8);

            messages.push(crate::object_header::HeaderMessage {
                msg_type,
                data: msg_data,
                flags,
            });
        }
    }

    Ok(messages)
}