1use std::sync::Arc;
2
3use bytes::Bytes;
4
5use crate::endian::HDF5Reader;
6use crate::error::Result;
7use crate::group::HDF5Group;
8use crate::object_header::ObjectHeader;
9use crate::reader::{AsyncFileReader, BlockCache};
10use crate::superblock::Superblock;
11
/// An open HDF5 file: the superblock parsed at open time plus the readers
/// used to fetch further bytes from the file.
#[derive(Debug)]
pub struct HDF5File {
    /// Reader used for metadata reads such as object headers. When opened
    /// through `open_with_options` (and the `open` / `open_with_block_size`
    /// helpers that delegate to it) this is a `BlockCache` wrapping
    /// `raw_reader`; when opened through `open_raw` it is the same reader as
    /// `raw_reader`.
    reader: Arc<dyn AsyncFileReader>,
    /// The underlying reader without the block cache in front of it. It is
    /// handed to `HDF5Group::new` alongside `reader` — presumably so bulk
    /// data reads can bypass the cache; TODO confirm against `HDF5Group`.
    raw_reader: Arc<dyn AsyncFileReader>,
    /// Superblock parsed from the file's leading bytes when it was opened.
    superblock: Superblock,
}
34
35impl HDF5File {
36 pub async fn open(reader: impl AsyncFileReader) -> Result<Self> {
41 Self::open_with_block_size(reader, 8 * 1024 * 1024).await
42 }
43
44 pub async fn open_with_block_size(
46 reader: impl AsyncFileReader,
47 block_size: u64,
48 ) -> Result<Self> {
49 Self::open_with_options(reader, block_size, None).await
50 }
51
52 pub async fn open_with_options(
62 reader: impl AsyncFileReader,
63 block_size: u64,
64 pre_warm_threshold: Option<u64>,
65 ) -> Result<Self> {
66 let raw: Arc<dyn AsyncFileReader> = Arc::new(reader);
67 let cached = BlockCache::new(raw.clone()).with_block_size(block_size);
68
69 if let Some(threshold) = pre_warm_threshold {
71 if let Some(size) = raw.file_size().await? {
72 cached.pre_warm(size, threshold).await?;
73 }
74 }
75
76 let initial_bytes = cached.get_bytes(0..block_size.min(64 * 1024)).await?;
77 let (superblock, _offset) = Superblock::parse(&initial_bytes)?;
78
79 Ok(Self {
80 reader: Arc::new(cached),
81 raw_reader: raw,
82 superblock,
83 })
84 }
85
86 pub async fn open_raw(reader: Arc<dyn AsyncFileReader>) -> Result<Self> {
91 let initial_bytes = reader.get_bytes(0..64 * 1024).await?;
92 let (superblock, _offset) = Superblock::parse(&initial_bytes)?;
93
94 Ok(Self {
95 raw_reader: reader.clone(),
96 reader,
97 superblock,
98 })
99 }
100
101 pub fn superblock(&self) -> &Superblock {
103 &self.superblock
104 }
105
106 pub fn reader(&self) -> &Arc<dyn AsyncFileReader> {
108 &self.reader
109 }
110
111 pub async fn read_object_header(&self, address: u64) -> Result<ObjectHeader> {
113 read_object_header(
114 &self.reader,
115 address,
116 self.superblock.size_of_offsets,
117 self.superblock.size_of_lengths,
118 )
119 .await
120 }
121
122 pub async fn root_group_header(&self) -> Result<ObjectHeader> {
124 self.read_object_header(self.superblock.root_group_address)
125 .await
126 }
127
128 pub fn raw_reader(&self) -> &Arc<dyn AsyncFileReader> {
130 &self.raw_reader
131 }
132
133 pub async fn root_group(&self) -> Result<HDF5Group> {
135 let header = self.root_group_header().await?;
136 Ok(HDF5Group::new(
137 "/".to_string(),
138 header,
139 Arc::clone(&self.reader),
140 Arc::clone(&self.raw_reader),
141 Arc::new(self.superblock.clone()),
142 ))
143 }
144}
145
146pub(crate) async fn read_object_header(
148 reader: &Arc<dyn AsyncFileReader>,
149 address: u64,
150 size_of_offsets: u8,
151 size_of_lengths: u8,
152) -> Result<ObjectHeader> {
153 let initial_size = 4096u64;
155 let end = address.checked_add(initial_size).ok_or_else(|| {
156 crate::error::HDF5Error::General(format!(
157 "object header address {address:#x} overflows when computing fetch range"
158 ))
159 })?;
160 let data = reader.get_bytes(address..end).await?;
161
162 let needed = peek_object_header_size(&data)?;
164 let data = if needed > initial_size {
165 let end = address.checked_add(needed).ok_or_else(|| {
166 crate::error::HDF5Error::General(format!(
167 "object header address {address:#x} overflows when computing fetch range"
168 ))
169 })?;
170 reader.get_bytes(address..end).await?
171 } else {
172 data
173 };
174
175 let mut header = ObjectHeader::parse(&data, size_of_offsets, size_of_lengths)?;
176
177 let mut pending = header.continuation_addresses(size_of_offsets, size_of_lengths)?;
180 while let Some((cont_addr, cont_len)) = pending.pop() {
181 let cont_end = cont_addr.checked_add(cont_len).ok_or_else(|| {
182 crate::error::HDF5Error::General(format!(
183 "continuation address {cont_addr:#x} + length {cont_len:#x} overflows"
184 ))
185 })?;
186 let cont_data = reader.get_bytes(cont_addr..cont_end).await?;
187 let new_messages = parse_continuation_chunk(
188 &cont_data,
189 size_of_offsets,
190 size_of_lengths,
191 header.version,
192 header.track_creation_order,
193 )?;
194
195 for msg in &new_messages {
197 if msg.msg_type == crate::object_header::msg_types::HEADER_CONTINUATION {
198 let mut r =
199 HDF5Reader::with_sizes(msg.data.clone(), size_of_offsets, size_of_lengths);
200 let address = r.read_offset()?;
201 let length = r.read_length()?;
202 pending.push((address, length));
203 }
204 }
205
206 header.messages.extend(new_messages);
207 }
208
209 Ok(header)
210}
211
212fn peek_object_header_size(data: &Bytes) -> Result<u64> {
217 if data.len() < 6 {
218 return Err(crate::error::HDF5Error::General(
219 "object header data too short".into(),
220 ));
221 }
222
223 if data.len() >= 4 && data[0..4] == [b'O', b'H', b'D', b'R'] {
224 let flags = data[5];
226 let mut offset = 6usize;
227
228 if flags & 0x20 != 0 {
230 offset += 16;
231 }
232
233 if flags & 0x10 != 0 {
235 offset += 4;
236 }
237
238 let chunk_size_width = 1usize << (flags & 0x03);
240 if data.len() < offset + chunk_size_width {
241 return Err(crate::error::HDF5Error::General(
242 "object header too short for chunk size field".into(),
243 ));
244 }
245
246 let chunk0_size = match chunk_size_width {
247 1 => data[offset] as u64,
248 2 => u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()) as u64,
249 4 => u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as u64,
250 8 => u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()),
251 _ => unreachable!(),
252 };
253
254 Ok((offset + chunk_size_width) as u64 + chunk0_size)
256 } else {
257 if data.len() < 12 {
260 return Err(crate::error::HDF5Error::General(
261 "v1 object header too short".into(),
262 ));
263 }
264 let header_size = u32::from_le_bytes(data[8..12].try_into().unwrap()) as u64;
265 Ok(16 + header_size)
266 }
267}
268
269fn parse_continuation_chunk(
271 data: &Bytes,
272 size_of_offsets: u8,
273 size_of_lengths: u8,
274 header_version: u8,
275 track_creation_order: bool,
276) -> Result<Vec<crate::object_header::HeaderMessage>> {
277 use crate::object_header::msg_types;
278
279 let mut r = HDF5Reader::with_sizes(data.clone(), size_of_offsets, size_of_lengths);
280 let mut messages = Vec::new();
281
282 if header_version == 2 {
283 r.read_signature(b"OCHK")?;
285
286 let end = data.len() as u64 - 4; while r.position() < end {
288 let msg_type = r.read_u8()? as u16;
289 let msg_size = r.read_u16()? as usize;
290 let flags = r.read_u8()?;
291
292 if track_creation_order {
294 let _creation_order = r.read_u16()?;
295 }
296
297 if msg_type == msg_types::NIL {
299 break;
300 }
301
302 let msg_data = if msg_size > 0 {
303 let d = r.slice_from_position(msg_size)?;
304 r.skip(msg_size as u64);
305 d
306 } else {
307 Bytes::new()
308 };
309
310 messages.push(crate::object_header::HeaderMessage {
311 msg_type,
312 data: msg_data,
313 flags,
314 });
315 }
316 } else {
317 let end = data.len() as u64;
319 while r.position() + 8 <= end {
320 let msg_type = r.read_u16()?;
321 let msg_size = r.read_u16()? as usize;
322 let flags = r.read_u8()?;
323 r.skip(3); if msg_size == 0 && msg_type == msg_types::NIL {
326 r.skip_to_alignment(8);
327 continue;
328 }
329
330 let msg_data = if msg_size > 0 {
331 let d = r.slice_from_position(msg_size)?;
332 r.skip(msg_size as u64);
333 d
334 } else {
335 Bytes::new()
336 };
337
338 r.skip_to_alignment(8);
339
340 messages.push(crate::object_header::HeaderMessage {
341 msg_type,
342 data: msg_data,
343 flags,
344 });
345 }
346 }
347
348 Ok(messages)
349}