Skip to main content

hdf5_reader/
shared_message_table.rs

1//! Shared object-header message table (SOHM).
2//!
3//! The superblock extension can point at a file-level `SMTB` table containing
4//! one or more shared-message indexes. Each index is either an `SMLI` list or
5//! a v2 B-tree with record type 7.
6
7use std::sync::Arc;
8
9use crate::btree_v2::{self, BTreeV2Record};
10use crate::checksum::jenkins_lookup3;
11use crate::error::{Error, Result};
12use crate::fractal_heap::FractalHeap;
13use crate::io::Cursor;
14use crate::messages::{parse_message, HdfMessage};
15use crate::storage::Storage;
16
/// Four-byte magic that must open the shared-message master table.
const SMTB_SIGNATURE: [u8; 4] = *b"SMTB";
/// Four-byte magic that must open a shared-message record list.
const SMLI_SIGNATURE: [u8; 4] = *b"SMLI";
19
20/// File-level SOHM master table.
21#[derive(Debug, Clone)]
22pub(crate) struct SharedMessageTable {
23    indexes: Vec<SharedMessageIndex>,
24}
25
26#[derive(Debug, Clone)]
27struct SharedMessageIndex {
28    index_type: SharedMessageIndexType,
29    message_type_flags: u16,
30    min_message_size: u32,
31    list_cutoff: u16,
32    btree_cutoff: u16,
33    num_messages: u16,
34    index_address: u64,
35    fractal_heap_address: u64,
36}
37
/// Storage layout used by a shared-message index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SharedMessageIndexType {
    /// Records are stored in a flat `SMLI` list (on-disk tag `0`).
    List,
    /// Records are stored in a v2 B-tree (on-disk tag `1`).
    BTree,
}
43
/// A single entry of a shared-message index, from either a list or a B-tree.
#[derive(Debug, Clone)]
enum SharedMessageRecord {
    /// The message is identified by an ID into the index's fractal heap.
    Heap {
        /// Hash value stored with the record.
        hash: u32,
        /// Reference count stored with the record.
        reference_count: u32,
        /// Raw fractal-heap ID bytes (8 bytes when read from a list).
        heap_id: Vec<u8>,
    },
    /// The record points at an object header instead of the heap.
    ObjectHeader {
        /// Hash value stored with the record.
        hash: u32,
        /// Message type stored with the record.
        message_type: u16,
        /// Index field stored with the record.
        object_header_index: u16,
        /// File address of the referenced object header.
        object_header_address: u64,
    },
}
58
59impl SharedMessageTable {
60    /// Parse a SOHM master table from storage.
61    pub(crate) fn parse_at_storage(
62        storage: &dyn Storage,
63        address: u64,
64        num_indices: u8,
65        offset_size: u8,
66    ) -> Result<Self> {
67        let entry_len = 1 + 1 + 2 + 4 + 2 + 2 + 2 + 2 + usize::from(offset_size) * 2;
68        let table_len = 4 + usize::from(num_indices) * entry_len + 4;
69        let bytes = storage.read_range(address, table_len)?;
70        let mut cursor = Cursor::new(bytes.as_ref());
71        let sig = cursor.read_bytes(4)?;
72        if sig != SMTB_SIGNATURE {
73            return Err(Error::InvalidData(format!(
74                "expected SMTB signature at offset {address:#x}"
75            )));
76        }
77
78        let mut indexes = Vec::with_capacity(usize::from(num_indices));
79        for _ in 0..num_indices {
80            let version = cursor.read_u8()?;
81            if version != 0 {
82                return Err(Error::InvalidData(format!(
83                    "unsupported SOHM index version: {version}"
84                )));
85            }
86            let index_type = match cursor.read_u8()? {
87                0 => SharedMessageIndexType::List,
88                1 => SharedMessageIndexType::BTree,
89                other => {
90                    return Err(Error::InvalidData(format!(
91                        "unsupported SOHM index type: {other}"
92                    )))
93                }
94            };
95            let message_type_flags = cursor.read_u16_le()?;
96            let min_message_size = cursor.read_u32_le()?;
97            let list_cutoff = cursor.read_u16_le()?;
98            let btree_cutoff = cursor.read_u16_le()?;
99            let num_messages = cursor.read_u16_le()?;
100            cursor.skip(2)?;
101            let index_address = cursor.read_offset(offset_size)?;
102            let fractal_heap_address = cursor.read_offset(offset_size)?;
103            indexes.push(SharedMessageIndex {
104                index_type,
105                message_type_flags,
106                min_message_size,
107                list_cutoff,
108                btree_cutoff,
109                num_messages,
110                index_address,
111                fractal_heap_address,
112            });
113        }
114
115        let checksum_pos = cursor.position() as usize;
116        let stored_checksum = cursor.read_u32_le()?;
117        let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
118        if computed != stored_checksum {
119            return Err(Error::ChecksumMismatch {
120                expected: stored_checksum,
121                actual: computed,
122            });
123        }
124
125        Ok(Self { indexes })
126    }
127
128    /// Resolve a SOHM heap ID into the concrete object-header message.
129    pub(crate) fn resolve_heap_message(
130        &self,
131        heap_id: &[u8],
132        message_type: u16,
133        storage: &dyn Storage,
134        offset_size: u8,
135        length_size: u8,
136    ) -> Result<Option<HdfMessage>> {
137        let preferred_indexes: Vec<&SharedMessageIndex> = self
138            .indexes
139            .iter()
140            .filter(|index| index.tracks_message_type(message_type))
141            .collect();
142
143        let indexes: Vec<&SharedMessageIndex> = if preferred_indexes.is_empty() {
144            self.indexes.iter().collect()
145        } else {
146            preferred_indexes
147        };
148
149        for index in indexes {
150            for record in index.records(storage, offset_size, length_size)? {
151                match record {
152                    SharedMessageRecord::Heap {
153                        hash,
154                        reference_count,
155                        heap_id: record_heap_id,
156                    } => {
157                        let _ = (hash, reference_count);
158                        if record_heap_id != heap_id {
159                            continue;
160                        }
161                    }
162                    SharedMessageRecord::ObjectHeader {
163                        hash,
164                        message_type,
165                        object_header_index,
166                        object_header_address,
167                    } => {
168                        let _ = (
169                            hash,
170                            message_type,
171                            object_header_index,
172                            object_header_address,
173                        );
174                        continue;
175                    }
176                }
177
178                if Cursor::is_undefined_offset(index.fractal_heap_address, offset_size) {
179                    return Err(Error::UndefinedAddress);
180                }
181                let heap = FractalHeap::parse_at_storage(
182                    storage,
183                    index.fractal_heap_address,
184                    offset_size,
185                    length_size,
186                )?;
187                let payload =
188                    heap.get_object_storage(heap_id, storage, offset_size, length_size)?;
189                let mut cursor = Cursor::new(&payload);
190                let message = parse_message(
191                    message_type,
192                    payload.len(),
193                    &mut cursor,
194                    offset_size,
195                    length_size,
196                )?;
197                return Ok(Some(message));
198            }
199        }
200
201        Ok(None)
202    }
203}
204
205impl SharedMessageIndex {
206    fn tracks_message_type(&self, message_type: u16) -> bool {
207        let Some(bit) = shared_message_type_bit(message_type) else {
208            return false;
209        };
210        (self.message_type_flags & (1u16 << bit)) != 0
211    }
212
213    fn records(
214        &self,
215        storage: &dyn Storage,
216        offset_size: u8,
217        length_size: u8,
218    ) -> Result<Vec<SharedMessageRecord>> {
219        let _ = (self.min_message_size, self.list_cutoff, self.btree_cutoff);
220        match self.index_type {
221            SharedMessageIndexType::List => {
222                parse_record_list(storage, self.index_address, self.num_messages, offset_size)
223            }
224            SharedMessageIndexType::BTree => {
225                let header = btree_v2::BTreeV2Header::parse_at_storage(
226                    storage,
227                    self.index_address,
228                    offset_size,
229                    length_size,
230                )?;
231                let records = btree_v2::collect_btree_v2_records_storage(
232                    storage,
233                    &header,
234                    offset_size,
235                    length_size,
236                    None,
237                    &[],
238                    None,
239                )?;
240                records
241                    .into_iter()
242                    .filter_map(record_from_btree)
243                    .collect::<Result<Vec<_>>>()
244            }
245        }
246    }
247}
248
249fn parse_record_list(
250    storage: &dyn Storage,
251    address: u64,
252    num_records: u16,
253    offset_size: u8,
254) -> Result<Vec<SharedMessageRecord>> {
255    if num_records == 0 {
256        return Ok(Vec::new());
257    }
258    let max_record_len = 20usize;
259    let max_len = 4 + usize::from(num_records) * max_record_len + 4;
260    let bytes = storage.read_range(address, max_len)?;
261    let mut cursor = Cursor::new(bytes.as_ref());
262    let sig = cursor.read_bytes(4)?;
263    if sig != SMLI_SIGNATURE {
264        return Err(Error::InvalidData(format!(
265            "expected SMLI signature at offset {address:#x}"
266        )));
267    }
268
269    let mut records = Vec::with_capacity(usize::from(num_records));
270    for _ in 0..num_records {
271        records.push(parse_record(&mut cursor, offset_size)?);
272    }
273
274    let checksum_pos = cursor.position() as usize;
275    let stored_checksum = cursor.read_u32_le()?;
276    let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
277    if computed != stored_checksum {
278        return Err(Error::ChecksumMismatch {
279            expected: stored_checksum,
280            actual: computed,
281        });
282    }
283
284    Ok(records)
285}
286
287fn parse_record(cursor: &mut Cursor<'_>, offset_size: u8) -> Result<SharedMessageRecord> {
288    let location = cursor.read_u8()?;
289    cursor.skip(3)?;
290    let hash = cursor.read_u32_le()?;
291    match location {
292        0 => {
293            let reference_count = cursor.read_u32_le()?;
294            let heap_id = cursor.read_bytes(8)?.to_vec();
295            Ok(SharedMessageRecord::Heap {
296                hash,
297                reference_count,
298                heap_id,
299            })
300        }
301        1 => {
302            let _reserved = cursor.read_u8()?;
303            let message_type = u16::from(cursor.read_u8()?);
304            let object_header_index = cursor.read_u16_le()?;
305            let object_header_address = cursor.read_offset(offset_size)?;
306            Ok(SharedMessageRecord::ObjectHeader {
307                hash,
308                message_type,
309                object_header_index,
310                object_header_address,
311            })
312        }
313        other => Err(Error::InvalidData(format!(
314            "unknown SOHM record location: {other}"
315        ))),
316    }
317}
318
319fn record_from_btree(record: BTreeV2Record) -> Option<Result<SharedMessageRecord>> {
320    match record {
321        BTreeV2Record::SharedMessageHeap {
322            hash,
323            reference_count,
324            heap_id,
325        } => Some(Ok(SharedMessageRecord::Heap {
326            hash,
327            reference_count,
328            heap_id,
329        })),
330        BTreeV2Record::SharedMessageObjectHeader {
331            hash,
332            message_type,
333            object_header_index,
334            object_header_address,
335        } => Some(Ok(SharedMessageRecord::ObjectHeader {
336            hash,
337            message_type,
338            object_header_index,
339            object_header_address,
340        })),
341        _ => None,
342    }
343}
344
345fn shared_message_type_bit(message_type: u16) -> Option<u8> {
346    match message_type {
347        crate::messages::MSG_DATASPACE => Some(0),
348        crate::messages::MSG_DATATYPE => Some(1),
349        crate::messages::MSG_FILL_VALUE | crate::messages::MSG_FILL_VALUE_OLD => Some(2),
350        crate::messages::MSG_FILTER_PIPELINE => Some(3),
351        crate::messages::MSG_ATTRIBUTE => Some(4),
352        _ => None,
353    }
354}
355
356pub(crate) type SharedMessageTableRef = Arc<SharedMessageTable>;
357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::BytesStorage;

    /// Append the Jenkins lookup3 checksum of `body` to `body`.
    fn with_checksum(mut body: Vec<u8>) -> Vec<u8> {
        let checksum = jenkins_lookup3(&body);
        body.extend_from_slice(&checksum.to_le_bytes());
        body
    }

    /// A one-entry master table with 8-byte offsets parses into the expected
    /// index fields.
    #[test]
    fn parses_master_table() {
        let body = [
            &b"SMTB"[..],
            // version, index type (list)
            &[0u8, 0][..],
            // message type flags: datatype bit
            &0x0002u16.to_le_bytes()[..],
            // minimum message size
            &16u32.to_le_bytes()[..],
            // list cutoff, B-tree cutoff, message count
            &8u16.to_le_bytes()[..],
            &6u16.to_le_bytes()[..],
            &1u16.to_le_bytes()[..],
            // two skipped bytes
            &[0u8, 0][..],
            // index address, fractal heap address
            &64u64.to_le_bytes()[..],
            &128u64.to_le_bytes()[..],
        ]
        .concat();

        let storage = BytesStorage::new(with_checksum(body));
        let parsed = SharedMessageTable::parse_at_storage(&storage, 0, 1, 8).unwrap();

        assert_eq!(parsed.indexes.len(), 1);
        let index = &parsed.indexes[0];
        assert!(index.tracks_message_type(crate::messages::MSG_DATATYPE));
        assert_eq!(index.num_messages, 1);
        assert_eq!(index.index_address, 64);
        assert_eq!(index.fractal_heap_address, 128);
    }

    /// A list holding a single heap record, placed 32 bytes into storage,
    /// decodes to that record.
    #[test]
    fn parses_record_list_heap_record() {
        let list = with_checksum(
            [
                &b"SMLI"[..],
                // location (heap) + three skipped bytes
                &[0u8, 0, 0, 0][..],
                // hash, reference count
                &0x1122_3344u32.to_le_bytes()[..],
                &2u32.to_le_bytes()[..],
                // heap ID
                &[8u8, 7, 6, 5, 4, 3, 2, 1][..],
            ]
            .concat(),
        );

        // 32 bytes of zero padding put the list at a non-zero address.
        let mut data = vec![0u8; 32];
        data.extend_from_slice(&list);

        let storage = BytesStorage::new(data);
        let records = parse_record_list(&storage, 32, 1, 8).unwrap();
        match &records[0] {
            SharedMessageRecord::Heap {
                hash,
                reference_count,
                heap_id,
            } => {
                assert_eq!(*hash, 0x1122_3344);
                assert_eq!(*reference_count, 2);
                assert_eq!(heap_id, &[8, 7, 6, 5, 4, 3, 2, 1]);
            }
            other => panic!("expected heap record, got {:?}", other),
        }
    }
}