Skip to main content

hdf5_reader/
shared_message_table.rs

1//! Shared object-header message table (SOHM).
2//!
3//! The superblock extension can point at a file-level `SMTB` table containing
4//! one or more shared-message indexes. Each index is either an `SMLI` list or
5//! a v2 B-tree with record type 7.
6
7use std::sync::Arc;
8
9use crate::btree_v2::{self, BTreeV2Record};
10use crate::checksum::jenkins_lookup3;
11use crate::error::{Error, Result};
12use crate::filters::FilterRegistry;
13use crate::fractal_heap::FractalHeap;
14use crate::io::Cursor;
15use crate::messages::{parse_message, HdfMessage};
16use crate::storage::Storage;
17
/// Magic bytes (`SMTB`) expected at the start of the shared-message master table.
const SMTB_SIGNATURE: [u8; 4] = [b'S', b'M', b'T', b'B'];
/// Magic bytes (`SMLI`) expected at the start of a shared-message record list.
const SMLI_SIGNATURE: [u8; 4] = [b'S', b'M', b'L', b'I'];
20
21/// File-level SOHM master table.
22#[derive(Debug, Clone)]
23pub(crate) struct SharedMessageTable {
24    indexes: Vec<SharedMessageIndex>,
25}
26
27#[derive(Debug, Clone)]
28struct SharedMessageIndex {
29    index_type: SharedMessageIndexType,
30    message_type_flags: u16,
31    min_message_size: u32,
32    list_cutoff: u16,
33    btree_cutoff: u16,
34    num_messages: u16,
35    index_address: u64,
36    fractal_heap_address: u64,
37}
38
/// Storage layout an index uses for its records.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SharedMessageIndexType {
    /// Index type byte `0`: records are stored in an `SMLI` record list.
    List,
    /// Index type byte `1`: records are stored in a v2 B-tree.
    BTree,
}
44
/// A single index record, describing where one shared message lives.
#[derive(Clone, Debug)]
enum SharedMessageRecord {
    /// The message is stored in the index's fractal heap.
    Heap {
        /// Hash value stored with the record.
        hash: u32,
        /// Reference count stored with the record.
        reference_count: u32,
        /// Heap ID bytes used to look the message up in the fractal heap.
        heap_id: Vec<u8>,
    },
    /// The message is stored inside an object header.
    ObjectHeader {
        /// Hash value stored with the record.
        hash: u32,
        /// Message type stored with the record.
        message_type: u16,
        /// Index of the message within the object header.
        object_header_index: u16,
        /// Address of the object header.
        object_header_address: u64,
    },
}
59
60impl SharedMessageTable {
61    /// Parse a SOHM master table from storage.
62    pub(crate) fn parse_at_storage(
63        storage: &dyn Storage,
64        address: u64,
65        num_indices: u8,
66        offset_size: u8,
67    ) -> Result<Self> {
68        let entry_len = 1 + 1 + 2 + 4 + 2 + 2 + 2 + 2 + usize::from(offset_size) * 2;
69        let table_len = 4 + usize::from(num_indices) * entry_len + 4;
70        let bytes = storage.read_range(address, table_len)?;
71        let mut cursor = Cursor::new(bytes.as_ref());
72        let sig = cursor.read_bytes(4)?;
73        if sig != SMTB_SIGNATURE {
74            return Err(Error::InvalidData(format!(
75                "expected SMTB signature at offset {address:#x}"
76            )));
77        }
78
79        let mut indexes = Vec::with_capacity(usize::from(num_indices));
80        for _ in 0..num_indices {
81            let version = cursor.read_u8()?;
82            if version != 0 {
83                return Err(Error::InvalidData(format!(
84                    "unsupported SOHM index version: {version}"
85                )));
86            }
87            let index_type = match cursor.read_u8()? {
88                0 => SharedMessageIndexType::List,
89                1 => SharedMessageIndexType::BTree,
90                other => {
91                    return Err(Error::InvalidData(format!(
92                        "unsupported SOHM index type: {other}"
93                    )))
94                }
95            };
96            let message_type_flags = cursor.read_u16_le()?;
97            let min_message_size = cursor.read_u32_le()?;
98            let list_cutoff = cursor.read_u16_le()?;
99            let btree_cutoff = cursor.read_u16_le()?;
100            let num_messages = cursor.read_u16_le()?;
101            cursor.skip(2)?;
102            let index_address = cursor.read_offset(offset_size)?;
103            let fractal_heap_address = cursor.read_offset(offset_size)?;
104            indexes.push(SharedMessageIndex {
105                index_type,
106                message_type_flags,
107                min_message_size,
108                list_cutoff,
109                btree_cutoff,
110                num_messages,
111                index_address,
112                fractal_heap_address,
113            });
114        }
115
116        let checksum_pos = cursor.position() as usize;
117        let stored_checksum = cursor.read_u32_le()?;
118        let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
119        if computed != stored_checksum {
120            return Err(Error::ChecksumMismatch {
121                expected: stored_checksum,
122                actual: computed,
123            });
124        }
125
126        Ok(Self { indexes })
127    }
128
129    /// Resolve a SOHM heap ID into the concrete object-header message.
130    pub(crate) fn resolve_heap_message(
131        &self,
132        heap_id: &[u8],
133        message_type: u16,
134        storage: &dyn Storage,
135        offset_size: u8,
136        length_size: u8,
137        filter_registry: Option<&FilterRegistry>,
138    ) -> Result<Option<HdfMessage>> {
139        let preferred_indexes: Vec<&SharedMessageIndex> = self
140            .indexes
141            .iter()
142            .filter(|index| index.tracks_message_type(message_type))
143            .collect();
144
145        let indexes: Vec<&SharedMessageIndex> = if preferred_indexes.is_empty() {
146            self.indexes.iter().collect()
147        } else {
148            preferred_indexes
149        };
150
151        for index in indexes {
152            for record in index.records(storage, offset_size, length_size)? {
153                match record {
154                    SharedMessageRecord::Heap {
155                        hash,
156                        reference_count,
157                        heap_id: record_heap_id,
158                    } => {
159                        let _ = (hash, reference_count);
160                        if record_heap_id != heap_id {
161                            continue;
162                        }
163                    }
164                    SharedMessageRecord::ObjectHeader {
165                        hash,
166                        message_type,
167                        object_header_index,
168                        object_header_address,
169                    } => {
170                        let _ = (
171                            hash,
172                            message_type,
173                            object_header_index,
174                            object_header_address,
175                        );
176                        continue;
177                    }
178                }
179
180                if Cursor::is_undefined_offset(index.fractal_heap_address, offset_size) {
181                    return Err(Error::UndefinedAddress);
182                }
183                let heap = FractalHeap::parse_at_storage(
184                    storage,
185                    index.fractal_heap_address,
186                    offset_size,
187                    length_size,
188                )?;
189                let payload = heap.get_object_storage_with_registry(
190                    heap_id,
191                    storage,
192                    offset_size,
193                    length_size,
194                    filter_registry,
195                )?;
196                let mut cursor = Cursor::new(&payload);
197                let message = parse_message(
198                    message_type,
199                    payload.len(),
200                    &mut cursor,
201                    offset_size,
202                    length_size,
203                )?;
204                return Ok(Some(message));
205            }
206        }
207
208        Ok(None)
209    }
210}
211
212impl SharedMessageIndex {
213    fn tracks_message_type(&self, message_type: u16) -> bool {
214        let Some(bit) = shared_message_type_bit(message_type) else {
215            return false;
216        };
217        (self.message_type_flags & (1u16 << bit)) != 0
218    }
219
220    fn records(
221        &self,
222        storage: &dyn Storage,
223        offset_size: u8,
224        length_size: u8,
225    ) -> Result<Vec<SharedMessageRecord>> {
226        let _ = (self.min_message_size, self.list_cutoff, self.btree_cutoff);
227        match self.index_type {
228            SharedMessageIndexType::List => {
229                parse_record_list(storage, self.index_address, self.num_messages, offset_size)
230            }
231            SharedMessageIndexType::BTree => {
232                let header = btree_v2::BTreeV2Header::parse_at_storage(
233                    storage,
234                    self.index_address,
235                    offset_size,
236                    length_size,
237                )?;
238                let records = btree_v2::collect_btree_v2_records_storage(
239                    storage,
240                    &header,
241                    offset_size,
242                    length_size,
243                    None,
244                    &[],
245                    None,
246                )?;
247                records
248                    .into_iter()
249                    .filter_map(record_from_btree)
250                    .collect::<Result<Vec<_>>>()
251            }
252        }
253    }
254}
255
256fn parse_record_list(
257    storage: &dyn Storage,
258    address: u64,
259    num_records: u16,
260    offset_size: u8,
261) -> Result<Vec<SharedMessageRecord>> {
262    if num_records == 0 {
263        return Ok(Vec::new());
264    }
265    let max_record_len = 20usize;
266    let max_len = 4 + usize::from(num_records) * max_record_len + 4;
267    let bytes = storage.read_range(address, max_len)?;
268    let mut cursor = Cursor::new(bytes.as_ref());
269    let sig = cursor.read_bytes(4)?;
270    if sig != SMLI_SIGNATURE {
271        return Err(Error::InvalidData(format!(
272            "expected SMLI signature at offset {address:#x}"
273        )));
274    }
275
276    let mut records = Vec::with_capacity(usize::from(num_records));
277    for _ in 0..num_records {
278        records.push(parse_record(&mut cursor, offset_size)?);
279    }
280
281    let checksum_pos = cursor.position() as usize;
282    let stored_checksum = cursor.read_u32_le()?;
283    let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
284    if computed != stored_checksum {
285        return Err(Error::ChecksumMismatch {
286            expected: stored_checksum,
287            actual: computed,
288        });
289    }
290
291    Ok(records)
292}
293
294fn parse_record(cursor: &mut Cursor<'_>, offset_size: u8) -> Result<SharedMessageRecord> {
295    let location = cursor.read_u8()?;
296    cursor.skip(3)?;
297    let hash = cursor.read_u32_le()?;
298    match location {
299        0 => {
300            let reference_count = cursor.read_u32_le()?;
301            let heap_id = cursor.read_bytes(8)?.to_vec();
302            Ok(SharedMessageRecord::Heap {
303                hash,
304                reference_count,
305                heap_id,
306            })
307        }
308        1 => {
309            let _reserved = cursor.read_u8()?;
310            let message_type = u16::from(cursor.read_u8()?);
311            let object_header_index = cursor.read_u16_le()?;
312            let object_header_address = cursor.read_offset(offset_size)?;
313            Ok(SharedMessageRecord::ObjectHeader {
314                hash,
315                message_type,
316                object_header_index,
317                object_header_address,
318            })
319        }
320        other => Err(Error::InvalidData(format!(
321            "unknown SOHM record location: {other}"
322        ))),
323    }
324}
325
326fn record_from_btree(record: BTreeV2Record) -> Option<Result<SharedMessageRecord>> {
327    match record {
328        BTreeV2Record::SharedMessageHeap {
329            hash,
330            reference_count,
331            heap_id,
332        } => Some(Ok(SharedMessageRecord::Heap {
333            hash,
334            reference_count,
335            heap_id,
336        })),
337        BTreeV2Record::SharedMessageObjectHeader {
338            hash,
339            message_type,
340            object_header_index,
341            object_header_address,
342        } => Some(Ok(SharedMessageRecord::ObjectHeader {
343            hash,
344            message_type,
345            object_header_index,
346            object_header_address,
347        })),
348        _ => None,
349    }
350}
351
352fn shared_message_type_bit(message_type: u16) -> Option<u8> {
353    match message_type {
354        crate::messages::MSG_DATASPACE => Some(0),
355        crate::messages::MSG_DATATYPE => Some(1),
356        crate::messages::MSG_FILL_VALUE | crate::messages::MSG_FILL_VALUE_OLD => Some(2),
357        crate::messages::MSG_FILTER_PIPELINE => Some(3),
358        crate::messages::MSG_ATTRIBUTE => Some(4),
359        _ => None,
360    }
361}
362
/// Reference-counted (`Arc`) handle to a parsed [`SharedMessageTable`].
pub(crate) type SharedMessageTableRef = Arc<SharedMessageTable>;
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::BytesStorage;

    /// Append the little-endian lookup3 checksum of `image` to `image`.
    fn with_checksum(mut image: Vec<u8>) -> Vec<u8> {
        let checksum = jenkins_lookup3(&image);
        image.extend_from_slice(&checksum.to_le_bytes());
        image
    }

    /// A one-entry master table round-trips through `parse_at_storage`.
    #[test]
    fn parses_master_table() {
        let body: Vec<u8> = [
            &b"SMTB"[..],
            &[0u8, 0][..], // entry version 0, index type 0 (list)
            &0x0002u16.to_le_bytes()[..], // message type flags
            &16u32.to_le_bytes()[..], // minimum message size
            &8u16.to_le_bytes()[..], // list cutoff
            &6u16.to_le_bytes()[..], // B-tree cutoff
            &1u16.to_le_bytes()[..], // number of messages
            &[0u8, 0][..],           // skipped bytes
            &64u64.to_le_bytes()[..], // index address
            &128u64.to_le_bytes()[..], // fractal heap address
        ]
        .concat();

        let storage = BytesStorage::new(with_checksum(body));
        let table = SharedMessageTable::parse_at_storage(&storage, 0, 1, 8).unwrap();

        assert_eq!(table.indexes.len(), 1);
        let index = &table.indexes[0];
        assert!(index.tracks_message_type(crate::messages::MSG_DATATYPE));
        assert_eq!(index.num_messages, 1);
        assert_eq!(index.index_address, 64);
        assert_eq!(index.fractal_heap_address, 128);
    }

    /// A list holding a single heap record is decoded field by field.
    #[test]
    fn parses_record_list_heap_record() {
        let list_body: Vec<u8> = [
            &b"SMLI"[..],
            &[0u8, 0, 0, 0][..], // location 0 (heap) + three skipped bytes
            &0x1122_3344u32.to_le_bytes()[..], // hash
            &2u32.to_le_bytes()[..], // reference count
            &[8u8, 7, 6, 5, 4, 3, 2, 1][..], // heap ID
        ]
        .concat();

        // The list starts 32 bytes into the storage image.
        let mut file = vec![0u8; 32];
        file.extend_from_slice(&with_checksum(list_body));

        let storage = BytesStorage::new(file);
        let records = parse_record_list(&storage, 32, 1, 8).unwrap();

        let SharedMessageRecord::Heap {
            hash,
            reference_count,
            heap_id,
        } = &records[0]
        else {
            panic!("expected heap record, got {:?}", &records[0]);
        };
        assert_eq!(*hash, 0x1122_3344);
        assert_eq!(*reference_count, 2);
        assert_eq!(heap_id, &[8, 7, 6, 5, 4, 3, 2, 1]);
    }
}