1use std::sync::Arc;
8
9use crate::btree_v2::{self, BTreeV2Record};
10use crate::checksum::jenkins_lookup3;
11use crate::error::{Error, Result};
12use crate::filters::FilterRegistry;
13use crate::fractal_heap::FractalHeap;
14use crate::io::Cursor;
15use crate::messages::{parse_message, HdfMessage};
16use crate::storage::Storage;
17
/// Four-byte signature expected at the start of the shared message table.
const SMTB_SIGNATURE: [u8; 4] = *b"SMTB";
/// Four-byte signature expected at the start of a shared message record list.
const SMLI_SIGNATURE: [u8; 4] = *b"SMLI";
20
/// Parsed shared object header message (SOHM) table.
///
/// Built by `parse_at_storage` and queried by `resolve_heap_message`.
#[derive(Debug, Clone)]
pub(crate) struct SharedMessageTable {
    /// Index entries, in the order they were read from the table.
    indexes: Vec<SharedMessageIndex>,
}
26
/// One index entry of the shared message table.
#[derive(Debug, Clone)]
struct SharedMessageIndex {
    /// Whether the records live in a flat list or in a version-2 B-tree.
    index_type: SharedMessageIndexType,
    /// Bitmask of message types covered by this index; tested by
    /// `tracks_message_type`.
    message_type_flags: u16,
    /// Parsed from the entry; not otherwise consulted in this file.
    min_message_size: u32,
    /// Parsed from the entry; not otherwise consulted in this file.
    list_cutoff: u16,
    /// Parsed from the entry; not otherwise consulted in this file.
    btree_cutoff: u16,
    /// Number of records to read when the index is a list.
    num_messages: u16,
    /// File address of the record list or of the B-tree header.
    index_address: u64,
    /// File address of the fractal heap that holds the shared message bodies.
    fractal_heap_address: u64,
}
38
/// Storage layout of an index, decoded from the entry's type byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SharedMessageIndexType {
    /// Type byte `0`: records are stored in an `SMLI` list.
    List,
    /// Type byte `1`: records are stored in a version-2 B-tree.
    BTree,
}
44
/// A single record of a shared message index.
#[derive(Debug, Clone)]
enum SharedMessageRecord {
    /// The message body is stored in the index's fractal heap
    /// (location byte `0` in a list record).
    Heap {
        /// Hash value stored with the record.
        hash: u32,
        /// Reference count stored with the record.
        reference_count: u32,
        /// Fractal heap ID of the message; list records carry 8 bytes.
        heap_id: Vec<u8>,
    },
    /// The message lives in an object header (location byte `1` in a list
    /// record). `resolve_heap_message` skips records of this kind.
    ObjectHeader {
        /// Hash value stored with the record.
        hash: u32,
        /// Message type; list records store it in a single byte.
        message_type: u16,
        /// Index value stored with the record.
        object_header_index: u16,
        /// File address of the object header.
        object_header_address: u64,
    },
}
59
60impl SharedMessageTable {
61 pub(crate) fn parse_at_storage(
63 storage: &dyn Storage,
64 address: u64,
65 num_indices: u8,
66 offset_size: u8,
67 ) -> Result<Self> {
68 let entry_len = 1 + 1 + 2 + 4 + 2 + 2 + 2 + 2 + usize::from(offset_size) * 2;
69 let table_len = 4 + usize::from(num_indices) * entry_len + 4;
70 let bytes = storage.read_range(address, table_len)?;
71 let mut cursor = Cursor::new(bytes.as_ref());
72 let sig = cursor.read_bytes(4)?;
73 if sig != SMTB_SIGNATURE {
74 return Err(Error::InvalidData(format!(
75 "expected SMTB signature at offset {address:#x}"
76 )));
77 }
78
79 let mut indexes = Vec::with_capacity(usize::from(num_indices));
80 for _ in 0..num_indices {
81 let version = cursor.read_u8()?;
82 if version != 0 {
83 return Err(Error::InvalidData(format!(
84 "unsupported SOHM index version: {version}"
85 )));
86 }
87 let index_type = match cursor.read_u8()? {
88 0 => SharedMessageIndexType::List,
89 1 => SharedMessageIndexType::BTree,
90 other => {
91 return Err(Error::InvalidData(format!(
92 "unsupported SOHM index type: {other}"
93 )))
94 }
95 };
96 let message_type_flags = cursor.read_u16_le()?;
97 let min_message_size = cursor.read_u32_le()?;
98 let list_cutoff = cursor.read_u16_le()?;
99 let btree_cutoff = cursor.read_u16_le()?;
100 let num_messages = cursor.read_u16_le()?;
101 cursor.skip(2)?;
102 let index_address = cursor.read_offset(offset_size)?;
103 let fractal_heap_address = cursor.read_offset(offset_size)?;
104 indexes.push(SharedMessageIndex {
105 index_type,
106 message_type_flags,
107 min_message_size,
108 list_cutoff,
109 btree_cutoff,
110 num_messages,
111 index_address,
112 fractal_heap_address,
113 });
114 }
115
116 let checksum_pos = cursor.position() as usize;
117 let stored_checksum = cursor.read_u32_le()?;
118 let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
119 if computed != stored_checksum {
120 return Err(Error::ChecksumMismatch {
121 expected: stored_checksum,
122 actual: computed,
123 });
124 }
125
126 Ok(Self { indexes })
127 }
128
129 pub(crate) fn resolve_heap_message(
131 &self,
132 heap_id: &[u8],
133 message_type: u16,
134 storage: &dyn Storage,
135 offset_size: u8,
136 length_size: u8,
137 filter_registry: Option<&FilterRegistry>,
138 ) -> Result<Option<HdfMessage>> {
139 let preferred_indexes: Vec<&SharedMessageIndex> = self
140 .indexes
141 .iter()
142 .filter(|index| index.tracks_message_type(message_type))
143 .collect();
144
145 let indexes: Vec<&SharedMessageIndex> = if preferred_indexes.is_empty() {
146 self.indexes.iter().collect()
147 } else {
148 preferred_indexes
149 };
150
151 for index in indexes {
152 for record in index.records(storage, offset_size, length_size)? {
153 match record {
154 SharedMessageRecord::Heap {
155 hash,
156 reference_count,
157 heap_id: record_heap_id,
158 } => {
159 let _ = (hash, reference_count);
160 if record_heap_id != heap_id {
161 continue;
162 }
163 }
164 SharedMessageRecord::ObjectHeader {
165 hash,
166 message_type,
167 object_header_index,
168 object_header_address,
169 } => {
170 let _ = (
171 hash,
172 message_type,
173 object_header_index,
174 object_header_address,
175 );
176 continue;
177 }
178 }
179
180 if Cursor::is_undefined_offset(index.fractal_heap_address, offset_size) {
181 return Err(Error::UndefinedAddress);
182 }
183 let heap = FractalHeap::parse_at_storage(
184 storage,
185 index.fractal_heap_address,
186 offset_size,
187 length_size,
188 )?;
189 let payload = heap.get_object_storage_with_registry(
190 heap_id,
191 storage,
192 offset_size,
193 length_size,
194 filter_registry,
195 )?;
196 let mut cursor = Cursor::new(&payload);
197 let message = parse_message(
198 message_type,
199 payload.len(),
200 &mut cursor,
201 offset_size,
202 length_size,
203 )?;
204 return Ok(Some(message));
205 }
206 }
207
208 Ok(None)
209 }
210}
211
212impl SharedMessageIndex {
213 fn tracks_message_type(&self, message_type: u16) -> bool {
214 let Some(bit) = shared_message_type_bit(message_type) else {
215 return false;
216 };
217 (self.message_type_flags & (1u16 << bit)) != 0
218 }
219
220 fn records(
221 &self,
222 storage: &dyn Storage,
223 offset_size: u8,
224 length_size: u8,
225 ) -> Result<Vec<SharedMessageRecord>> {
226 let _ = (self.min_message_size, self.list_cutoff, self.btree_cutoff);
227 match self.index_type {
228 SharedMessageIndexType::List => {
229 parse_record_list(storage, self.index_address, self.num_messages, offset_size)
230 }
231 SharedMessageIndexType::BTree => {
232 let header = btree_v2::BTreeV2Header::parse_at_storage(
233 storage,
234 self.index_address,
235 offset_size,
236 length_size,
237 )?;
238 let records = btree_v2::collect_btree_v2_records_storage(
239 storage,
240 &header,
241 offset_size,
242 length_size,
243 None,
244 &[],
245 None,
246 )?;
247 records
248 .into_iter()
249 .filter_map(record_from_btree)
250 .collect::<Result<Vec<_>>>()
251 }
252 }
253 }
254}
255
256fn parse_record_list(
257 storage: &dyn Storage,
258 address: u64,
259 num_records: u16,
260 offset_size: u8,
261) -> Result<Vec<SharedMessageRecord>> {
262 if num_records == 0 {
263 return Ok(Vec::new());
264 }
265 let max_record_len = 20usize;
266 let max_len = 4 + usize::from(num_records) * max_record_len + 4;
267 let bytes = storage.read_range(address, max_len)?;
268 let mut cursor = Cursor::new(bytes.as_ref());
269 let sig = cursor.read_bytes(4)?;
270 if sig != SMLI_SIGNATURE {
271 return Err(Error::InvalidData(format!(
272 "expected SMLI signature at offset {address:#x}"
273 )));
274 }
275
276 let mut records = Vec::with_capacity(usize::from(num_records));
277 for _ in 0..num_records {
278 records.push(parse_record(&mut cursor, offset_size)?);
279 }
280
281 let checksum_pos = cursor.position() as usize;
282 let stored_checksum = cursor.read_u32_le()?;
283 let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
284 if computed != stored_checksum {
285 return Err(Error::ChecksumMismatch {
286 expected: stored_checksum,
287 actual: computed,
288 });
289 }
290
291 Ok(records)
292}
293
294fn parse_record(cursor: &mut Cursor<'_>, offset_size: u8) -> Result<SharedMessageRecord> {
295 let location = cursor.read_u8()?;
296 cursor.skip(3)?;
297 let hash = cursor.read_u32_le()?;
298 match location {
299 0 => {
300 let reference_count = cursor.read_u32_le()?;
301 let heap_id = cursor.read_bytes(8)?.to_vec();
302 Ok(SharedMessageRecord::Heap {
303 hash,
304 reference_count,
305 heap_id,
306 })
307 }
308 1 => {
309 let _reserved = cursor.read_u8()?;
310 let message_type = u16::from(cursor.read_u8()?);
311 let object_header_index = cursor.read_u16_le()?;
312 let object_header_address = cursor.read_offset(offset_size)?;
313 Ok(SharedMessageRecord::ObjectHeader {
314 hash,
315 message_type,
316 object_header_index,
317 object_header_address,
318 })
319 }
320 other => Err(Error::InvalidData(format!(
321 "unknown SOHM record location: {other}"
322 ))),
323 }
324}
325
326fn record_from_btree(record: BTreeV2Record) -> Option<Result<SharedMessageRecord>> {
327 match record {
328 BTreeV2Record::SharedMessageHeap {
329 hash,
330 reference_count,
331 heap_id,
332 } => Some(Ok(SharedMessageRecord::Heap {
333 hash,
334 reference_count,
335 heap_id,
336 })),
337 BTreeV2Record::SharedMessageObjectHeader {
338 hash,
339 message_type,
340 object_header_index,
341 object_header_address,
342 } => Some(Ok(SharedMessageRecord::ObjectHeader {
343 hash,
344 message_type,
345 object_header_index,
346 object_header_address,
347 })),
348 _ => None,
349 }
350}
351
352fn shared_message_type_bit(message_type: u16) -> Option<u8> {
353 match message_type {
354 crate::messages::MSG_DATASPACE => Some(0),
355 crate::messages::MSG_DATATYPE => Some(1),
356 crate::messages::MSG_FILL_VALUE | crate::messages::MSG_FILL_VALUE_OLD => Some(2),
357 crate::messages::MSG_FILTER_PIPELINE => Some(3),
358 crate::messages::MSG_ATTRIBUTE => Some(4),
359 _ => None,
360 }
361}
362
/// Shared-ownership (`Arc`) handle to a parsed `SharedMessageTable`.
pub(crate) type SharedMessageTableRef = Arc<SharedMessageTable>;
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::BytesStorage;

    /// Builds a one-entry table with 8-byte offsets and checks that every
    /// parsed field round-trips.
    #[test]
    fn parses_master_table() {
        let mut table = Vec::new();
        table.extend_from_slice(b"SMTB");
        // Entry version 0.
        table.push(0);
        // Index type 0: list.
        table.push(0);
        // Message type flags: only bit 1 (datatype) set.
        table.extend_from_slice(&0x0002u16.to_le_bytes());
        // Minimum message size.
        table.extend_from_slice(&16u32.to_le_bytes());
        // List cutoff, then B-tree cutoff.
        table.extend_from_slice(&8u16.to_le_bytes());
        table.extend_from_slice(&6u16.to_le_bytes());
        // Number of messages.
        table.extend_from_slice(&1u16.to_le_bytes());
        // Two bytes that the parser skips.
        table.extend_from_slice(&[0, 0]);
        // Index address, then fractal heap address.
        table.extend_from_slice(&64u64.to_le_bytes());
        table.extend_from_slice(&128u64.to_le_bytes());
        // Checksum over everything written so far.
        let checksum = jenkins_lookup3(&table);
        table.extend_from_slice(&checksum.to_le_bytes());

        let storage = BytesStorage::new(table);
        let parsed = SharedMessageTable::parse_at_storage(&storage, 0, 1, 8).unwrap();
        assert_eq!(parsed.indexes.len(), 1);
        assert!(parsed.indexes[0].tracks_message_type(crate::messages::MSG_DATATYPE));
        assert_eq!(parsed.indexes[0].num_messages, 1);
        assert_eq!(parsed.indexes[0].index_address, 64);
        assert_eq!(parsed.indexes[0].fractal_heap_address, 128);
    }

    /// Places a single heap record list at offset 32 and checks the decoded
    /// hash, reference count, and heap ID.
    #[test]
    fn parses_record_list_heap_record() {
        // 32 bytes of leading filler so the list does not start at offset 0.
        let mut data = vec![0u8; 32];
        let mut list = Vec::new();
        list.extend_from_slice(b"SMLI");
        // Location byte 0: the message is stored in the heap.
        list.push(0);
        // Three bytes that the parser skips.
        list.extend_from_slice(&[0, 0, 0]);
        // Hash, reference count, then the 8-byte heap ID.
        list.extend_from_slice(&0x1122_3344u32.to_le_bytes());
        list.extend_from_slice(&2u32.to_le_bytes());
        list.extend_from_slice(&[8, 7, 6, 5, 4, 3, 2, 1]);
        // Checksum over the list bytes only, not the leading filler.
        let checksum = jenkins_lookup3(&list);
        list.extend_from_slice(&checksum.to_le_bytes());
        data.extend_from_slice(&list);

        let storage = BytesStorage::new(data);
        let records = parse_record_list(&storage, 32, 1, 8).unwrap();
        match &records[0] {
            SharedMessageRecord::Heap {
                hash,
                reference_count,
                heap_id,
            } => {
                assert_eq!(*hash, 0x1122_3344);
                assert_eq!(*reference_count, 2);
                assert_eq!(heap_id, &[8, 7, 6, 5, 4, 3, 2, 1]);
            }
            other => panic!("expected heap record, got {:?}", other),
        }
    }
}
425}