1use std::sync::Arc;
8
9use crate::btree_v2::{self, BTreeV2Record};
10use crate::checksum::jenkins_lookup3;
11use crate::error::{Error, Result};
12use crate::fractal_heap::FractalHeap;
13use crate::io::Cursor;
14use crate::messages::{parse_message, HdfMessage};
15use crate::storage::Storage;
16
/// Four-byte signature expected at the start of the shared message master table.
const SMTB_SIGNATURE: [u8; 4] = [b'S', b'M', b'T', b'B'];
/// Four-byte signature expected at the start of a shared message record list.
const SMLI_SIGNATURE: [u8; 4] = [b'S', b'M', b'L', b'I'];
19
20#[derive(Debug, Clone)]
22pub(crate) struct SharedMessageTable {
23 indexes: Vec<SharedMessageIndex>,
24}
25
26#[derive(Debug, Clone)]
27struct SharedMessageIndex {
28 index_type: SharedMessageIndexType,
29 message_type_flags: u16,
30 min_message_size: u32,
31 list_cutoff: u16,
32 btree_cutoff: u16,
33 num_messages: u16,
34 index_address: u64,
35 fractal_heap_address: u64,
36}
37
/// Storage layout used by a shared message index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SharedMessageIndexType {
    /// Records are stored in a single `SMLI` record list (on-disk value 0).
    List,
    /// Records are stored in a version-2 B-tree (on-disk value 1).
    BTree,
}
43
/// A single record of a shared message index.
#[derive(Debug, Clone)]
enum SharedMessageRecord {
    /// The message payload is stored in the index's fractal heap.
    Heap {
        /// Hash value stored with the record.
        hash: u32,
        /// Reference count stored with the record.
        reference_count: u32,
        /// Fractal heap ID used to fetch the payload.
        heap_id: Vec<u8>,
    },
    /// The message is stored in an object header rather than in the heap.
    ObjectHeader {
        /// Hash value stored with the record.
        hash: u32,
        /// Message type identifier stored with the record.
        message_type: u16,
        /// Index value stored with the record.
        object_header_index: u16,
        /// Address of the object header holding the message.
        object_header_address: u64,
    },
}
58
59impl SharedMessageTable {
60 pub(crate) fn parse_at_storage(
62 storage: &dyn Storage,
63 address: u64,
64 num_indices: u8,
65 offset_size: u8,
66 ) -> Result<Self> {
67 let entry_len = 1 + 1 + 2 + 4 + 2 + 2 + 2 + 2 + usize::from(offset_size) * 2;
68 let table_len = 4 + usize::from(num_indices) * entry_len + 4;
69 let bytes = storage.read_range(address, table_len)?;
70 let mut cursor = Cursor::new(bytes.as_ref());
71 let sig = cursor.read_bytes(4)?;
72 if sig != SMTB_SIGNATURE {
73 return Err(Error::InvalidData(format!(
74 "expected SMTB signature at offset {address:#x}"
75 )));
76 }
77
78 let mut indexes = Vec::with_capacity(usize::from(num_indices));
79 for _ in 0..num_indices {
80 let version = cursor.read_u8()?;
81 if version != 0 {
82 return Err(Error::InvalidData(format!(
83 "unsupported SOHM index version: {version}"
84 )));
85 }
86 let index_type = match cursor.read_u8()? {
87 0 => SharedMessageIndexType::List,
88 1 => SharedMessageIndexType::BTree,
89 other => {
90 return Err(Error::InvalidData(format!(
91 "unsupported SOHM index type: {other}"
92 )))
93 }
94 };
95 let message_type_flags = cursor.read_u16_le()?;
96 let min_message_size = cursor.read_u32_le()?;
97 let list_cutoff = cursor.read_u16_le()?;
98 let btree_cutoff = cursor.read_u16_le()?;
99 let num_messages = cursor.read_u16_le()?;
100 cursor.skip(2)?;
101 let index_address = cursor.read_offset(offset_size)?;
102 let fractal_heap_address = cursor.read_offset(offset_size)?;
103 indexes.push(SharedMessageIndex {
104 index_type,
105 message_type_flags,
106 min_message_size,
107 list_cutoff,
108 btree_cutoff,
109 num_messages,
110 index_address,
111 fractal_heap_address,
112 });
113 }
114
115 let checksum_pos = cursor.position() as usize;
116 let stored_checksum = cursor.read_u32_le()?;
117 let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
118 if computed != stored_checksum {
119 return Err(Error::ChecksumMismatch {
120 expected: stored_checksum,
121 actual: computed,
122 });
123 }
124
125 Ok(Self { indexes })
126 }
127
128 pub(crate) fn resolve_heap_message(
130 &self,
131 heap_id: &[u8],
132 message_type: u16,
133 storage: &dyn Storage,
134 offset_size: u8,
135 length_size: u8,
136 ) -> Result<Option<HdfMessage>> {
137 let preferred_indexes: Vec<&SharedMessageIndex> = self
138 .indexes
139 .iter()
140 .filter(|index| index.tracks_message_type(message_type))
141 .collect();
142
143 let indexes: Vec<&SharedMessageIndex> = if preferred_indexes.is_empty() {
144 self.indexes.iter().collect()
145 } else {
146 preferred_indexes
147 };
148
149 for index in indexes {
150 for record in index.records(storage, offset_size, length_size)? {
151 match record {
152 SharedMessageRecord::Heap {
153 hash,
154 reference_count,
155 heap_id: record_heap_id,
156 } => {
157 let _ = (hash, reference_count);
158 if record_heap_id != heap_id {
159 continue;
160 }
161 }
162 SharedMessageRecord::ObjectHeader {
163 hash,
164 message_type,
165 object_header_index,
166 object_header_address,
167 } => {
168 let _ = (
169 hash,
170 message_type,
171 object_header_index,
172 object_header_address,
173 );
174 continue;
175 }
176 }
177
178 if Cursor::is_undefined_offset(index.fractal_heap_address, offset_size) {
179 return Err(Error::UndefinedAddress);
180 }
181 let heap = FractalHeap::parse_at_storage(
182 storage,
183 index.fractal_heap_address,
184 offset_size,
185 length_size,
186 )?;
187 let payload =
188 heap.get_object_storage(heap_id, storage, offset_size, length_size)?;
189 let mut cursor = Cursor::new(&payload);
190 let message = parse_message(
191 message_type,
192 payload.len(),
193 &mut cursor,
194 offset_size,
195 length_size,
196 )?;
197 return Ok(Some(message));
198 }
199 }
200
201 Ok(None)
202 }
203}
204
205impl SharedMessageIndex {
206 fn tracks_message_type(&self, message_type: u16) -> bool {
207 let Some(bit) = shared_message_type_bit(message_type) else {
208 return false;
209 };
210 (self.message_type_flags & (1u16 << bit)) != 0
211 }
212
213 fn records(
214 &self,
215 storage: &dyn Storage,
216 offset_size: u8,
217 length_size: u8,
218 ) -> Result<Vec<SharedMessageRecord>> {
219 let _ = (self.min_message_size, self.list_cutoff, self.btree_cutoff);
220 match self.index_type {
221 SharedMessageIndexType::List => {
222 parse_record_list(storage, self.index_address, self.num_messages, offset_size)
223 }
224 SharedMessageIndexType::BTree => {
225 let header = btree_v2::BTreeV2Header::parse_at_storage(
226 storage,
227 self.index_address,
228 offset_size,
229 length_size,
230 )?;
231 let records = btree_v2::collect_btree_v2_records_storage(
232 storage,
233 &header,
234 offset_size,
235 length_size,
236 None,
237 &[],
238 None,
239 )?;
240 records
241 .into_iter()
242 .filter_map(record_from_btree)
243 .collect::<Result<Vec<_>>>()
244 }
245 }
246 }
247}
248
249fn parse_record_list(
250 storage: &dyn Storage,
251 address: u64,
252 num_records: u16,
253 offset_size: u8,
254) -> Result<Vec<SharedMessageRecord>> {
255 if num_records == 0 {
256 return Ok(Vec::new());
257 }
258 let max_record_len = 20usize;
259 let max_len = 4 + usize::from(num_records) * max_record_len + 4;
260 let bytes = storage.read_range(address, max_len)?;
261 let mut cursor = Cursor::new(bytes.as_ref());
262 let sig = cursor.read_bytes(4)?;
263 if sig != SMLI_SIGNATURE {
264 return Err(Error::InvalidData(format!(
265 "expected SMLI signature at offset {address:#x}"
266 )));
267 }
268
269 let mut records = Vec::with_capacity(usize::from(num_records));
270 for _ in 0..num_records {
271 records.push(parse_record(&mut cursor, offset_size)?);
272 }
273
274 let checksum_pos = cursor.position() as usize;
275 let stored_checksum = cursor.read_u32_le()?;
276 let computed = jenkins_lookup3(&bytes.as_ref()[..checksum_pos]);
277 if computed != stored_checksum {
278 return Err(Error::ChecksumMismatch {
279 expected: stored_checksum,
280 actual: computed,
281 });
282 }
283
284 Ok(records)
285}
286
287fn parse_record(cursor: &mut Cursor<'_>, offset_size: u8) -> Result<SharedMessageRecord> {
288 let location = cursor.read_u8()?;
289 cursor.skip(3)?;
290 let hash = cursor.read_u32_le()?;
291 match location {
292 0 => {
293 let reference_count = cursor.read_u32_le()?;
294 let heap_id = cursor.read_bytes(8)?.to_vec();
295 Ok(SharedMessageRecord::Heap {
296 hash,
297 reference_count,
298 heap_id,
299 })
300 }
301 1 => {
302 let _reserved = cursor.read_u8()?;
303 let message_type = u16::from(cursor.read_u8()?);
304 let object_header_index = cursor.read_u16_le()?;
305 let object_header_address = cursor.read_offset(offset_size)?;
306 Ok(SharedMessageRecord::ObjectHeader {
307 hash,
308 message_type,
309 object_header_index,
310 object_header_address,
311 })
312 }
313 other => Err(Error::InvalidData(format!(
314 "unknown SOHM record location: {other}"
315 ))),
316 }
317}
318
319fn record_from_btree(record: BTreeV2Record) -> Option<Result<SharedMessageRecord>> {
320 match record {
321 BTreeV2Record::SharedMessageHeap {
322 hash,
323 reference_count,
324 heap_id,
325 } => Some(Ok(SharedMessageRecord::Heap {
326 hash,
327 reference_count,
328 heap_id,
329 })),
330 BTreeV2Record::SharedMessageObjectHeader {
331 hash,
332 message_type,
333 object_header_index,
334 object_header_address,
335 } => Some(Ok(SharedMessageRecord::ObjectHeader {
336 hash,
337 message_type,
338 object_header_index,
339 object_header_address,
340 })),
341 _ => None,
342 }
343}
344
345fn shared_message_type_bit(message_type: u16) -> Option<u8> {
346 match message_type {
347 crate::messages::MSG_DATASPACE => Some(0),
348 crate::messages::MSG_DATATYPE => Some(1),
349 crate::messages::MSG_FILL_VALUE | crate::messages::MSG_FILL_VALUE_OLD => Some(2),
350 crate::messages::MSG_FILTER_PIPELINE => Some(3),
351 crate::messages::MSG_ATTRIBUTE => Some(4),
352 _ => None,
353 }
354}
355
356pub(crate) type SharedMessageTableRef = Arc<SharedMessageTable>;
357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::BytesStorage;

    /// Appends the little-endian `jenkins_lookup3` checksum of `body` to it.
    fn with_checksum(mut body: Vec<u8>) -> Vec<u8> {
        let checksum = jenkins_lookup3(&body);
        body.extend_from_slice(&checksum.to_le_bytes());
        body
    }

    #[test]
    fn parses_master_table() {
        let mut table = b"SMTB".to_vec();
        // Version 0, index type 0 (list).
        table.extend_from_slice(&[0, 0]);
        // Message type flags: bit 1 set.
        table.extend_from_slice(&0x0002u16.to_le_bytes());
        // Minimum message size.
        table.extend_from_slice(&16u32.to_le_bytes());
        // List cutoff, B-tree cutoff, number of messages.
        for value in [8u16, 6, 1] {
            table.extend_from_slice(&value.to_le_bytes());
        }
        // Two bytes the parser skips.
        table.extend_from_slice(&[0, 0]);
        // Index address, fractal heap address.
        for address in [64u64, 128] {
            table.extend_from_slice(&address.to_le_bytes());
        }

        let storage = BytesStorage::new(with_checksum(table));
        let parsed = SharedMessageTable::parse_at_storage(&storage, 0, 1, 8).unwrap();

        assert_eq!(parsed.indexes.len(), 1);
        let index = &parsed.indexes[0];
        assert!(index.tracks_message_type(crate::messages::MSG_DATATYPE));
        assert_eq!(index.num_messages, 1);
        assert_eq!(index.index_address, 64);
        assert_eq!(index.fractal_heap_address, 128);
    }

    #[test]
    fn parses_record_list_heap_record() {
        const LIST_ADDRESS: u64 = 32;
        let expected_heap_id = [8u8, 7, 6, 5, 4, 3, 2, 1];

        let mut list = b"SMLI".to_vec();
        // Location 0 (heap) followed by the three bytes the parser skips.
        list.extend_from_slice(&[0, 0, 0, 0]);
        list.extend_from_slice(&0x1122_3344u32.to_le_bytes());
        list.extend_from_slice(&2u32.to_le_bytes());
        list.extend_from_slice(&expected_heap_id);
        let list = with_checksum(list);

        // Place the list after some leading zero bytes so the address is non-zero.
        let mut data = vec![0u8; LIST_ADDRESS as usize];
        data.extend_from_slice(&list);

        let storage = BytesStorage::new(data);
        let records = parse_record_list(&storage, LIST_ADDRESS, 1, 8).unwrap();

        let SharedMessageRecord::Heap {
            hash,
            reference_count,
            heap_id,
        } = &records[0]
        else {
            panic!("expected heap record, got {:?}", &records[0]);
        };
        assert_eq!(*hash, 0x1122_3344);
        assert_eq!(*reference_count, 2);
        assert_eq!(&heap_id[..], &expected_heap_id[..]);
    }
}
418}