Skip to main content

hdf5_reader/
fractal_heap.rs

1//! HDF5 Fractal Heap (FRHP).
2//!
3//! Fractal heaps are the storage mechanism for link messages and attribute
4//! messages in new-style (v2) groups and datasets. They use a doubling-table
5//! scheme with direct and indirect blocks. Objects are addressed by a
6//! heap ID that encodes the offset and length within the heap.
7//!
//! This module parses the heap header and provides object extraction for
//! managed, tiny, and huge object IDs, applying the heap's I/O filter
//! pipeline when a direct block or huge object is stored filtered.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13
14use crate::checksum::jenkins_lookup3;
15use crate::error::{Error, Result};
16use crate::filters::{self, FilterRegistry};
17use crate::io::Cursor;
18use crate::messages::filter_pipeline::FilterPipelineMessage;
19use crate::storage::Storage;
20
/// Signature bytes for a fractal heap header: ASCII `FRHP`.
const FRHP_SIGNATURE: [u8; 4] = *b"FRHP";

/// Signature bytes for a direct block: ASCII `FHDB`.
///
/// NOTE(review): the leading underscore suggests this constant is currently
/// unreferenced — confirm before relying on it.
const _FHDB_SIGNATURE: [u8; 4] = *b"FHDB";

/// Signature bytes for an indirect block: ASCII `FHIB`.
const FHIB_SIGNATURE: [u8; 4] = *b"FHIB";
/// Upper bound (64) on indirect-block nesting.
///
/// NOTE(review): presumably guards the indirect-block walk against cyclic or
/// malicious files; the code that uses it is not visible here — confirm.
const MAX_FRACTAL_HEAP_INDIRECT_DEPTH: usize = 64;
/// Upper bound (64) on the number of rows in an indirect block.
///
/// NOTE(review): presumably rejects implausible row counts read from disk;
/// the code that uses it is not visible here — confirm.
const MAX_FRACTAL_HEAP_INDIRECT_ROWS: u16 = 64;
31
/// Parsed fractal heap header.
///
/// Field values are taken verbatim from the on-disk header by
/// [`FractalHeap::parse`]; the only header field that is read but not kept is
/// the managed free-space amount.
#[derive(Debug, Clone)]
pub struct FractalHeap {
    /// Size in bytes of heap IDs used to reference objects.
    pub heap_id_len: u16,
    /// Size in bytes of I/O filter info (0 if none). When non-zero this is
    /// also the number of bytes stored in `io_filter_info`.
    pub io_filters_len: u16,
    /// Heap status flags.
    pub flags: u8,
    /// Maximum size of a managed object (larger objects become "huge").
    /// Stored on disk as a 32-bit value and widened to `u64` when parsed.
    pub max_managed_object_size: u64,
    /// Next huge object ID to assign.
    pub next_huge_id: u64,
    /// Address of the B-tree v2 used for huge objects.
    pub btree_huge_objects_address: u64,
    /// Address of the free-space manager for managed blocks.
    pub free_space_managed_address: u64,
    /// Total managed space in bytes.
    pub managed_space_amount: u64,
    /// Total managed allocated space in bytes.
    pub managed_alloc_amount: u64,
    /// Iterator offset for managed free-space.
    pub managed_iter_offset: u64,
    /// Number of managed objects.
    pub managed_objects_count: u64,
    /// Total size of huge objects in bytes.
    pub huge_objects_size: u64,
    /// Number of huge objects.
    pub huge_objects_count: u64,
    /// Total size of tiny objects in bytes.
    pub tiny_objects_size: u64,
    /// Number of tiny objects.
    pub tiny_objects_count: u64,
    /// Width of the doubling table (number of direct blocks per row).
    pub table_width: u16,
    /// Size in bytes of the starting (smallest) direct block.
    pub starting_block_size: u64,
    /// Maximum direct block size before switching to indirect blocks.
    pub max_direct_block_size: u64,
    /// Log2 of the maximum managed heap size (used for heap ID encoding).
    pub max_heap_size: u16,
    /// Starting row of the root indirect block (for doubling table).
    pub starting_row_root_indirect: u16,
    /// Address of the root block (direct or indirect).
    pub root_block_address: u64,
    /// Current number of rows in the root indirect block.
    pub current_rows_in_root_indirect: u16,
    /// Filtered root direct block size (present only when io_filters_len > 0).
    pub io_filter_size: Option<u64>,
    /// Filter mask for root direct block (present only when io_filters_len > 0).
    pub io_filter_mask: Option<u32>,
    /// Encoded filter pipeline for heap blocks/huge objects. Kept as raw
    /// bytes (empty when `io_filters_len` is 0) and decoded on demand.
    pub io_filter_info: Vec<u8>,
}
86
/// Cache of verified direct blocks for repeated object lookups in one heap.
///
/// Blocks are only inserted after `verify_direct_block_bytes` has accepted
/// them, and are shared through `Arc` so a cache hit does not copy the bytes.
#[derive(Debug, Default)]
pub struct FractalHeapDirectBlockCache {
    /// Verified direct block contents, keyed by where/how the block is stored.
    blocks: HashMap<DirectBlockCacheKey, Arc<Vec<u8>>>,
}

/// Cache key for a direct block, in field order:
/// `(address, block_size, filtered_size, filter_mask)`.
type DirectBlockCacheKey = (u64, u64, Option<u64>, u32);
94
/// Where a direct block lives and how it must be read.
#[derive(Debug, Clone, Copy)]
struct DirectBlockLocation {
    /// File address the block is read from.
    address: u64,
    /// Heap offset of the block's first byte; subtracted from an object's
    /// heap offset to obtain the object's position inside the block.
    block_offset_in_heap: u64,
    /// Size in bytes the block must have once loaded (after any filtering).
    block_size: u64,
    /// On-disk size when the block is stored filtered; `None` means the block
    /// is read as `block_size` raw bytes with no filter pipeline applied.
    filtered_size: Option<u64>,
    /// Filter mask passed to the filter pipeline for filtered blocks.
    filter_mask: u32,
}
103
/// Where a huge object lives and how it must be read.
#[derive(Debug, Clone, Copy)]
struct HugeObjectLocation {
    /// File address the object is read from.
    address: u64,
    /// Number of bytes to read from `address`.
    disk_length: u64,
    /// Filter mask passed to the filter pipeline (0 for unfiltered objects).
    filter_mask: u32,
    /// Decoded size for filtered objects; `None` means the on-disk bytes are
    /// returned as-is without running the filter pipeline.
    memory_length: Option<u64>,
}
111
112impl FractalHeapDirectBlockCache {
113    fn get_verified_block_storage(
114        &mut self,
115        heap: &FractalHeap,
116        location: DirectBlockLocation,
117        storage: &dyn Storage,
118        offset_size: u8,
119        filter_registry: Option<&FilterRegistry>,
120    ) -> Result<Arc<Vec<u8>>> {
121        let key = (
122            location.address,
123            location.block_size,
124            location.filtered_size,
125            location.filter_mask,
126        );
127        if let Some(block) = self.blocks.get(&key) {
128            return Ok(block.clone());
129        }
130
131        let block =
132            heap.load_direct_block_storage(location, storage, offset_size, filter_registry)?;
133        heap.verify_direct_block_bytes(&block, offset_size)?;
134        let block = Arc::new(block);
135        self.blocks.insert(key, block.clone());
136        Ok(block)
137    }
138}
139
140impl FractalHeap {
141    /// Parse a fractal heap header at the current cursor position.
142    ///
143    /// Format:
144    /// - Signature: `FRHP` (4 bytes)
145    /// - Version: 0 (1 byte)
146    /// - Heap ID length (u16 LE)
147    /// - I/O filters encoded length (u16 LE)
148    /// - Flags (u8)
149    /// - Max managed object size (u32 LE)
150    /// - Next huge object ID (`length_size` bytes)
151    /// - B-tree address for huge objects (`offset_size` bytes)
152    /// - Managed free-space amount (`length_size` bytes)
153    /// - Free-space manager address (`offset_size` bytes)
154    /// - Managed space amount (`length_size` bytes)
155    /// - Managed alloc amount (`length_size` bytes)
156    /// - Managed free-space iterator offset (`length_size` bytes)
157    /// - Managed objects count (`length_size` bytes)
158    /// - Huge objects size (`length_size` bytes)
159    /// - Huge objects count (`length_size` bytes)
160    /// - Tiny objects size (`length_size` bytes)
161    /// - Tiny objects count (`length_size` bytes)
162    /// - Table width (u16 LE)
163    /// - Starting block size (`length_size` bytes)
164    /// - Maximum direct block size (`length_size` bytes)
165    /// - Max heap size (u16 LE)
166    /// - Starting row of root indirect block (u16 LE)
167    /// - Root block address (`offset_size` bytes)
168    /// - Current rows in root indirect block (u16 LE)
169    /// - If io_filters_len > 0: filtered root direct block size (`length_size`), filter mask (u32 LE)
170    /// - Checksum (u32 LE)
171    pub fn parse(cursor: &mut Cursor, offset_size: u8, length_size: u8) -> Result<Self> {
172        let start = cursor.position();
173
174        let sig = cursor.read_bytes(4)?;
175        if sig != FRHP_SIGNATURE {
176            return Err(Error::InvalidFractalHeapSignature);
177        }
178
179        let version = cursor.read_u8()?;
180        if version != 0 {
181            return Err(Error::UnsupportedFractalHeapVersion(version));
182        }
183
184        let heap_id_len = cursor.read_u16_le()?;
185        let io_filters_len = cursor.read_u16_le()?;
186        let flags = cursor.read_u8()?;
187
188        let max_managed_object_size = cursor.read_u32_le()? as u64;
189        let next_huge_id = cursor.read_length(length_size)?;
190        let btree_huge_objects_address = cursor.read_offset(offset_size)?;
191        let _managed_free_space_amount = cursor.read_length(length_size)?;
192        let free_space_managed_address = cursor.read_offset(offset_size)?;
193        let managed_space_amount = cursor.read_length(length_size)?;
194        let managed_alloc_amount = cursor.read_length(length_size)?;
195        let managed_iter_offset = cursor.read_length(length_size)?;
196        let managed_objects_count = cursor.read_length(length_size)?;
197        let huge_objects_size = cursor.read_length(length_size)?;
198        let huge_objects_count = cursor.read_length(length_size)?;
199        let tiny_objects_size = cursor.read_length(length_size)?;
200        let tiny_objects_count = cursor.read_length(length_size)?;
201
202        let table_width = cursor.read_u16_le()?;
203        let starting_block_size = cursor.read_length(length_size)?;
204        let max_direct_block_size = cursor.read_length(length_size)?;
205        let max_heap_size = cursor.read_u16_le()?;
206        let starting_row_root_indirect = cursor.read_u16_le()?;
207        let root_block_address = cursor.read_offset(offset_size)?;
208        let current_rows_in_root_indirect = cursor.read_u16_le()?;
209
210        let (io_filter_size, io_filter_mask) = if io_filters_len > 0 {
211            let size = cursor.read_length(length_size)?;
212            let mask = cursor.read_u32_le()?;
213            (Some(size), Some(mask))
214        } else {
215            (None, None)
216        };
217        let io_filter_info = if io_filters_len > 0 {
218            cursor.read_bytes(usize::from(io_filters_len))?.to_vec()
219        } else {
220            Vec::new()
221        };
222
223        // Verify checksum.
224        let checksum_end = cursor.position();
225        let stored_checksum = cursor.read_u32_le()?;
226        let computed = jenkins_lookup3(&cursor.data()[start as usize..checksum_end as usize]);
227        if computed != stored_checksum {
228            return Err(Error::ChecksumMismatch {
229                expected: stored_checksum,
230                actual: computed,
231            });
232        }
233
234        Ok(FractalHeap {
235            heap_id_len,
236            io_filters_len,
237            flags,
238            max_managed_object_size,
239            next_huge_id,
240            btree_huge_objects_address,
241            free_space_managed_address,
242            managed_space_amount,
243            managed_alloc_amount,
244            managed_iter_offset,
245            managed_objects_count,
246            huge_objects_size,
247            huge_objects_count,
248            tiny_objects_size,
249            tiny_objects_count,
250            table_width,
251            starting_block_size,
252            max_direct_block_size,
253            max_heap_size,
254            starting_row_root_indirect,
255            root_block_address,
256            current_rows_in_root_indirect,
257            io_filter_size,
258            io_filter_mask,
259            io_filter_info,
260        })
261    }
262
263    /// Parse a fractal heap header from random-access storage.
264    pub fn parse_at_storage(
265        storage: &dyn Storage,
266        address: u64,
267        offset_size: u8,
268        length_size: u8,
269    ) -> Result<Self> {
270        let max_header_len = 256usize;
271        let available = storage.len().saturating_sub(address);
272        let len = usize::try_from(available.min(max_header_len as u64)).map_err(|_| {
273            Error::InvalidData("fractal heap header exceeds platform usize capacity".into())
274        })?;
275        let bytes = storage.read_range(address, len)?;
276        let mut cursor = Cursor::new(bytes.as_ref());
277        Self::parse(&mut cursor, offset_size, length_size)
278    }
279
    /// Extract any fractal heap object given a heap ID.
    ///
    /// Convenience wrapper around [`FractalHeap::get_object_with_registry`]
    /// that passes no caller-provided filter registry. `file_data` is the
    /// in-memory byte slice that heap addresses index into.
    pub fn get_object(
        &self,
        heap_id: &[u8],
        file_data: &[u8],
        offset_size: u8,
        length_size: u8,
    ) -> Result<Vec<u8>> {
        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
    }
290
291    /// Extract any fractal heap object using a caller-provided filter registry
292    /// for filtered managed and huge objects.
293    pub fn get_object_with_registry(
294        &self,
295        heap_id: &[u8],
296        file_data: &[u8],
297        offset_size: u8,
298        length_size: u8,
299        filter_registry: Option<&FilterRegistry>,
300    ) -> Result<Vec<u8>> {
301        match self.heap_id_kind(heap_id)? {
302            HeapIdKind::Managed => self.get_managed_object_impl(
303                heap_id,
304                file_data,
305                offset_size,
306                length_size,
307                filter_registry,
308            ),
309            HeapIdKind::Huge => self.get_huge_object(
310                heap_id,
311                file_data,
312                offset_size,
313                length_size,
314                filter_registry,
315            ),
316            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
317        }
318    }
319
    /// Extract any fractal heap object from random-access storage.
    ///
    /// Convenience wrapper around
    /// [`FractalHeap::get_object_storage_with_registry`] that passes no
    /// caller-provided filter registry.
    pub fn get_object_storage(
        &self,
        heap_id: &[u8],
        storage: &dyn Storage,
        offset_size: u8,
        length_size: u8,
    ) -> Result<Vec<u8>> {
        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
    }
330
    /// Extract any fractal heap object from random-access storage using a
    /// caller-provided filter registry for filtered managed and huge objects.
    ///
    /// A fresh, empty direct-block cache is created for this single call, so
    /// nothing is reused between calls; use
    /// [`FractalHeap::get_object_storage_cached_with_registry`] to share a
    /// cache across lookups.
    pub fn get_object_storage_with_registry(
        &self,
        heap_id: &[u8],
        storage: &dyn Storage,
        offset_size: u8,
        length_size: u8,
        filter_registry: Option<&FilterRegistry>,
    ) -> Result<Vec<u8>> {
        // Throwaway cache: dropped when this call returns.
        let mut cache = FractalHeapDirectBlockCache::default();
        self.get_object_storage_cached_with_registry(
            heap_id,
            storage,
            offset_size,
            length_size,
            &mut cache,
            filter_registry,
        )
    }
351
    /// Extract any fractal heap object from storage, reusing verified direct
    /// blocks across managed object lookups.
    ///
    /// Convenience wrapper around
    /// [`FractalHeap::get_object_storage_cached_with_registry`] that passes no
    /// caller-provided filter registry. `direct_block_cache` is consulted and
    /// populated only for managed objects.
    pub fn get_object_storage_cached(
        &self,
        heap_id: &[u8],
        storage: &dyn Storage,
        offset_size: u8,
        length_size: u8,
        direct_block_cache: &mut FractalHeapDirectBlockCache,
    ) -> Result<Vec<u8>> {
        self.get_object_storage_cached_with_registry(
            heap_id,
            storage,
            offset_size,
            length_size,
            direct_block_cache,
            None,
        )
    }
371
372    /// Extract any fractal heap object from storage while reusing verified
373    /// direct blocks and a caller-provided filter registry.
374    pub fn get_object_storage_cached_with_registry(
375        &self,
376        heap_id: &[u8],
377        storage: &dyn Storage,
378        offset_size: u8,
379        length_size: u8,
380        direct_block_cache: &mut FractalHeapDirectBlockCache,
381        filter_registry: Option<&FilterRegistry>,
382    ) -> Result<Vec<u8>> {
383        match self.heap_id_kind(heap_id)? {
384            HeapIdKind::Managed => self.get_managed_object_storage_cached_impl(
385                heap_id,
386                storage,
387                offset_size,
388                length_size,
389                direct_block_cache,
390                filter_registry,
391            ),
392            HeapIdKind::Huge => self.get_huge_object_storage(
393                heap_id,
394                storage,
395                offset_size,
396                length_size,
397                filter_registry,
398            ),
399            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
400        }
401    }
402
    /// Extract a fractal heap object. Kept for existing callers; now also
    /// handles tiny and huge IDs.
    ///
    /// Despite the name this is equivalent to [`FractalHeap::get_object`]: it
    /// forwards to `get_object_with_registry` with no filter registry.
    pub fn get_managed_object(
        &self,
        heap_id: &[u8],
        file_data: &[u8],
        offset_size: u8,
        length_size: u8,
    ) -> Result<Vec<u8>> {
        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
    }
414
    /// Extract a fractal heap object from random-access storage. Kept for
    /// existing callers; now also handles tiny and huge IDs.
    ///
    /// Despite the name this is equivalent to
    /// [`FractalHeap::get_object_storage`]: it forwards to
    /// `get_object_storage_with_registry` with no filter registry.
    pub fn get_managed_object_storage(
        &self,
        heap_id: &[u8],
        storage: &dyn Storage,
        offset_size: u8,
        length_size: u8,
    ) -> Result<Vec<u8>> {
        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
    }
426
427    fn get_managed_object_impl(
428        &self,
429        heap_id: &[u8],
430        file_data: &[u8],
431        offset_size: u8,
432        length_size: u8,
433        filter_registry: Option<&FilterRegistry>,
434    ) -> Result<Vec<u8>> {
435        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
436
437        if obj_length == 0 {
438            return Ok(Vec::new());
439        }
440
441        let location = self.find_direct_block(heap_offset, file_data, offset_size, length_size)?;
442        let block = self.load_direct_block(file_data, location, filter_registry)?;
443        self.verify_direct_block_bytes(&block, offset_size)?;
444        let offset_in_block = heap_offset
445            .checked_sub(location.block_offset_in_heap)
446            .ok_or_else(|| {
447                Error::InvalidData("fractal heap object precedes direct block".into())
448            })?;
449        let data_start = usize::try_from(offset_in_block)
450            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
451        let len = usize::try_from(obj_length).map_err(|_| {
452            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
453        })?;
454        let data_end = data_start
455            .checked_add(len)
456            .ok_or(Error::OffsetOutOfBounds(location.address))?;
457
458        if data_end > block.len() {
459            return Err(Error::UnexpectedEof {
460                offset: location
461                    .address
462                    .checked_add(offset_in_block)
463                    .ok_or(Error::OffsetOutOfBounds(location.address))?,
464                needed: obj_length,
465                available: block.len().saturating_sub(data_start) as u64,
466            });
467        }
468
469        Ok(block[data_start..data_end].to_vec())
470    }
471
472    fn get_managed_object_storage_cached_impl(
473        &self,
474        heap_id: &[u8],
475        storage: &dyn Storage,
476        offset_size: u8,
477        length_size: u8,
478        direct_block_cache: &mut FractalHeapDirectBlockCache,
479        filter_registry: Option<&FilterRegistry>,
480    ) -> Result<Vec<u8>> {
481        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
482        if obj_length == 0 {
483            return Ok(Vec::new());
484        }
485
486        let location =
487            self.find_direct_block_storage(heap_offset, storage, offset_size, length_size)?;
488        let block = direct_block_cache.get_verified_block_storage(
489            self,
490            location,
491            storage,
492            offset_size,
493            filter_registry,
494        )?;
495        let offset_in_block = heap_offset
496            .checked_sub(location.block_offset_in_heap)
497            .ok_or_else(|| {
498                Error::InvalidData("fractal heap object precedes direct block".into())
499            })?;
500        let start = usize::try_from(offset_in_block)
501            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
502        let len = usize::try_from(obj_length).map_err(|_| {
503            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
504        })?;
505        let end = start
506            .checked_add(len)
507            .ok_or(Error::OffsetOutOfBounds(location.address))?;
508        if end > block.len() {
509            let data_start = location
510                .address
511                .checked_add(offset_in_block)
512                .ok_or(Error::OffsetOutOfBounds(location.address))?;
513            return Err(Error::UnexpectedEof {
514                offset: data_start,
515                needed: obj_length,
516                available: block.len().saturating_sub(start) as u64,
517            });
518        }
519        Ok(block[start..end].to_vec())
520    }
521
522    fn get_huge_object(
523        &self,
524        heap_id: &[u8],
525        file_data: &[u8],
526        offset_size: u8,
527        length_size: u8,
528        filter_registry: Option<&FilterRegistry>,
529    ) -> Result<Vec<u8>> {
530        let location = self.resolve_huge_object_location(
531            heap_id,
532            Some(file_data),
533            None,
534            offset_size,
535            length_size,
536        )?;
537        let start = usize::try_from(location.address)
538            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
539        let len = usize::try_from(location.disk_length).map_err(|_| {
540            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
541        })?;
542        let end = start
543            .checked_add(len)
544            .ok_or(Error::OffsetOutOfBounds(location.address))?;
545        if end > file_data.len() {
546            return Err(Error::UnexpectedEof {
547                offset: location.address,
548                needed: location.disk_length,
549                available: file_data.len().saturating_sub(start) as u64,
550            });
551        }
552        self.decode_huge_object_bytes(&file_data[start..end], location, filter_registry)
553    }
554
555    fn get_huge_object_storage(
556        &self,
557        heap_id: &[u8],
558        storage: &dyn Storage,
559        offset_size: u8,
560        length_size: u8,
561        filter_registry: Option<&FilterRegistry>,
562    ) -> Result<Vec<u8>> {
563        let location = self.resolve_huge_object_location(
564            heap_id,
565            None,
566            Some(storage),
567            offset_size,
568            length_size,
569        )?;
570        let len = usize::try_from(location.disk_length).map_err(|_| {
571            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
572        })?;
573        let bytes = storage.read_range(location.address, len)?;
574        self.decode_huge_object_bytes(bytes.as_ref(), location, filter_registry)
575    }
576
577    fn decode_huge_object_bytes(
578        &self,
579        bytes: &[u8],
580        location: HugeObjectLocation,
581        filter_registry: Option<&FilterRegistry>,
582    ) -> Result<Vec<u8>> {
583        if let Some(memory_length) = location.memory_length {
584            self.apply_heap_filters(
585                bytes,
586                location.filter_mask,
587                memory_length,
588                "filtered fractal heap huge object",
589                filter_registry,
590            )
591        } else {
592            Ok(bytes.to_vec())
593        }
594    }
595
596    fn load_direct_block(
597        &self,
598        file_data: &[u8],
599        location: DirectBlockLocation,
600        filter_registry: Option<&FilterRegistry>,
601    ) -> Result<Vec<u8>> {
602        let read_len = location.filtered_size.unwrap_or(location.block_size);
603        let start = usize::try_from(location.address)
604            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
605        let len = usize::try_from(read_len).map_err(|_| {
606            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
607        })?;
608        let end = start
609            .checked_add(len)
610            .ok_or(Error::OffsetOutOfBounds(location.address))?;
611        if end > file_data.len() {
612            return Err(Error::UnexpectedEof {
613                offset: location.address,
614                needed: read_len,
615                available: file_data.len().saturating_sub(start) as u64,
616            });
617        }
618        let block = if location.filtered_size.is_some() {
619            self.apply_heap_filters(
620                &file_data[start..end],
621                location.filter_mask,
622                location.block_size,
623                "filtered fractal heap direct block",
624                filter_registry,
625            )?
626        } else {
627            file_data[start..end].to_vec()
628        };
629        let expected = usize::try_from(location.block_size).map_err(|_| {
630            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
631        })?;
632        if block.len() != expected {
633            return Err(Error::InvalidData(format!(
634                "fractal heap direct block has {} bytes, expected {} bytes",
635                block.len(),
636                expected
637            )));
638        }
639        Ok(block)
640    }
641
642    fn load_direct_block_storage(
643        &self,
644        location: DirectBlockLocation,
645        storage: &dyn Storage,
646        _offset_size: u8,
647        filter_registry: Option<&FilterRegistry>,
648    ) -> Result<Vec<u8>> {
649        let read_len = location.filtered_size.unwrap_or(location.block_size);
650        let len = usize::try_from(read_len).map_err(|_| {
651            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
652        })?;
653        let bytes = storage.read_range(location.address, len)?;
654        let block = if location.filtered_size.is_some() {
655            self.apply_heap_filters(
656                bytes.as_ref(),
657                location.filter_mask,
658                location.block_size,
659                "filtered fractal heap direct block",
660                filter_registry,
661            )?
662        } else {
663            bytes.to_vec()
664        };
665        let expected = usize::try_from(location.block_size).map_err(|_| {
666            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
667        })?;
668        if block.len() != expected {
669            return Err(Error::InvalidData(format!(
670                "fractal heap direct block has {} bytes, expected {} bytes",
671                block.len(),
672                expected
673            )));
674        }
675        Ok(block)
676    }
677
678    fn apply_heap_filters(
679        &self,
680        bytes: &[u8],
681        filter_mask: u32,
682        expected_len: u64,
683        context: &str,
684        filter_registry: Option<&FilterRegistry>,
685    ) -> Result<Vec<u8>> {
686        let pipeline = self.filter_pipeline()?;
687        let expected = usize::try_from(expected_len).map_err(|_| {
688            Error::InvalidData(format!("{context} size exceeds platform usize capacity"))
689        })?;
690        let filter_output_limit = expected.checked_add(1).ok_or_else(|| {
691            Error::InvalidData(format!(
692                "{context} filter output limit exceeds platform usize capacity"
693            ))
694        })?;
695        let decoded = filters::apply_pipeline_with_limit(
696            bytes,
697            &pipeline.filters,
698            filter_mask,
699            1,
700            filter_registry,
701            Some(filter_output_limit),
702        )?;
703        if decoded.len() != expected {
704            return Err(Error::InvalidData(format!(
705                "{context} decoded to {} bytes, expected {} bytes",
706                decoded.len(),
707                expected
708            )));
709        }
710        Ok(decoded)
711    }
712
713    fn filter_pipeline(&self) -> Result<FilterPipelineMessage> {
714        if self.io_filters_len == 0 {
715            return Err(Error::InvalidData(
716                "fractal heap object is marked filtered but the heap has no filter pipeline".into(),
717            ));
718        }
719        let mut cursor = Cursor::new(&self.io_filter_info);
720        crate::messages::filter_pipeline::parse(&mut cursor, 0, 0, self.io_filter_info.len())
721    }
722
723    fn resolve_huge_object_location(
724        &self,
725        heap_id: &[u8],
726        file_data: Option<&[u8]>,
727        storage: Option<&dyn Storage>,
728        offset_size: u8,
729        length_size: u8,
730    ) -> Result<HugeObjectLocation> {
731        let direct_unfiltered_len = 1 + usize::from(offset_size) + usize::from(length_size);
732        let direct_filtered_len = direct_unfiltered_len + 4 + usize::from(length_size);
733
734        if self.io_filters_len > 0 && heap_id.len() >= direct_filtered_len {
735            let mut cursor = Cursor::new(&heap_id[1..]);
736            let address = cursor.read_offset(offset_size)?;
737            let disk_length = cursor.read_length(length_size)?;
738            let filter_mask = cursor.read_u32_le()?;
739            let memory_length = cursor.read_length(length_size)?;
740            return Ok(HugeObjectLocation {
741                address,
742                disk_length,
743                filter_mask,
744                memory_length: Some(memory_length),
745            });
746        }
747
748        if self.io_filters_len == 0 && heap_id.len() >= direct_unfiltered_len {
749            let mut cursor = Cursor::new(&heap_id[1..]);
750            let address = cursor.read_offset(offset_size)?;
751            let disk_length = cursor.read_length(length_size)?;
752            return Ok(HugeObjectLocation {
753                address,
754                disk_length,
755                filter_mask: 0,
756                memory_length: None,
757            });
758        }
759
760        if heap_id.len() < 1 + usize::from(length_size) {
761            return Err(Error::InvalidData(
762                "huge fractal heap ID is too short".into(),
763            ));
764        }
765        if Cursor::is_undefined_offset(self.btree_huge_objects_address, offset_size) {
766            return Err(Error::UndefinedAddress);
767        }
768
769        let mut key_cursor = Cursor::new(&heap_id[1..]);
770        let object_id = key_cursor.read_length(length_size)?;
771
772        let header = if let Some(storage) = storage {
773            crate::btree_v2::BTreeV2Header::parse_at_storage(
774                storage,
775                self.btree_huge_objects_address,
776                offset_size,
777                length_size,
778            )?
779        } else {
780            let data = file_data.expect("file_data must exist when storage is None");
781            let mut cursor = Cursor::new(data);
782            cursor.set_position(self.btree_huge_objects_address);
783            crate::btree_v2::BTreeV2Header::parse(&mut cursor, offset_size, length_size)?
784        };
785
786        let records = if let Some(storage) = storage {
787            crate::btree_v2::collect_btree_v2_records_storage(
788                storage,
789                &header,
790                offset_size,
791                length_size,
792                None,
793                &[],
794                None,
795            )?
796        } else {
797            crate::btree_v2::collect_btree_v2_records(
798                file_data.expect("file_data must exist when storage is None"),
799                &header,
800                offset_size,
801                length_size,
802                None,
803                &[],
804                None,
805            )?
806        };
807
808        for record in records {
809            match record {
810                crate::btree_v2::BTreeV2Record::HugeIndirectNonFiltered {
811                    address,
812                    length,
813                    object_id: record_id,
814                } if record_id == object_id => {
815                    return Ok(HugeObjectLocation {
816                        address,
817                        disk_length: length,
818                        filter_mask: 0,
819                        memory_length: None,
820                    })
821                }
822                crate::btree_v2::BTreeV2Record::HugeIndirectFiltered {
823                    object_id: record_id,
824                    address,
825                    filtered_length,
826                    filter_mask,
827                    memory_length,
828                } if record_id == object_id => {
829                    return Ok(HugeObjectLocation {
830                        address,
831                        disk_length: filtered_length,
832                        filter_mask,
833                        memory_length: Some(memory_length),
834                    });
835                }
836                _ => {}
837            }
838        }
839
840        Err(Error::InvalidData(format!(
841            "huge fractal heap object ID {} not found",
842            object_id
843        )))
844    }
845
846    fn decode_tiny_object(&self, heap_id: &[u8]) -> Result<Vec<u8>> {
847        let extended = self.heap_id_len > 18;
848        let (data_start, len) = if extended {
849            if heap_id.len() < 2 {
850                return Err(Error::InvalidData(
851                    "extended tiny heap ID is too short".into(),
852                ));
853            }
854            let encoded = (u16::from(heap_id[0] & 0x0F) << 8) | u16::from(heap_id[1]);
855            (2usize, usize::from(encoded) + 1)
856        } else {
857            (1usize, usize::from(heap_id[0] & 0x0F) + 1)
858        };
859        let data_end = data_start
860            .checked_add(len)
861            .ok_or_else(|| Error::InvalidData("tiny heap object length overflows".into()))?;
862        if data_end > heap_id.len() {
863            return Err(Error::InvalidData(format!(
864                "tiny heap object needs {} bytes, heap ID has {}",
865                data_end,
866                heap_id.len()
867            )));
868        }
869        Ok(heap_id[data_start..data_end].to_vec())
870    }
871
872    fn heap_id_kind(&self, heap_id: &[u8]) -> Result<HeapIdKind> {
873        if heap_id.is_empty() {
874            return Err(Error::InvalidData("empty fractal heap ID".into()));
875        }
876        let version = heap_id[0] >> 6;
877        if version != 0 {
878            return Err(Error::InvalidData(format!(
879                "unsupported fractal heap ID version {}",
880                version
881            )));
882        }
883        match (heap_id[0] >> 4) & 0x03 {
884            0 => Ok(HeapIdKind::Managed),
885            1 => Ok(HeapIdKind::Huge),
886            2 => Ok(HeapIdKind::Tiny),
887            other => Err(Error::InvalidData(format!(
888                "unknown fractal heap ID type {}",
889                other
890            ))),
891        }
892    }
893
894    fn decode_managed_heap_id(&self, heap_id: &[u8]) -> Result<(u64, u64)> {
895        let (offset_bytes, length_bytes) = self.managed_id_widths();
896        let needed = 1 + offset_bytes + length_bytes;
897        if heap_id.len() < needed {
898            return Err(Error::InvalidData(format!(
899                "managed fractal heap ID too short: need {} bytes, have {}",
900                needed,
901                heap_id.len()
902            )));
903        }
904        let mut cursor = Cursor::new(&heap_id[1..needed]);
905        let heap_offset = cursor.read_uvar(offset_bytes)?;
906        let obj_length = cursor.read_uvar(length_bytes)?;
907        Ok((heap_offset, obj_length))
908    }
909
910    fn managed_id_widths(&self) -> (usize, usize) {
911        let offset_bytes = usize::from(self.max_heap_size).div_ceil(8).max(1);
912        let max_len = self.max_direct_block_size.min(self.max_managed_object_size);
913        let length_bytes = bytes_needed_to_encode(max_len).max(1);
914        (offset_bytes, length_bytes)
915    }
916
917    /// Find the direct block containing a given heap offset.
918    ///
919    /// Returns (block_file_address, block_offset_within_heap, block_size).
920    fn find_direct_block(
921        &self,
922        heap_offset: u64,
923        file_data: &[u8],
924        offset_size: u8,
925        length_size: u8,
926    ) -> Result<DirectBlockLocation> {
927        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
928            return Err(Error::UndefinedAddress);
929        }
930
931        if self.current_rows_in_root_indirect == 0 {
932            // Root block is a direct block.
933            // The entire managed space is in this one block.
934            Ok(DirectBlockLocation {
935                address: self.root_block_address,
936                block_offset_in_heap: 0,
937                block_size: self.starting_block_size,
938                filtered_size: self.io_filter_size,
939                filter_mask: self.io_filter_mask.unwrap_or(0),
940            })
941        } else {
942            // Root block is an indirect block — traverse the doubling table.
943            let mut visited = HashSet::new();
944            self.find_direct_block_via_indirect(
945                self.root_block_address,
946                heap_offset,
947                file_data,
948                offset_size,
949                length_size,
950                self.current_rows_in_root_indirect,
951                MAX_FRACTAL_HEAP_INDIRECT_DEPTH,
952                &mut visited,
953            )
954        }
955    }
956
957    fn find_direct_block_storage(
958        &self,
959        heap_offset: u64,
960        storage: &dyn Storage,
961        offset_size: u8,
962        length_size: u8,
963    ) -> Result<DirectBlockLocation> {
964        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
965            return Err(Error::UndefinedAddress);
966        }
967
968        if self.current_rows_in_root_indirect == 0 {
969            Ok(DirectBlockLocation {
970                address: self.root_block_address,
971                block_offset_in_heap: 0,
972                block_size: self.starting_block_size,
973                filtered_size: self.io_filter_size,
974                filter_mask: self.io_filter_mask.unwrap_or(0),
975            })
976        } else {
977            let mut visited = HashSet::new();
978            self.find_direct_block_via_indirect_storage(
979                self.root_block_address,
980                heap_offset,
981                storage,
982                offset_size,
983                length_size,
984                self.current_rows_in_root_indirect,
985                MAX_FRACTAL_HEAP_INDIRECT_DEPTH,
986                &mut visited,
987            )
988        }
989    }
990
991    /// Traverse an indirect block to find the direct block for a given offset.
992    #[allow(clippy::too_many_arguments)]
993    fn find_direct_block_via_indirect(
994        &self,
995        indirect_address: u64,
996        heap_offset: u64,
997        file_data: &[u8],
998        offset_size: u8,
999        length_size: u8,
1000        nrows: u16,
1001        depth_remaining: usize,
1002        visited: &mut HashSet<u64>,
1003    ) -> Result<DirectBlockLocation> {
1004        self.enter_indirect_block(indirect_address, nrows, depth_remaining, visited)?;
1005
1006        // Validate FHIB signature
1007        let addr = usize::try_from(indirect_address)
1008            .map_err(|_| Error::OffsetOutOfBounds(indirect_address))?;
1009        if addr
1010            .checked_add(4)
1011            .map_or(true, |end| end > file_data.len())
1012        {
1013            return Err(Error::OffsetOutOfBounds(indirect_address));
1014        }
1015        if file_data[addr..addr + 4] != FHIB_SIGNATURE {
1016            return Err(Error::InvalidData(format!(
1017                "expected FHIB signature at offset {:#x}, got {:?}",
1018                indirect_address,
1019                &file_data[addr..addr + 4]
1020            )));
1021        }
1022
1023        // The doubling table has `table_width` entries per row.
1024        // Row 0 and 1 have blocks of size `starting_block_size`.
1025        // Row r (for r >= 1) has blocks of size `starting_block_size * 2^(r-1)`.
1026        //
1027        // We iterate through the rows to find which block contains the
1028        // target offset, then read the block address from the indirect block.
1029
1030        let width = self.table_width as u64;
1031        let mut running_offset: u64 = 0;
1032
1033        for row in 0..nrows as u64 {
1034            let block_size = self.block_size_for_row_checked(row)?;
1035            let is_direct = block_size <= self.max_direct_block_size;
1036
1037            for col in 0..width {
1038                let block_end = running_offset.checked_add(block_size).ok_or_else(|| {
1039                    Error::InvalidData("fractal heap indirect block offset overflows u64".into())
1040                })?;
1041                if heap_offset >= running_offset && heap_offset < block_end {
1042                    // This is the block we want. Read its address from the
1043                    // indirect block.
1044                    let entry_index = row * width + col;
1045
1046                    // Indirect block layout: signature(4) + version(1) +
1047                    // heap_header_addr(offset_size) + block_offset(max_heap_size/8 rounded up)
1048                    // Then direct-block entries, optionally with filtered size
1049                    // and mask, followed by child indirect-block addresses.
1050                    let iblock_header_size =
1051                        4 + 1 + offset_size as u64 + (self.max_heap_size as u64).div_ceil(8);
1052
1053                    if is_direct {
1054                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1055                        let entry_offset =
1056                            entry_index.checked_mul(entry_size).ok_or_else(|| {
1057                                Error::InvalidData(
1058                                    "fractal heap direct entry offset overflows u64".into(),
1059                                )
1060                            })?;
1061                        let entry_pos = indirect_address
1062                            .checked_add(iblock_header_size)
1063                            .and_then(|pos| pos.checked_add(entry_offset))
1064                            .ok_or_else(|| {
1065                                Error::InvalidData(
1066                                    "fractal heap direct entry address overflows u64".into(),
1067                                )
1068                            })?;
1069                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1070                            Error::InvalidData(
1071                                "fractal heap direct entry size exceeds platform usize".into(),
1072                            )
1073                        })?;
1074                        let entry_pos_usize = usize::try_from(entry_pos)
1075                            .map_err(|_| Error::OffsetOutOfBounds(entry_pos))?;
1076                        if entry_pos_usize
1077                            .checked_add(entry_len)
1078                            .map_or(true, |end| end > file_data.len())
1079                        {
1080                            return Err(Error::OffsetOutOfBounds(entry_pos));
1081                        }
1082                        let mut cursor = Cursor::new(file_data);
1083                        cursor.set_position(entry_pos);
1084                        let block_address = cursor.read_offset(offset_size)?;
1085                        if Cursor::is_undefined_offset(block_address, offset_size) {
1086                            return Err(Error::UndefinedAddress);
1087                        }
1088                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1089                            (
1090                                Some(cursor.read_length(length_size)?),
1091                                cursor.read_u32_le()?,
1092                            )
1093                        } else {
1094                            (None, 0)
1095                        };
1096                        return Ok(DirectBlockLocation {
1097                            address: block_address,
1098                            block_offset_in_heap: running_offset,
1099                            block_size,
1100                            filtered_size,
1101                            filter_mask,
1102                        });
1103                    } else {
1104                        // Need to recurse into a sub-indirect block.
1105                        let direct_count = self
1106                            .max_direct_block_rows_checked()?
1107                            .checked_mul(u64::from(self.table_width))
1108                            .ok_or_else(|| {
1109                                Error::InvalidData(
1110                                    "fractal heap direct entry count overflows u64".into(),
1111                                )
1112                            })?;
1113                        let indirect_index =
1114                            entry_index.checked_sub(direct_count).ok_or_else(|| {
1115                                Error::InvalidData(
1116                                    "fractal heap indirect entry precedes direct entries".into(),
1117                                )
1118                            })?;
1119                        let direct_entry_bytes = direct_count
1120                            .checked_mul(self.direct_block_entry_size(offset_size, length_size))
1121                            .ok_or_else(|| {
1122                                Error::InvalidData(
1123                                    "fractal heap direct entry table size overflows u64".into(),
1124                                )
1125                            })?;
1126                        let indirect_entry_offset = indirect_index
1127                            .checked_mul(u64::from(offset_size))
1128                            .ok_or_else(|| {
1129                                Error::InvalidData(
1130                                    "fractal heap indirect entry offset overflows u64".into(),
1131                                )
1132                            })?;
1133                        let entry_pos = indirect_address
1134                            .checked_add(iblock_header_size)
1135                            .and_then(|pos| pos.checked_add(direct_entry_bytes))
1136                            .and_then(|pos| pos.checked_add(indirect_entry_offset))
1137                            .ok_or_else(|| {
1138                                Error::InvalidData(
1139                                    "fractal heap indirect entry address overflows u64".into(),
1140                                )
1141                            })?;
1142                        let entry_pos_usize = usize::try_from(entry_pos)
1143                            .map_err(|_| Error::OffsetOutOfBounds(entry_pos))?;
1144                        if entry_pos_usize
1145                            .checked_add(usize::from(offset_size))
1146                            .map_or(true, |end| end > file_data.len())
1147                        {
1148                            return Err(Error::OffsetOutOfBounds(entry_pos));
1149                        }
1150                        let mut cursor = Cursor::new(file_data);
1151                        cursor.set_position(entry_pos);
1152                        let block_address = cursor.read_offset(offset_size)?;
1153                        if Cursor::is_undefined_offset(block_address, offset_size) {
1154                            return Err(Error::UndefinedAddress);
1155                        }
1156                        // Determine how many rows the sub-indirect has.
1157                        let sub_rows = self.rows_for_block_size_checked(block_size)?;
1158                        return self.find_direct_block_via_indirect(
1159                            block_address,
1160                            heap_offset - running_offset,
1161                            file_data,
1162                            offset_size,
1163                            length_size,
1164                            sub_rows,
1165                            depth_remaining - 1,
1166                            visited,
1167                        );
1168                    }
1169                }
1170                running_offset = block_end;
1171            }
1172        }
1173
1174        Err(Error::InvalidData(format!(
1175            "fractal heap offset {} not found in doubling table",
1176            heap_offset
1177        )))
1178    }
1179
1180    #[allow(clippy::too_many_arguments)]
1181    fn find_direct_block_via_indirect_storage(
1182        &self,
1183        indirect_address: u64,
1184        heap_offset: u64,
1185        storage: &dyn Storage,
1186        offset_size: u8,
1187        length_size: u8,
1188        nrows: u16,
1189        depth_remaining: usize,
1190        visited: &mut HashSet<u64>,
1191    ) -> Result<DirectBlockLocation> {
1192        self.enter_indirect_block(indirect_address, nrows, depth_remaining, visited)?;
1193
1194        let sig = storage.read_range(indirect_address, 4)?;
1195        if sig.as_ref() != FHIB_SIGNATURE {
1196            return Err(Error::InvalidData(format!(
1197                "expected FHIB signature at offset {:#x}, got {:?}",
1198                indirect_address,
1199                sig.as_ref()
1200            )));
1201        }
1202
1203        let width = self.table_width as u64;
1204        let mut running_offset = 0u64;
1205
1206        for row in 0..u64::from(nrows) {
1207            let block_size = self.block_size_for_row_checked(row)?;
1208            let is_direct = block_size <= self.max_direct_block_size;
1209
1210            for col in 0..width {
1211                let block_end = running_offset.checked_add(block_size).ok_or_else(|| {
1212                    Error::InvalidData("fractal heap indirect block offset overflows u64".into())
1213                })?;
1214                if heap_offset >= running_offset && heap_offset < block_end {
1215                    let entry_index = row * width + col;
1216                    let iblock_header_size = 4
1217                        + 1
1218                        + u64::from(offset_size)
1219                        + (u64::from(self.max_heap_size)).div_ceil(8);
1220
1221                    if is_direct {
1222                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1223                        let entry_offset =
1224                            entry_index.checked_mul(entry_size).ok_or_else(|| {
1225                                Error::InvalidData(
1226                                    "fractal heap direct entry offset overflows u64".into(),
1227                                )
1228                            })?;
1229                        let entry_pos = indirect_address
1230                            .checked_add(iblock_header_size)
1231                            .and_then(|pos| pos.checked_add(entry_offset))
1232                            .ok_or_else(|| {
1233                                Error::InvalidData(
1234                                    "fractal heap direct entry address overflows u64".into(),
1235                                )
1236                            })?;
1237                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1238                            Error::InvalidData(
1239                                "fractal heap direct entry size exceeds platform usize".into(),
1240                            )
1241                        })?;
1242                        let entry = storage.read_range(entry_pos, entry_len)?;
1243                        let mut cursor = Cursor::new(entry.as_ref());
1244                        let block_address = cursor.read_offset(offset_size)?;
1245                        if Cursor::is_undefined_offset(block_address, offset_size) {
1246                            return Err(Error::UndefinedAddress);
1247                        }
1248                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1249                            (
1250                                Some(cursor.read_length(length_size)?),
1251                                cursor.read_u32_le()?,
1252                            )
1253                        } else {
1254                            (None, 0)
1255                        };
1256                        return Ok(DirectBlockLocation {
1257                            address: block_address,
1258                            block_offset_in_heap: running_offset,
1259                            block_size,
1260                            filtered_size,
1261                            filter_mask,
1262                        });
1263                    }
1264
1265                    let direct_count = self
1266                        .max_direct_block_rows_checked()?
1267                        .checked_mul(u64::from(self.table_width))
1268                        .ok_or_else(|| {
1269                            Error::InvalidData(
1270                                "fractal heap direct entry count overflows u64".into(),
1271                            )
1272                        })?;
1273                    let indirect_index =
1274                        entry_index.checked_sub(direct_count).ok_or_else(|| {
1275                            Error::InvalidData(
1276                                "fractal heap indirect entry precedes direct entries".into(),
1277                            )
1278                        })?;
1279                    let direct_entry_bytes = direct_count
1280                        .checked_mul(self.direct_block_entry_size(offset_size, length_size))
1281                        .ok_or_else(|| {
1282                            Error::InvalidData(
1283                                "fractal heap direct entry table size overflows u64".into(),
1284                            )
1285                        })?;
1286                    let indirect_entry_offset = indirect_index
1287                        .checked_mul(u64::from(offset_size))
1288                        .ok_or_else(|| {
1289                            Error::InvalidData(
1290                                "fractal heap indirect entry offset overflows u64".into(),
1291                            )
1292                        })?;
1293                    let entry_addr_pos = indirect_address
1294                        .checked_add(iblock_header_size)
1295                        .and_then(|pos| pos.checked_add(direct_entry_bytes))
1296                        .and_then(|pos| pos.checked_add(indirect_entry_offset))
1297                        .ok_or_else(|| {
1298                            Error::InvalidData(
1299                                "fractal heap indirect entry address overflows u64".into(),
1300                            )
1301                        })?;
1302                    let entry = storage.read_range(entry_addr_pos, usize::from(offset_size))?;
1303                    let mut cursor = Cursor::new(entry.as_ref());
1304                    let block_address = cursor.read_offset(offset_size)?;
1305                    if Cursor::is_undefined_offset(block_address, offset_size) {
1306                        return Err(Error::UndefinedAddress);
1307                    }
1308
1309                    let sub_rows = self.rows_for_block_size_checked(block_size)?;
1310                    return self.find_direct_block_via_indirect_storage(
1311                        block_address,
1312                        heap_offset - running_offset,
1313                        storage,
1314                        offset_size,
1315                        length_size,
1316                        sub_rows,
1317                        depth_remaining - 1,
1318                        visited,
1319                    );
1320                }
1321                running_offset = block_end;
1322            }
1323        }
1324
1325        Err(Error::InvalidData(format!(
1326            "fractal heap offset {} not found in doubling table",
1327            heap_offset
1328        )))
1329    }
1330
1331    fn enter_indirect_block(
1332        &self,
1333        indirect_address: u64,
1334        nrows: u16,
1335        depth_remaining: usize,
1336        visited: &mut HashSet<u64>,
1337    ) -> Result<()> {
1338        if depth_remaining == 0 {
1339            return Err(Error::InvalidData(
1340                "fractal heap indirect traversal exceeded recursion limit".into(),
1341            ));
1342        }
1343        if nrows > MAX_FRACTAL_HEAP_INDIRECT_ROWS {
1344            return Err(Error::InvalidData(format!(
1345                "fractal heap indirect block has {} rows, limit is {}",
1346                nrows, MAX_FRACTAL_HEAP_INDIRECT_ROWS
1347            )));
1348        }
1349        if !visited.insert(indirect_address) {
1350            return Err(Error::InvalidData(format!(
1351                "fractal heap indirect traversal revisits block at offset {:#x}",
1352                indirect_address
1353            )));
1354        }
1355        Ok(())
1356    }
1357
1358    /// Compute the block size for a given row in the doubling table.
1359    fn block_size_for_row_checked(&self, row: u64) -> Result<u64> {
1360        if row == 0 {
1361            Ok(self.starting_block_size)
1362        } else {
1363            let shift = u32::try_from(row - 1)
1364                .map_err(|_| Error::InvalidData("fractal heap indirect row exceeds u32".into()))?;
1365            let factor = 1u64.checked_shl(shift).ok_or_else(|| {
1366                Error::InvalidData("fractal heap indirect row shift overflows u64".into())
1367            })?;
1368            self.starting_block_size.checked_mul(factor).ok_or_else(|| {
1369                Error::InvalidData("fractal heap indirect block size overflows u64".into())
1370            })
1371        }
1372    }
1373
1374    /// Compute how many rows of the doubling table fit in a block of the
1375    /// given total size.
1376    fn rows_for_block_size_checked(&self, total_size: u64) -> Result<u16> {
1377        let mut rows = 0u16;
1378        let mut accum = 0u64;
1379        let width = u64::from(self.table_width);
1380        loop {
1381            if rows >= MAX_FRACTAL_HEAP_INDIRECT_ROWS {
1382                break;
1383            }
1384            let bs = self.block_size_for_row_checked(u64::from(rows))?;
1385            let row_total = bs.checked_mul(width).ok_or_else(|| {
1386                Error::InvalidData("fractal heap indirect row size overflows u64".into())
1387            })?;
1388            let next = accum.checked_add(row_total).ok_or_else(|| {
1389                Error::InvalidData("fractal heap indirect table size overflows u64".into())
1390            })?;
1391            if next > total_size {
1392                break;
1393            }
1394            accum = next;
1395            rows = rows.checked_add(1).ok_or_else(|| {
1396                Error::InvalidData("fractal heap indirect row count overflows u16".into())
1397            })?;
1398        }
1399        Ok(rows)
1400    }
1401
1402    fn max_direct_block_rows_checked(&self) -> Result<u64> {
1403        let mut rows = 0u64;
1404        loop {
1405            if rows >= u64::from(MAX_FRACTAL_HEAP_INDIRECT_ROWS) {
1406                break;
1407            }
1408            if self.block_size_for_row_checked(rows)? > self.max_direct_block_size {
1409                break;
1410            }
1411            rows += 1;
1412        }
1413        Ok(rows)
1414    }
1415
1416    fn direct_block_entry_size(&self, offset_size: u8, length_size: u8) -> u64 {
1417        let mut size = u64::from(offset_size);
1418        if self.io_filters_len > 0 {
1419            size += u64::from(length_size) + 4;
1420        }
1421        size
1422    }
1423
1424    /// Size in bytes of an unfiltered direct block header.
1425    fn direct_block_header_size(&self, offset_size: u8) -> usize {
1426        // Signature(4) + Version(1) + Heap header address(offset_size) +
1427        // Block offset within heap (max_heap_size bits, rounded up to bytes) +
1428        // optional Checksum(4).
1429        let offset_bytes = (self.max_heap_size as usize).div_ceil(8);
1430        let checksum_bytes = if self.direct_blocks_are_checksummed() {
1431            4
1432        } else {
1433            0
1434        };
1435        4 + 1 + offset_size as usize + offset_bytes + checksum_bytes
1436    }
1437
1438    fn direct_blocks_are_checksummed(&self) -> bool {
1439        self.flags & 0x02 != 0
1440    }
1441
1442    fn direct_block_checksum_pos(&self, offset_size: u8) -> Option<usize> {
1443        if self.direct_blocks_are_checksummed() {
1444            Some(self.direct_block_header_size(offset_size) - 4)
1445        } else {
1446            None
1447        }
1448    }
1449
1450    fn verify_direct_block_bytes(&self, block: &[u8], offset_size: u8) -> Result<()> {
1451        if block.len() < self.direct_block_header_size(offset_size) {
1452            return Err(Error::InvalidData(format!(
1453                "fractal heap direct block has {} bytes, expected at least {}",
1454                block.len(),
1455                self.direct_block_header_size(offset_size)
1456            )));
1457        }
1458        if block[..4] != _FHDB_SIGNATURE {
1459            return Err(Error::InvalidData(
1460                "invalid fractal heap direct block signature".into(),
1461            ));
1462        }
1463        let version = block[4];
1464        if version != 0 {
1465            return Err(Error::UnsupportedFractalHeapVersion(version));
1466        }
1467        if let Some(checksum_pos) = self.direct_block_checksum_pos(offset_size) {
1468            let stored_checksum = u32::from_le_bytes(
1469                block[checksum_pos..checksum_pos + 4]
1470                    .try_into()
1471                    .expect("direct block checksum slice has four bytes"),
1472            );
1473            let mut checksum_data = block.to_vec();
1474            checksum_data[checksum_pos..checksum_pos + 4].fill(0);
1475            let computed = jenkins_lookup3(&checksum_data);
1476            if computed != stored_checksum {
1477                return Err(Error::ChecksumMismatch {
1478                    expected: stored_checksum,
1479                    actual: computed,
1480                });
1481            }
1482        }
1483        Ok(())
1484    }
1485}
1486
/// Object type encoded in bits 4-5 of the first byte of a fractal heap ID.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HeapIdKind {
    /// Type value 0.
    Managed,
    /// Type value 1.
    Huge,
    /// Type value 2.
    Tiny,
}
1493
/// Minimum number of bytes (1 through 8) needed to hold `value`.
///
/// Zero still needs one byte.
fn bytes_needed_to_encode(value: u64) -> usize {
    // The smallest byte count `n` for which nothing remains above the low
    // `n` bytes; values using the top byte need all eight.
    (1..8usize)
        .find(|&n| value >> (8 * n) == 0)
        .unwrap_or(8)
}
1513
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{Storage, StorageBuffer};
    use flate2::write::ZlibEncoder;
    use flate2::Compression;
    use std::io::Write;
    use std::sync::{Arc, Mutex};

    /// Baseline heap header shared by the tests.
    ///
    /// No I/O filters, `flags` set to `0x02`, table width 4, 256-byte starting blocks,
    /// 4096-byte maximum direct blocks and a 16-bit heap offset space. Individual tests
    /// override only the fields they care about via struct-update syntax.
    fn base_heap() -> FractalHeap {
        FractalHeap {
            heap_id_len: 8,
            io_filters_len: 0,
            flags: 0x02,
            max_managed_object_size: 128,
            next_huge_id: 0,
            btree_huge_objects_address: u64::MAX,
            free_space_managed_address: 0,
            managed_space_amount: 0,
            managed_alloc_amount: 0,
            managed_iter_offset: 0,
            managed_objects_count: 0,
            huge_objects_size: 0,
            huge_objects_count: 0,
            tiny_objects_size: 0,
            tiny_objects_count: 0,
            table_width: 4,
            starting_block_size: 256,
            max_direct_block_size: 4096,
            max_heap_size: 16,
            starting_row_root_indirect: 0,
            root_block_address: 0,
            current_rows_in_root_indirect: 0,
            io_filter_size: None,
            io_filter_mask: None,
            io_filter_info: Vec::new(),
        }
    }

    /// Serialized filter description used by the filtered-heap tests.
    ///
    /// Layout: the bytes `0x02, 0x01`, then the little-endian `u16` values 1, 0, 1 and
    /// the `u32` value 6.
    /// NOTE(review): presumably message version 2 / one filter / filter id 1 (deflate) /
    /// flags 0 / one client value / compression level 6 — confirm against the filter
    /// pipeline parser.
    fn deflate_filter_info() -> Vec<u8> {
        let mut data = vec![0x02, 0x01];
        data.extend_from_slice(&1u16.to_le_bytes());
        data.extend_from_slice(&0u16.to_le_bytes());
        data.extend_from_slice(&1u16.to_le_bytes());
        data.extend_from_slice(&6u32.to_le_bytes());
        data
    }

    /// Compresses `bytes` into a zlib stream at the default compression level.
    fn zlib_compress(bytes: &[u8]) -> Vec<u8> {
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(bytes).unwrap();
        encoder.finish().unwrap()
    }

    /// Returns the baseline heap with `filter_info` attached as its I/O filter info.
    ///
    /// Panics if `filter_info` is longer than `u16::MAX` bytes instead of silently
    /// truncating the recorded length.
    fn filtered_heap_with_info(filter_info: Vec<u8>) -> FractalHeap {
        FractalHeap {
            // Evaluated before `filter_info` is moved into the next field.
            io_filters_len: u16::try_from(filter_info.len())
                .expect("test filter info length fits in u16"),
            io_filter_info: filter_info,
            ..base_heap()
        }
    }

    /// Builds a zero-filled direct block of `block_size` bytes holding `obj`.
    ///
    /// The block carries the `FHDB` signature, version 0 and `obj` placed immediately
    /// after the header (as sized by `heap` for 8-byte offsets). When `heap` reports a
    /// checksum position, the checksum is computed over the whole block with the
    /// checksum field zeroed and then stored little-endian in that field.
    fn direct_block_with_object(heap: &FractalHeap, block_size: usize, obj: &[u8]) -> Vec<u8> {
        let offset_size = 8;
        let mut block = vec![0u8; block_size];
        block[0..4].copy_from_slice(b"FHDB");
        block[4] = 0;

        let obj_offset = heap.direct_block_header_size(offset_size);
        block[obj_offset..obj_offset + obj.len()].copy_from_slice(obj);

        if let Some(checksum_pos) = heap.direct_block_checksum_pos(offset_size) {
            // Zero the field in place, hash, then overwrite it with the result.
            block[checksum_pos..checksum_pos + 4].fill(0);
            let checksum = jenkins_lookup3(&block);
            block[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
        }
        block
    }

    /// In-memory storage that records every `(offset, len)` read request.
    struct CountingStorage {
        /// Backing bytes served by `read_range`.
        data: Vec<u8>,
        /// Shared log of the requests seen so far.
        reads: Arc<Mutex<Vec<(u64, usize)>>>,
    }

    impl Storage for CountingStorage {
        fn len(&self) -> u64 {
            self.data.len() as u64
        }

        /// Logs the request, then returns a copy of the requested range or an error
        /// when the range does not fit inside `data`.
        fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
            self.reads.lock().unwrap().push((offset, len));
            let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
            let end = start
                .checked_add(len)
                .ok_or(Error::OffsetOutOfBounds(offset))?;
            if end > self.data.len() {
                return Err(Error::UnexpectedEof {
                    offset,
                    needed: len as u64,
                    available: self.len().saturating_sub(offset),
                });
            }
            Ok(StorageBuffer::from_vec(self.data[start..end].to_vec()))
        }
    }

    #[test]
    fn block_size_for_row_scales_by_table_width() {
        let heap = FractalHeap {
            max_managed_object_size: 0,
            btree_huge_objects_address: 0,
            ..base_heap()
        };

        // Rows 0 and 1 both use the starting block size; every later row doubles it.
        assert_eq!(heap.block_size_for_row_checked(0).unwrap(), 256);
        assert_eq!(heap.block_size_for_row_checked(1).unwrap(), 256); // 256 * 2^0
        assert_eq!(heap.block_size_for_row_checked(2).unwrap(), 512); // 256 * 2^1
        assert_eq!(heap.block_size_for_row_checked(3).unwrap(), 1024); // 256 * 2^2
    }

    #[test]
    fn find_direct_block_rejects_indirect_row_count_above_limit() {
        let mut heap = base_heap();
        heap.root_block_address = 0;
        heap.current_rows_in_root_indirect = MAX_FRACTAL_HEAP_INDIRECT_ROWS + 1;

        // Slice-backed lookup.
        let err = heap.find_direct_block(0, &[], 8, 8).unwrap_err();
        assert!(
            matches!(err, Error::InvalidData(ref message) if message.contains("limit")),
            "{err:?}"
        );

        // Storage-backed lookup must reject the same header.
        let storage = crate::storage::BytesStorage::new(Vec::new());
        let err = heap
            .find_direct_block_storage(0, &storage, 8, 8)
            .unwrap_err();
        assert!(
            matches!(err, Error::InvalidData(ref message) if message.contains("limit")),
            "{err:?}"
        );
    }

    #[test]
    fn find_direct_block_rejects_cyclic_indirect_block() {
        let mut heap = base_heap();
        let indirect_address = 512u64;
        heap.table_width = 1;
        heap.starting_block_size = 256;
        heap.max_direct_block_size = 256;
        heap.max_heap_size = 64;
        heap.root_block_address = indirect_address;
        heap.current_rows_in_root_indirect = 3;

        let offset_size = 8u8;
        let length_size = 8u8;

        // Indirect block header: signature + version + heap header address + block offset.
        let offset_bytes = usize::from(heap.max_heap_size).div_ceil(8);
        let iblock_header_size = 4 + 1 + usize::from(offset_size) + offset_bytes;
        let direct_count = usize::try_from(
            heap.max_direct_block_rows_checked().unwrap() * u64::from(heap.table_width),
        )
        .unwrap();
        let direct_entry_size =
            usize::try_from(heap.direct_block_entry_size(offset_size, length_size)).unwrap();

        // The first child-indirect entry sits right after all direct entries and points
        // back at the root indirect block itself, forming a cycle.
        let indirect_entry_pos = iblock_header_size + direct_count * direct_entry_size;
        let mut indirect = vec![0u8; indirect_entry_pos + usize::from(offset_size)];
        indirect[0..4].copy_from_slice(b"FHIB");
        indirect[4] = 0;
        indirect[indirect_entry_pos..indirect_entry_pos + usize::from(offset_size)]
            .copy_from_slice(&indirect_address.to_le_bytes());

        let base = indirect_address as usize;
        let mut file_data = vec![0u8; base + indirect.len()];
        file_data[base..base + indirect.len()].copy_from_slice(&indirect);

        let err = heap
            .find_direct_block(600, &file_data, offset_size, length_size)
            .unwrap_err();
        assert!(
            matches!(err, Error::InvalidData(ref message) if message.contains("revisits block")),
            "{err:?}"
        );

        let storage = crate::storage::BytesStorage::new(file_data);
        let err = heap
            .find_direct_block_storage(600, &storage, offset_size, length_size)
            .unwrap_err();
        assert!(
            matches!(err, Error::InvalidData(ref message) if message.contains("revisits block")),
            "{err:?}"
        );
    }

    #[test]
    fn get_tiny_object() {
        let heap = base_heap();
        // First byte: 0x20 combined with 3 in the low bits, followed by the 4-byte payload.
        let heap_id = [0x20 | 3, b't', b'i', b'n', b'y'];
        let result = heap.get_object(&heap_id, &[], 8, 8).unwrap();
        assert_eq!(result, b"tiny");
    }

    #[test]
    fn get_huge_direct_object() {
        let heap = base_heap();
        let mut file_data = vec![0u8; 128];
        file_data[64..68].copy_from_slice(b"huge");

        // Heap ID: 0x10, then the object's file address (64) and length (4).
        let mut heap_id = Vec::new();
        heap_id.push(0x10);
        heap_id.extend_from_slice(&64u64.to_le_bytes());
        heap_id.extend_from_slice(&4u64.to_le_bytes());

        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
        assert_eq!(result, b"huge");
    }

    #[test]
    fn get_filtered_huge_direct_object() {
        let heap = filtered_heap_with_info(deflate_filter_info());
        let payload = b"filtered huge payload";
        let compressed = zlib_compress(payload);
        let address = 64u64;
        let mut file_data = vec![0u8; address as usize + compressed.len()];
        file_data[address as usize..].copy_from_slice(&compressed);

        // Heap ID: 0x10, address, stored (compressed) length, a zero u32, unfiltered length.
        let mut heap_id = Vec::new();
        heap_id.push(0x10);
        heap_id.extend_from_slice(&address.to_le_bytes());
        heap_id.extend_from_slice(&(compressed.len() as u64).to_le_bytes());
        heap_id.extend_from_slice(&0u32.to_le_bytes());
        heap_id.extend_from_slice(&(payload.len() as u64).to_le_bytes());

        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
        assert_eq!(result, payload);
    }

    #[test]
    fn get_filtered_managed_object_direct_root() {
        let mut heap = filtered_heap_with_info(deflate_filter_info());
        let block_address = 1000u64;
        let obj_data = b"filtered managed";
        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
        let compressed = zlib_compress(&block);

        // The root is a direct block, so its filtered size and mask live in the header.
        heap.root_block_address = block_address;
        heap.io_filter_size = Some(compressed.len() as u64);
        heap.io_filter_mask = Some(0);

        let file_size = block_address as usize + compressed.len();
        let mut file_data = vec![0u8; file_size];
        file_data[block_address as usize..].copy_from_slice(&compressed);

        // Heap ID: 0x00, 16-bit offset of the object, one-byte length.
        let obj_offset = heap.direct_block_header_size(8) as u16;
        let mut heap_id = vec![0x00];
        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
        heap_id.push(obj_data.len() as u8);

        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
        assert_eq!(result, obj_data);
    }

    #[test]
    fn get_filtered_managed_object_from_indirect_block() {
        let mut heap = filtered_heap_with_info(deflate_filter_info());
        let indirect_address = 512u64;
        let block_address = 1000u64;
        let obj_data = b"filtered child";
        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
        let compressed = zlib_compress(&block);

        heap.root_block_address = indirect_address;
        heap.current_rows_in_root_indirect = 1;

        // One row of four direct entries; each entry is address(8) + size(8) + mask(4).
        let offset_bytes = usize::from(heap.max_heap_size).div_ceil(8);
        let iblock_header_size = 4 + 1 + 8 + offset_bytes;
        let direct_entry_size = 8 + 8 + 4;
        let mut indirect = vec![0u8; iblock_header_size + direct_entry_size * 4];
        indirect[0..4].copy_from_slice(b"FHIB");
        indirect[4] = 0;

        // Only the first entry is populated; it describes the compressed direct block.
        let entry_pos = iblock_header_size;
        indirect[entry_pos..entry_pos + 8].copy_from_slice(&block_address.to_le_bytes());
        indirect[entry_pos + 8..entry_pos + 16]
            .copy_from_slice(&(compressed.len() as u64).to_le_bytes());
        indirect[entry_pos + 16..entry_pos + 20].copy_from_slice(&0u32.to_le_bytes());

        let file_size = block_address as usize + compressed.len();
        let mut file_data = vec![0u8; file_size];
        let indirect_start = indirect_address as usize;
        file_data[indirect_start..indirect_start + indirect.len()].copy_from_slice(&indirect);
        file_data[block_address as usize..].copy_from_slice(&compressed);
        let storage = crate::storage::BytesStorage::new(file_data);

        let obj_offset = heap.direct_block_header_size(8) as u16;
        let mut heap_id = vec![0x00];
        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
        heap_id.push(obj_data.len() as u8);

        let result = heap.get_object_storage(&heap_id, &storage, 8, 8).unwrap();
        assert_eq!(result, obj_data);
    }

    #[test]
    fn direct_block_header_size_includes_optional_fields() {
        let heap = FractalHeap {
            max_managed_object_size: 0,
            btree_huge_objects_address: 0,
            ..base_heap()
        };

        // sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
        assert_eq!(heap.direct_block_header_size(8), 19);

        // With 4-byte offsets: sig(4) + ver(1) + addr(4) + offset_bytes(2) + checksum(4) = 15
        assert_eq!(heap.direct_block_header_size(4), 15);
    }

    #[test]
    fn get_managed_object_direct_root() {
        // Set up a fractal heap where the root is a direct block.
        let offset_size: u8 = 8;
        let max_heap_size: u16 = 16;
        let starting_block_size: u64 = 256;

        // Direct block header size: sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
        // (no I/O filters => checksum present). Hard-coded on purpose so the fixture
        // layout is independent of the code under test.
        let db_header_size = 19usize;

        // Place the direct block at file offset 1000.
        let block_address: u64 = 1000;

        let heap = FractalHeap {
            managed_space_amount: starting_block_size,
            managed_alloc_amount: starting_block_size,
            managed_objects_count: 1,
            starting_block_size,
            max_heap_size,
            root_block_address: block_address,
            ..base_heap()
        };

        // Build file data with the direct block.
        let file_size = block_address as usize + starting_block_size as usize + 100;
        let mut file_data = vec![0u8; file_size];

        // Write direct block header at block_address.
        // The heap header address (8 bytes) does not matter for this test and the
        // block offset (2 bytes) is 0, so both stay zero-filled.
        let ba = block_address as usize;
        let block_end = ba + starting_block_size as usize;
        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
        file_data[ba + 4] = 0; // version

        // Write object data at its direct-block offset, after the block header.
        let obj_data = b"test object data";
        let obj_start = ba + db_header_size;
        file_data[obj_start..obj_start + obj_data.len()].copy_from_slice(obj_data);

        // The checksum occupies the last four header bytes and is computed over the
        // whole block with that field zeroed.
        let checksum_pos = ba + db_header_size - 4;
        file_data[checksum_pos..checksum_pos + 4].fill(0);
        let checksum = jenkins_lookup3(&file_data[ba..block_end]);
        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());

        // Build heap ID for managed object at its direct-block offset, length=16.
        // Type nibble = 0, offset = direct block header size (16 bits), length = 16.
        let heap_id = [0x00, db_header_size as u8, 0x00, 0x10];

        let result = heap
            .get_managed_object(&heap_id, &file_data, offset_size, 8)
            .unwrap();
        assert_eq!(result, obj_data);
    }

    #[test]
    fn get_object_storage_cached_reads_direct_block_once() {
        let offset_size: u8 = 8;
        let max_heap_size: u16 = 16;
        let starting_block_size: u64 = 256;
        // sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4)
        let db_header_size = 19usize;
        let block_address: u64 = 1000;

        let heap = FractalHeap {
            managed_space_amount: starting_block_size,
            managed_alloc_amount: starting_block_size,
            managed_objects_count: 2,
            starting_block_size,
            max_heap_size,
            root_block_address: block_address,
            ..base_heap()
        };

        let file_size = block_address as usize + starting_block_size as usize;
        let mut file_data = vec![0u8; file_size];
        let ba = block_address as usize;
        let block_end = ba + starting_block_size as usize;
        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
        file_data[ba + 4] = 0;

        // Two objects in the same direct block, 16 bytes apart.
        let obj1_offset = db_header_size;
        let obj2_offset = db_header_size + 16;
        file_data[ba + obj1_offset..ba + obj1_offset + 4].copy_from_slice(b"one!");
        file_data[ba + obj2_offset..ba + obj2_offset + 4].copy_from_slice(b"two!");

        // Checksum over the whole block with the checksum field zeroed.
        let checksum_pos = ba + db_header_size - 4;
        file_data[checksum_pos..checksum_pos + 4].fill(0);
        let checksum = jenkins_lookup3(&file_data[ba..block_end]);
        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());

        let reads = Arc::new(Mutex::new(Vec::new()));
        let storage = CountingStorage {
            data: file_data,
            reads: Arc::clone(&reads),
        };
        let mut cache = FractalHeapDirectBlockCache::default();
        let heap_id1 = [0x00, obj1_offset as u8, 0x00, 0x04];
        let heap_id2 = [0x00, obj2_offset as u8, 0x00, 0x04];

        assert_eq!(
            heap.get_object_storage_cached(&heap_id1, &storage, offset_size, 8, &mut cache)
                .unwrap(),
            b"one!"
        );
        assert_eq!(
            heap.get_object_storage_cached(&heap_id2, &storage, offset_size, 8, &mut cache)
                .unwrap(),
            b"two!"
        );

        // Both lookups hit the same block, so the full-block read must happen only once.
        let direct_block_reads = reads
            .lock()
            .unwrap()
            .iter()
            .filter(|&&(offset, len)| {
                offset == block_address && len == starting_block_size as usize
            })
            .count();
        assert_eq!(direct_block_reads, 1);
    }
}