Skip to main content

hdf5_reader/
fractal_heap.rs

1//! HDF5 Fractal Heap (FRHP).
2//!
3//! Fractal heaps are the storage mechanism for link messages and attribute
4//! messages in new-style (v2) groups and datasets. They use a doubling-table
5//! scheme with direct and indirect blocks. Objects are addressed by a
6//! heap ID that encodes the offset and length within the heap.
7//!
8//! This module parses the heap header and provides object extraction for
9//! managed, tiny, and unfiltered huge object IDs.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use crate::checksum::jenkins_lookup3;
15use crate::error::{Error, Result};
16use crate::filters::{self, FilterRegistry};
17use crate::io::Cursor;
18use crate::messages::filter_pipeline::FilterPipelineMessage;
19use crate::storage::Storage;
20
/// Signature bytes for a fractal heap header, spelling `FRHP` in ASCII.
const FRHP_SIGNATURE: [u8; 4] = [b'F', b'R', b'H', b'P'];

/// Signature bytes for a direct block, spelling `FHDB` in ASCII.
const _FHDB_SIGNATURE: [u8; 4] = [b'F', b'H', b'D', b'B'];

/// Signature bytes for an indirect block, spelling `FHIB` in ASCII.
const FHIB_SIGNATURE: [u8; 4] = [b'F', b'H', b'I', b'B'];
29
/// Parsed fractal heap header (`FRHP`).
///
/// Produced by [`FractalHeap::parse`]. Fields are declared in the order they
/// are read from the header; the "managed free-space amount" field is read
/// during parsing but not retained.
#[derive(Debug, Clone)]
pub struct FractalHeap {
    /// Size in bytes of heap IDs used to reference objects.
    pub heap_id_len: u16,
    /// Size in bytes of the encoded I/O filter info (0 if none).
    pub io_filters_len: u16,
    /// Heap status flags, stored exactly as read.
    pub flags: u8,
    /// Maximum size of a managed object (larger objects become "huge").
    /// Stored on disk as a `u32` and widened here.
    pub max_managed_object_size: u64,
    /// Next huge object ID to assign.
    pub next_huge_id: u64,
    /// Address of the B-tree v2 used for huge objects.
    pub btree_huge_objects_address: u64,
    /// Address of the free-space manager for managed blocks.
    pub free_space_managed_address: u64,
    /// Total managed space in bytes.
    pub managed_space_amount: u64,
    /// Total managed allocated space in bytes.
    pub managed_alloc_amount: u64,
    /// Iterator offset for managed free-space.
    pub managed_iter_offset: u64,
    /// Number of managed objects.
    pub managed_objects_count: u64,
    /// Total size of huge objects in bytes.
    pub huge_objects_size: u64,
    /// Number of huge objects.
    pub huge_objects_count: u64,
    /// Total size of tiny objects in bytes.
    pub tiny_objects_size: u64,
    /// Number of tiny objects.
    pub tiny_objects_count: u64,
    /// Width of the doubling table (number of direct blocks per row).
    pub table_width: u16,
    /// Size in bytes of the starting (smallest) direct block.
    pub starting_block_size: u64,
    /// Maximum direct block size before switching to indirect blocks.
    pub max_direct_block_size: u64,
    /// Log2 of the maximum managed heap size (used for heap ID encoding).
    pub max_heap_size: u16,
    /// Starting row of the root indirect block (for doubling table).
    pub starting_row_root_indirect: u16,
    /// Address of the root block (direct or indirect).
    pub root_block_address: u64,
    /// Current number of rows in the root indirect block.
    pub current_rows_in_root_indirect: u16,
    /// Filtered root direct block size (`Some` only when `io_filters_len > 0`).
    pub io_filter_size: Option<u64>,
    /// Filter mask for the root direct block (`Some` only when `io_filters_len > 0`).
    pub io_filter_mask: Option<u32>,
    /// Raw encoded filter pipeline for heap blocks/huge objects:
    /// `io_filters_len` bytes copied from the header, empty when there are no filters.
    pub io_filter_info: Vec<u8>,
}
84
/// Cache of verified direct blocks for repeated object lookups in one heap.
///
/// Entries are keyed by [`DirectBlockCacheKey`]. Block bytes are held behind
/// an `Arc`, so a cache hit returns a shared handle to the block.
#[derive(Debug, Default)]
pub struct FractalHeapDirectBlockCache {
    /// Loaded blocks; a block is inserted only after it has been loaded and
    /// verified successfully.
    blocks: HashMap<DirectBlockCacheKey, Arc<Vec<u8>>>,
}
90
/// Cache key for a direct block: `(address, block_size, filtered_size, filter_mask)`,
/// taken from the corresponding `DirectBlockLocation` fields.
type DirectBlockCacheKey = (u64, u64, Option<u64>, u32);
92
/// Where a direct block lives and how to read it.
#[derive(Debug, Clone, Copy)]
struct DirectBlockLocation {
    /// File address the block bytes are read from.
    address: u64,
    /// Offset of the block's first byte within the heap's address space;
    /// subtracted from an object's heap offset to get its offset in the block.
    block_offset_in_heap: u64,
    /// Size in bytes the block must have once loaded (after filtering, if any).
    block_size: u64,
    /// Number of bytes to read when the block is filtered; `None` means the
    /// block is unfiltered and `block_size` bytes are read.
    filtered_size: Option<u64>,
    /// Filter mask passed to the filter pipeline when the block is filtered.
    filter_mask: u32,
}
101
/// Where a huge object lives and how to decode it.
#[derive(Debug, Clone, Copy)]
struct HugeObjectLocation {
    /// File address of the object's bytes.
    address: u64,
    /// Number of bytes to read from `address`.
    disk_length: u64,
    /// Filter mask passed to the filter pipeline (0 for unfiltered objects).
    filter_mask: u32,
    /// Expected length after filters are applied; `None` means the object is
    /// unfiltered and the bytes read are returned as-is.
    memory_length: Option<u64>,
}
109
110impl FractalHeapDirectBlockCache {
111    fn get_verified_block_storage(
112        &mut self,
113        heap: &FractalHeap,
114        location: DirectBlockLocation,
115        storage: &dyn Storage,
116        offset_size: u8,
117        filter_registry: Option<&FilterRegistry>,
118    ) -> Result<Arc<Vec<u8>>> {
119        let key = (
120            location.address,
121            location.block_size,
122            location.filtered_size,
123            location.filter_mask,
124        );
125        if let Some(block) = self.blocks.get(&key) {
126            return Ok(block.clone());
127        }
128
129        let block =
130            heap.load_direct_block_storage(location, storage, offset_size, filter_registry)?;
131        heap.verify_direct_block_bytes(&block, offset_size)?;
132        let block = Arc::new(block);
133        self.blocks.insert(key, block.clone());
134        Ok(block)
135    }
136}
137
138impl FractalHeap {
139    /// Parse a fractal heap header at the current cursor position.
140    ///
141    /// Format:
142    /// - Signature: `FRHP` (4 bytes)
143    /// - Version: 0 (1 byte)
144    /// - Heap ID length (u16 LE)
145    /// - I/O filters encoded length (u16 LE)
146    /// - Flags (u8)
147    /// - Max managed object size (u32 LE)
148    /// - Next huge object ID (`length_size` bytes)
149    /// - B-tree address for huge objects (`offset_size` bytes)
150    /// - Managed free-space amount (`length_size` bytes)
151    /// - Free-space manager address (`offset_size` bytes)
152    /// - Managed space amount (`length_size` bytes)
153    /// - Managed alloc amount (`length_size` bytes)
154    /// - Managed free-space iterator offset (`length_size` bytes)
155    /// - Managed objects count (`length_size` bytes)
156    /// - Huge objects size (`length_size` bytes)
157    /// - Huge objects count (`length_size` bytes)
158    /// - Tiny objects size (`length_size` bytes)
159    /// - Tiny objects count (`length_size` bytes)
160    /// - Table width (u16 LE)
161    /// - Starting block size (`length_size` bytes)
162    /// - Maximum direct block size (`length_size` bytes)
163    /// - Max heap size (u16 LE)
164    /// - Starting row of root indirect block (u16 LE)
165    /// - Root block address (`offset_size` bytes)
166    /// - Current rows in root indirect block (u16 LE)
167    /// - If io_filters_len > 0: filtered root direct block size (`length_size`), filter mask (u32 LE)
168    /// - Checksum (u32 LE)
169    pub fn parse(cursor: &mut Cursor, offset_size: u8, length_size: u8) -> Result<Self> {
170        let start = cursor.position();
171
172        let sig = cursor.read_bytes(4)?;
173        if sig != FRHP_SIGNATURE {
174            return Err(Error::InvalidFractalHeapSignature);
175        }
176
177        let version = cursor.read_u8()?;
178        if version != 0 {
179            return Err(Error::UnsupportedFractalHeapVersion(version));
180        }
181
182        let heap_id_len = cursor.read_u16_le()?;
183        let io_filters_len = cursor.read_u16_le()?;
184        let flags = cursor.read_u8()?;
185
186        let max_managed_object_size = cursor.read_u32_le()? as u64;
187        let next_huge_id = cursor.read_length(length_size)?;
188        let btree_huge_objects_address = cursor.read_offset(offset_size)?;
189        let _managed_free_space_amount = cursor.read_length(length_size)?;
190        let free_space_managed_address = cursor.read_offset(offset_size)?;
191        let managed_space_amount = cursor.read_length(length_size)?;
192        let managed_alloc_amount = cursor.read_length(length_size)?;
193        let managed_iter_offset = cursor.read_length(length_size)?;
194        let managed_objects_count = cursor.read_length(length_size)?;
195        let huge_objects_size = cursor.read_length(length_size)?;
196        let huge_objects_count = cursor.read_length(length_size)?;
197        let tiny_objects_size = cursor.read_length(length_size)?;
198        let tiny_objects_count = cursor.read_length(length_size)?;
199
200        let table_width = cursor.read_u16_le()?;
201        let starting_block_size = cursor.read_length(length_size)?;
202        let max_direct_block_size = cursor.read_length(length_size)?;
203        let max_heap_size = cursor.read_u16_le()?;
204        let starting_row_root_indirect = cursor.read_u16_le()?;
205        let root_block_address = cursor.read_offset(offset_size)?;
206        let current_rows_in_root_indirect = cursor.read_u16_le()?;
207
208        let (io_filter_size, io_filter_mask) = if io_filters_len > 0 {
209            let size = cursor.read_length(length_size)?;
210            let mask = cursor.read_u32_le()?;
211            (Some(size), Some(mask))
212        } else {
213            (None, None)
214        };
215        let io_filter_info = if io_filters_len > 0 {
216            cursor.read_bytes(usize::from(io_filters_len))?.to_vec()
217        } else {
218            Vec::new()
219        };
220
221        // Verify checksum.
222        let checksum_end = cursor.position();
223        let stored_checksum = cursor.read_u32_le()?;
224        let computed = jenkins_lookup3(&cursor.data()[start as usize..checksum_end as usize]);
225        if computed != stored_checksum {
226            return Err(Error::ChecksumMismatch {
227                expected: stored_checksum,
228                actual: computed,
229            });
230        }
231
232        Ok(FractalHeap {
233            heap_id_len,
234            io_filters_len,
235            flags,
236            max_managed_object_size,
237            next_huge_id,
238            btree_huge_objects_address,
239            free_space_managed_address,
240            managed_space_amount,
241            managed_alloc_amount,
242            managed_iter_offset,
243            managed_objects_count,
244            huge_objects_size,
245            huge_objects_count,
246            tiny_objects_size,
247            tiny_objects_count,
248            table_width,
249            starting_block_size,
250            max_direct_block_size,
251            max_heap_size,
252            starting_row_root_indirect,
253            root_block_address,
254            current_rows_in_root_indirect,
255            io_filter_size,
256            io_filter_mask,
257            io_filter_info,
258        })
259    }
260
261    /// Parse a fractal heap header from random-access storage.
262    pub fn parse_at_storage(
263        storage: &dyn Storage,
264        address: u64,
265        offset_size: u8,
266        length_size: u8,
267    ) -> Result<Self> {
268        let max_header_len = 256usize;
269        let available = storage.len().saturating_sub(address);
270        let len = usize::try_from(available.min(max_header_len as u64)).map_err(|_| {
271            Error::InvalidData("fractal heap header exceeds platform usize capacity".into())
272        })?;
273        let bytes = storage.read_range(address, len)?;
274        let mut cursor = Cursor::new(bytes.as_ref());
275        Self::parse(&mut cursor, offset_size, length_size)
276    }
277
278    /// Extract any fractal heap object given a heap ID.
279    pub fn get_object(
280        &self,
281        heap_id: &[u8],
282        file_data: &[u8],
283        offset_size: u8,
284        length_size: u8,
285    ) -> Result<Vec<u8>> {
286        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
287    }
288
289    /// Extract any fractal heap object using a caller-provided filter registry
290    /// for filtered managed and huge objects.
291    pub fn get_object_with_registry(
292        &self,
293        heap_id: &[u8],
294        file_data: &[u8],
295        offset_size: u8,
296        length_size: u8,
297        filter_registry: Option<&FilterRegistry>,
298    ) -> Result<Vec<u8>> {
299        match self.heap_id_kind(heap_id)? {
300            HeapIdKind::Managed => self.get_managed_object_impl(
301                heap_id,
302                file_data,
303                offset_size,
304                length_size,
305                filter_registry,
306            ),
307            HeapIdKind::Huge => self.get_huge_object(
308                heap_id,
309                file_data,
310                offset_size,
311                length_size,
312                filter_registry,
313            ),
314            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
315        }
316    }
317
318    /// Extract any fractal heap object from random-access storage.
319    pub fn get_object_storage(
320        &self,
321        heap_id: &[u8],
322        storage: &dyn Storage,
323        offset_size: u8,
324        length_size: u8,
325    ) -> Result<Vec<u8>> {
326        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
327    }
328
329    /// Extract any fractal heap object from random-access storage using a
330    /// caller-provided filter registry for filtered managed and huge objects.
331    pub fn get_object_storage_with_registry(
332        &self,
333        heap_id: &[u8],
334        storage: &dyn Storage,
335        offset_size: u8,
336        length_size: u8,
337        filter_registry: Option<&FilterRegistry>,
338    ) -> Result<Vec<u8>> {
339        let mut cache = FractalHeapDirectBlockCache::default();
340        self.get_object_storage_cached_with_registry(
341            heap_id,
342            storage,
343            offset_size,
344            length_size,
345            &mut cache,
346            filter_registry,
347        )
348    }
349
350    /// Extract any fractal heap object from storage, reusing verified direct
351    /// blocks across managed object lookups.
352    pub fn get_object_storage_cached(
353        &self,
354        heap_id: &[u8],
355        storage: &dyn Storage,
356        offset_size: u8,
357        length_size: u8,
358        direct_block_cache: &mut FractalHeapDirectBlockCache,
359    ) -> Result<Vec<u8>> {
360        self.get_object_storage_cached_with_registry(
361            heap_id,
362            storage,
363            offset_size,
364            length_size,
365            direct_block_cache,
366            None,
367        )
368    }
369
370    /// Extract any fractal heap object from storage while reusing verified
371    /// direct blocks and a caller-provided filter registry.
372    pub fn get_object_storage_cached_with_registry(
373        &self,
374        heap_id: &[u8],
375        storage: &dyn Storage,
376        offset_size: u8,
377        length_size: u8,
378        direct_block_cache: &mut FractalHeapDirectBlockCache,
379        filter_registry: Option<&FilterRegistry>,
380    ) -> Result<Vec<u8>> {
381        match self.heap_id_kind(heap_id)? {
382            HeapIdKind::Managed => self.get_managed_object_storage_cached_impl(
383                heap_id,
384                storage,
385                offset_size,
386                length_size,
387                direct_block_cache,
388                filter_registry,
389            ),
390            HeapIdKind::Huge => self.get_huge_object_storage(
391                heap_id,
392                storage,
393                offset_size,
394                length_size,
395                filter_registry,
396            ),
397            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
398        }
399    }
400
401    /// Extract a fractal heap object. Kept for existing callers; now also
402    /// handles tiny and huge IDs.
403    pub fn get_managed_object(
404        &self,
405        heap_id: &[u8],
406        file_data: &[u8],
407        offset_size: u8,
408        length_size: u8,
409    ) -> Result<Vec<u8>> {
410        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
411    }
412
413    /// Extract a fractal heap object from random-access storage. Kept for
414    /// existing callers; now also handles tiny and huge IDs.
415    pub fn get_managed_object_storage(
416        &self,
417        heap_id: &[u8],
418        storage: &dyn Storage,
419        offset_size: u8,
420        length_size: u8,
421    ) -> Result<Vec<u8>> {
422        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
423    }
424
425    fn get_managed_object_impl(
426        &self,
427        heap_id: &[u8],
428        file_data: &[u8],
429        offset_size: u8,
430        length_size: u8,
431        filter_registry: Option<&FilterRegistry>,
432    ) -> Result<Vec<u8>> {
433        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
434
435        if obj_length == 0 {
436            return Ok(Vec::new());
437        }
438
439        let location = self.find_direct_block(heap_offset, file_data, offset_size, length_size)?;
440        let block = self.load_direct_block(file_data, location, filter_registry)?;
441        self.verify_direct_block_bytes(&block, offset_size)?;
442        let offset_in_block = heap_offset
443            .checked_sub(location.block_offset_in_heap)
444            .ok_or_else(|| {
445                Error::InvalidData("fractal heap object precedes direct block".into())
446            })?;
447        let data_start = usize::try_from(offset_in_block)
448            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
449        let len = usize::try_from(obj_length).map_err(|_| {
450            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
451        })?;
452        let data_end = data_start
453            .checked_add(len)
454            .ok_or(Error::OffsetOutOfBounds(location.address))?;
455
456        if data_end > block.len() {
457            return Err(Error::UnexpectedEof {
458                offset: location
459                    .address
460                    .checked_add(offset_in_block)
461                    .ok_or(Error::OffsetOutOfBounds(location.address))?,
462                needed: obj_length,
463                available: block.len().saturating_sub(data_start) as u64,
464            });
465        }
466
467        Ok(block[data_start..data_end].to_vec())
468    }
469
470    fn get_managed_object_storage_cached_impl(
471        &self,
472        heap_id: &[u8],
473        storage: &dyn Storage,
474        offset_size: u8,
475        length_size: u8,
476        direct_block_cache: &mut FractalHeapDirectBlockCache,
477        filter_registry: Option<&FilterRegistry>,
478    ) -> Result<Vec<u8>> {
479        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
480        if obj_length == 0 {
481            return Ok(Vec::new());
482        }
483
484        let location =
485            self.find_direct_block_storage(heap_offset, storage, offset_size, length_size)?;
486        let block = direct_block_cache.get_verified_block_storage(
487            self,
488            location,
489            storage,
490            offset_size,
491            filter_registry,
492        )?;
493        let offset_in_block = heap_offset
494            .checked_sub(location.block_offset_in_heap)
495            .ok_or_else(|| {
496                Error::InvalidData("fractal heap object precedes direct block".into())
497            })?;
498        let start = usize::try_from(offset_in_block)
499            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
500        let len = usize::try_from(obj_length).map_err(|_| {
501            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
502        })?;
503        let end = start
504            .checked_add(len)
505            .ok_or(Error::OffsetOutOfBounds(location.address))?;
506        if end > block.len() {
507            let data_start = location
508                .address
509                .checked_add(offset_in_block)
510                .ok_or(Error::OffsetOutOfBounds(location.address))?;
511            return Err(Error::UnexpectedEof {
512                offset: data_start,
513                needed: obj_length,
514                available: block.len().saturating_sub(start) as u64,
515            });
516        }
517        Ok(block[start..end].to_vec())
518    }
519
520    fn get_huge_object(
521        &self,
522        heap_id: &[u8],
523        file_data: &[u8],
524        offset_size: u8,
525        length_size: u8,
526        filter_registry: Option<&FilterRegistry>,
527    ) -> Result<Vec<u8>> {
528        let location = self.resolve_huge_object_location(
529            heap_id,
530            Some(file_data),
531            None,
532            offset_size,
533            length_size,
534        )?;
535        let start = usize::try_from(location.address)
536            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
537        let len = usize::try_from(location.disk_length).map_err(|_| {
538            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
539        })?;
540        let end = start
541            .checked_add(len)
542            .ok_or(Error::OffsetOutOfBounds(location.address))?;
543        if end > file_data.len() {
544            return Err(Error::UnexpectedEof {
545                offset: location.address,
546                needed: location.disk_length,
547                available: file_data.len().saturating_sub(start) as u64,
548            });
549        }
550        self.decode_huge_object_bytes(&file_data[start..end], location, filter_registry)
551    }
552
553    fn get_huge_object_storage(
554        &self,
555        heap_id: &[u8],
556        storage: &dyn Storage,
557        offset_size: u8,
558        length_size: u8,
559        filter_registry: Option<&FilterRegistry>,
560    ) -> Result<Vec<u8>> {
561        let location = self.resolve_huge_object_location(
562            heap_id,
563            None,
564            Some(storage),
565            offset_size,
566            length_size,
567        )?;
568        let len = usize::try_from(location.disk_length).map_err(|_| {
569            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
570        })?;
571        let bytes = storage.read_range(location.address, len)?;
572        self.decode_huge_object_bytes(bytes.as_ref(), location, filter_registry)
573    }
574
575    fn decode_huge_object_bytes(
576        &self,
577        bytes: &[u8],
578        location: HugeObjectLocation,
579        filter_registry: Option<&FilterRegistry>,
580    ) -> Result<Vec<u8>> {
581        if let Some(memory_length) = location.memory_length {
582            self.apply_heap_filters(
583                bytes,
584                location.filter_mask,
585                memory_length,
586                "filtered fractal heap huge object",
587                filter_registry,
588            )
589        } else {
590            Ok(bytes.to_vec())
591        }
592    }
593
594    fn load_direct_block(
595        &self,
596        file_data: &[u8],
597        location: DirectBlockLocation,
598        filter_registry: Option<&FilterRegistry>,
599    ) -> Result<Vec<u8>> {
600        let read_len = location.filtered_size.unwrap_or(location.block_size);
601        let start = usize::try_from(location.address)
602            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
603        let len = usize::try_from(read_len).map_err(|_| {
604            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
605        })?;
606        let end = start
607            .checked_add(len)
608            .ok_or(Error::OffsetOutOfBounds(location.address))?;
609        if end > file_data.len() {
610            return Err(Error::UnexpectedEof {
611                offset: location.address,
612                needed: read_len,
613                available: file_data.len().saturating_sub(start) as u64,
614            });
615        }
616        let block = if location.filtered_size.is_some() {
617            self.apply_heap_filters(
618                &file_data[start..end],
619                location.filter_mask,
620                location.block_size,
621                "filtered fractal heap direct block",
622                filter_registry,
623            )?
624        } else {
625            file_data[start..end].to_vec()
626        };
627        let expected = usize::try_from(location.block_size).map_err(|_| {
628            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
629        })?;
630        if block.len() != expected {
631            return Err(Error::InvalidData(format!(
632                "fractal heap direct block has {} bytes, expected {} bytes",
633                block.len(),
634                expected
635            )));
636        }
637        Ok(block)
638    }
639
640    fn load_direct_block_storage(
641        &self,
642        location: DirectBlockLocation,
643        storage: &dyn Storage,
644        _offset_size: u8,
645        filter_registry: Option<&FilterRegistry>,
646    ) -> Result<Vec<u8>> {
647        let read_len = location.filtered_size.unwrap_or(location.block_size);
648        let len = usize::try_from(read_len).map_err(|_| {
649            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
650        })?;
651        let bytes = storage.read_range(location.address, len)?;
652        let block = if location.filtered_size.is_some() {
653            self.apply_heap_filters(
654                bytes.as_ref(),
655                location.filter_mask,
656                location.block_size,
657                "filtered fractal heap direct block",
658                filter_registry,
659            )?
660        } else {
661            bytes.to_vec()
662        };
663        let expected = usize::try_from(location.block_size).map_err(|_| {
664            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
665        })?;
666        if block.len() != expected {
667            return Err(Error::InvalidData(format!(
668                "fractal heap direct block has {} bytes, expected {} bytes",
669                block.len(),
670                expected
671            )));
672        }
673        Ok(block)
674    }
675
676    fn apply_heap_filters(
677        &self,
678        bytes: &[u8],
679        filter_mask: u32,
680        expected_len: u64,
681        context: &str,
682        filter_registry: Option<&FilterRegistry>,
683    ) -> Result<Vec<u8>> {
684        let pipeline = self.filter_pipeline()?;
685        let decoded =
686            filters::apply_pipeline(bytes, &pipeline.filters, filter_mask, 1, filter_registry)?;
687        let expected = usize::try_from(expected_len).map_err(|_| {
688            Error::InvalidData(format!("{context} size exceeds platform usize capacity"))
689        })?;
690        if decoded.len() != expected {
691            return Err(Error::InvalidData(format!(
692                "{context} decoded to {} bytes, expected {} bytes",
693                decoded.len(),
694                expected
695            )));
696        }
697        Ok(decoded)
698    }
699
700    fn filter_pipeline(&self) -> Result<FilterPipelineMessage> {
701        if self.io_filters_len == 0 {
702            return Err(Error::InvalidData(
703                "fractal heap object is marked filtered but the heap has no filter pipeline".into(),
704            ));
705        }
706        let mut cursor = Cursor::new(&self.io_filter_info);
707        crate::messages::filter_pipeline::parse(&mut cursor, 0, 0, self.io_filter_info.len())
708    }
709
710    fn resolve_huge_object_location(
711        &self,
712        heap_id: &[u8],
713        file_data: Option<&[u8]>,
714        storage: Option<&dyn Storage>,
715        offset_size: u8,
716        length_size: u8,
717    ) -> Result<HugeObjectLocation> {
718        let direct_unfiltered_len = 1 + usize::from(offset_size) + usize::from(length_size);
719        let direct_filtered_len = direct_unfiltered_len + 4 + usize::from(length_size);
720
721        if self.io_filters_len > 0 && heap_id.len() >= direct_filtered_len {
722            let mut cursor = Cursor::new(&heap_id[1..]);
723            let address = cursor.read_offset(offset_size)?;
724            let disk_length = cursor.read_length(length_size)?;
725            let filter_mask = cursor.read_u32_le()?;
726            let memory_length = cursor.read_length(length_size)?;
727            return Ok(HugeObjectLocation {
728                address,
729                disk_length,
730                filter_mask,
731                memory_length: Some(memory_length),
732            });
733        }
734
735        if self.io_filters_len == 0 && heap_id.len() >= direct_unfiltered_len {
736            let mut cursor = Cursor::new(&heap_id[1..]);
737            let address = cursor.read_offset(offset_size)?;
738            let disk_length = cursor.read_length(length_size)?;
739            return Ok(HugeObjectLocation {
740                address,
741                disk_length,
742                filter_mask: 0,
743                memory_length: None,
744            });
745        }
746
747        if heap_id.len() < 1 + usize::from(length_size) {
748            return Err(Error::InvalidData(
749                "huge fractal heap ID is too short".into(),
750            ));
751        }
752        if Cursor::is_undefined_offset(self.btree_huge_objects_address, offset_size) {
753            return Err(Error::UndefinedAddress);
754        }
755
756        let mut key_cursor = Cursor::new(&heap_id[1..]);
757        let object_id = key_cursor.read_length(length_size)?;
758
759        let header = if let Some(storage) = storage {
760            crate::btree_v2::BTreeV2Header::parse_at_storage(
761                storage,
762                self.btree_huge_objects_address,
763                offset_size,
764                length_size,
765            )?
766        } else {
767            let data = file_data.expect("file_data must exist when storage is None");
768            let mut cursor = Cursor::new(data);
769            cursor.set_position(self.btree_huge_objects_address);
770            crate::btree_v2::BTreeV2Header::parse(&mut cursor, offset_size, length_size)?
771        };
772
773        let records = if let Some(storage) = storage {
774            crate::btree_v2::collect_btree_v2_records_storage(
775                storage,
776                &header,
777                offset_size,
778                length_size,
779                None,
780                &[],
781                None,
782            )?
783        } else {
784            crate::btree_v2::collect_btree_v2_records(
785                file_data.expect("file_data must exist when storage is None"),
786                &header,
787                offset_size,
788                length_size,
789                None,
790                &[],
791                None,
792            )?
793        };
794
795        for record in records {
796            match record {
797                crate::btree_v2::BTreeV2Record::HugeIndirectNonFiltered {
798                    address,
799                    length,
800                    object_id: record_id,
801                } if record_id == object_id => {
802                    return Ok(HugeObjectLocation {
803                        address,
804                        disk_length: length,
805                        filter_mask: 0,
806                        memory_length: None,
807                    })
808                }
809                crate::btree_v2::BTreeV2Record::HugeIndirectFiltered {
810                    object_id: record_id,
811                    address,
812                    filtered_length,
813                    filter_mask,
814                    memory_length,
815                } if record_id == object_id => {
816                    return Ok(HugeObjectLocation {
817                        address,
818                        disk_length: filtered_length,
819                        filter_mask,
820                        memory_length: Some(memory_length),
821                    });
822                }
823                _ => {}
824            }
825        }
826
827        Err(Error::InvalidData(format!(
828            "huge fractal heap object ID {} not found",
829            object_id
830        )))
831    }
832
833    fn decode_tiny_object(&self, heap_id: &[u8]) -> Result<Vec<u8>> {
834        let extended = self.heap_id_len > 18;
835        let (data_start, len) = if extended {
836            if heap_id.len() < 2 {
837                return Err(Error::InvalidData(
838                    "extended tiny heap ID is too short".into(),
839                ));
840            }
841            let encoded = (u16::from(heap_id[0] & 0x0F) << 8) | u16::from(heap_id[1]);
842            (2usize, usize::from(encoded) + 1)
843        } else {
844            (1usize, usize::from(heap_id[0] & 0x0F) + 1)
845        };
846        let data_end = data_start
847            .checked_add(len)
848            .ok_or_else(|| Error::InvalidData("tiny heap object length overflows".into()))?;
849        if data_end > heap_id.len() {
850            return Err(Error::InvalidData(format!(
851                "tiny heap object needs {} bytes, heap ID has {}",
852                data_end,
853                heap_id.len()
854            )));
855        }
856        Ok(heap_id[data_start..data_end].to_vec())
857    }
858
859    fn heap_id_kind(&self, heap_id: &[u8]) -> Result<HeapIdKind> {
860        if heap_id.is_empty() {
861            return Err(Error::InvalidData("empty fractal heap ID".into()));
862        }
863        let version = heap_id[0] >> 6;
864        if version != 0 {
865            return Err(Error::InvalidData(format!(
866                "unsupported fractal heap ID version {}",
867                version
868            )));
869        }
870        match (heap_id[0] >> 4) & 0x03 {
871            0 => Ok(HeapIdKind::Managed),
872            1 => Ok(HeapIdKind::Huge),
873            2 => Ok(HeapIdKind::Tiny),
874            other => Err(Error::InvalidData(format!(
875                "unknown fractal heap ID type {}",
876                other
877            ))),
878        }
879    }
880
881    fn decode_managed_heap_id(&self, heap_id: &[u8]) -> Result<(u64, u64)> {
882        let (offset_bytes, length_bytes) = self.managed_id_widths();
883        let needed = 1 + offset_bytes + length_bytes;
884        if heap_id.len() < needed {
885            return Err(Error::InvalidData(format!(
886                "managed fractal heap ID too short: need {} bytes, have {}",
887                needed,
888                heap_id.len()
889            )));
890        }
891        let mut cursor = Cursor::new(&heap_id[1..needed]);
892        let heap_offset = cursor.read_uvar(offset_bytes)?;
893        let obj_length = cursor.read_uvar(length_bytes)?;
894        Ok((heap_offset, obj_length))
895    }
896
897    fn managed_id_widths(&self) -> (usize, usize) {
898        let offset_bytes = usize::from(self.max_heap_size).div_ceil(8).max(1);
899        let max_len = self.max_direct_block_size.min(self.max_managed_object_size);
900        let length_bytes = bytes_needed_to_encode(max_len).max(1);
901        (offset_bytes, length_bytes)
902    }
903
904    /// Find the direct block containing a given heap offset.
905    ///
906    /// Returns (block_file_address, block_offset_within_heap, block_size).
907    fn find_direct_block(
908        &self,
909        heap_offset: u64,
910        file_data: &[u8],
911        offset_size: u8,
912        length_size: u8,
913    ) -> Result<DirectBlockLocation> {
914        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
915            return Err(Error::UndefinedAddress);
916        }
917
918        if self.current_rows_in_root_indirect == 0 {
919            // Root block is a direct block.
920            // The entire managed space is in this one block.
921            Ok(DirectBlockLocation {
922                address: self.root_block_address,
923                block_offset_in_heap: 0,
924                block_size: self.starting_block_size,
925                filtered_size: self.io_filter_size,
926                filter_mask: self.io_filter_mask.unwrap_or(0),
927            })
928        } else {
929            // Root block is an indirect block — traverse the doubling table.
930            self.find_direct_block_via_indirect(
931                self.root_block_address,
932                heap_offset,
933                file_data,
934                offset_size,
935                length_size,
936                self.current_rows_in_root_indirect,
937            )
938        }
939    }
940
941    fn find_direct_block_storage(
942        &self,
943        heap_offset: u64,
944        storage: &dyn Storage,
945        offset_size: u8,
946        length_size: u8,
947    ) -> Result<DirectBlockLocation> {
948        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
949            return Err(Error::UndefinedAddress);
950        }
951
952        if self.current_rows_in_root_indirect == 0 {
953            Ok(DirectBlockLocation {
954                address: self.root_block_address,
955                block_offset_in_heap: 0,
956                block_size: self.starting_block_size,
957                filtered_size: self.io_filter_size,
958                filter_mask: self.io_filter_mask.unwrap_or(0),
959            })
960        } else {
961            self.find_direct_block_via_indirect_storage(
962                self.root_block_address,
963                heap_offset,
964                storage,
965                offset_size,
966                length_size,
967                self.current_rows_in_root_indirect,
968            )
969        }
970    }
971
972    /// Traverse an indirect block to find the direct block for a given offset.
973    fn find_direct_block_via_indirect(
974        &self,
975        indirect_address: u64,
976        heap_offset: u64,
977        file_data: &[u8],
978        offset_size: u8,
979        length_size: u8,
980        nrows: u16,
981    ) -> Result<DirectBlockLocation> {
982        // Validate FHIB signature
983        let addr = indirect_address as usize;
984        if addr + 4 > file_data.len() {
985            return Err(Error::OffsetOutOfBounds(indirect_address));
986        }
987        if file_data[addr..addr + 4] != FHIB_SIGNATURE {
988            return Err(Error::InvalidData(format!(
989                "expected FHIB signature at offset {:#x}, got {:?}",
990                indirect_address,
991                &file_data[addr..addr + 4]
992            )));
993        }
994
995        // The doubling table has `table_width` entries per row.
996        // Row 0 and 1 have blocks of size `starting_block_size`.
997        // Row r (for r >= 1) has blocks of size `starting_block_size * 2^(r-1)`.
998        //
999        // We iterate through the rows to find which block contains the
1000        // target offset, then read the block address from the indirect block.
1001
1002        let width = self.table_width as u64;
1003        let mut running_offset: u64 = 0;
1004
1005        for row in 0..nrows as u64 {
1006            let block_size = self.block_size_for_row(row);
1007            let is_direct = block_size <= self.max_direct_block_size;
1008
1009            for col in 0..width {
1010                let block_end = running_offset + block_size;
1011                if heap_offset >= running_offset && heap_offset < block_end {
1012                    // This is the block we want. Read its address from the
1013                    // indirect block.
1014                    let entry_index = row * width + col;
1015
1016                    // Indirect block layout: signature(4) + version(1) +
1017                    // heap_header_addr(offset_size) + block_offset(max_heap_size/8 rounded up)
1018                    // Then direct-block entries, optionally with filtered size
1019                    // and mask, followed by child indirect-block addresses.
1020                    let iblock_header_size =
1021                        4 + 1 + offset_size as u64 + (self.max_heap_size as u64).div_ceil(8);
1022
1023                    if is_direct {
1024                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1025                        let entry_pos =
1026                            indirect_address + iblock_header_size + entry_index * entry_size;
1027                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1028                            Error::InvalidData(
1029                                "fractal heap direct entry size exceeds platform usize".into(),
1030                            )
1031                        })?;
1032                        if entry_pos as usize + entry_len > file_data.len() {
1033                            return Err(Error::OffsetOutOfBounds(entry_pos));
1034                        }
1035                        let mut cursor = Cursor::new(file_data);
1036                        cursor.set_position(entry_pos);
1037                        let block_address = cursor.read_offset(offset_size)?;
1038                        if Cursor::is_undefined_offset(block_address, offset_size) {
1039                            return Err(Error::UndefinedAddress);
1040                        }
1041                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1042                            (
1043                                Some(cursor.read_length(length_size)?),
1044                                cursor.read_u32_le()?,
1045                            )
1046                        } else {
1047                            (None, 0)
1048                        };
1049                        return Ok(DirectBlockLocation {
1050                            address: block_address,
1051                            block_offset_in_heap: running_offset,
1052                            block_size,
1053                            filtered_size,
1054                            filter_mask,
1055                        });
1056                    } else {
1057                        // Need to recurse into a sub-indirect block.
1058                        let direct_count =
1059                            self.max_direct_block_rows() * u64::from(self.table_width);
1060                        let indirect_index =
1061                            entry_index.checked_sub(direct_count).ok_or_else(|| {
1062                                Error::InvalidData(
1063                                    "fractal heap indirect entry precedes direct entries".into(),
1064                                )
1065                            })?;
1066                        let entry_pos = indirect_address
1067                            + iblock_header_size
1068                            + direct_count * self.direct_block_entry_size(offset_size, length_size)
1069                            + indirect_index * u64::from(offset_size);
1070                        if entry_pos as usize + offset_size as usize > file_data.len() {
1071                            return Err(Error::OffsetOutOfBounds(entry_pos));
1072                        }
1073                        let mut cursor = Cursor::new(file_data);
1074                        cursor.set_position(entry_pos);
1075                        let block_address = cursor.read_offset(offset_size)?;
1076                        if Cursor::is_undefined_offset(block_address, offset_size) {
1077                            return Err(Error::UndefinedAddress);
1078                        }
1079                        // Determine how many rows the sub-indirect has.
1080                        let sub_rows = self.rows_for_block_size(block_size);
1081                        return self.find_direct_block_via_indirect(
1082                            block_address,
1083                            heap_offset - running_offset,
1084                            file_data,
1085                            offset_size,
1086                            length_size,
1087                            sub_rows,
1088                        );
1089                    }
1090                }
1091                running_offset = block_end;
1092            }
1093        }
1094
1095        Err(Error::InvalidData(format!(
1096            "fractal heap offset {} not found in doubling table",
1097            heap_offset
1098        )))
1099    }
1100
1101    fn find_direct_block_via_indirect_storage(
1102        &self,
1103        indirect_address: u64,
1104        heap_offset: u64,
1105        storage: &dyn Storage,
1106        offset_size: u8,
1107        length_size: u8,
1108        nrows: u16,
1109    ) -> Result<DirectBlockLocation> {
1110        let sig = storage.read_range(indirect_address, 4)?;
1111        if sig.as_ref() != FHIB_SIGNATURE {
1112            return Err(Error::InvalidData(format!(
1113                "expected FHIB signature at offset {:#x}, got {:?}",
1114                indirect_address,
1115                sig.as_ref()
1116            )));
1117        }
1118
1119        let width = self.table_width as u64;
1120        let mut running_offset = 0u64;
1121
1122        for row in 0..u64::from(nrows) {
1123            let block_size = self.block_size_for_row(row);
1124            let is_direct = block_size <= self.max_direct_block_size;
1125
1126            for col in 0..width {
1127                let block_end = running_offset + block_size;
1128                if heap_offset >= running_offset && heap_offset < block_end {
1129                    let entry_index = row * width + col;
1130                    let iblock_header_size = 4
1131                        + 1
1132                        + u64::from(offset_size)
1133                        + (u64::from(self.max_heap_size)).div_ceil(8);
1134
1135                    if is_direct {
1136                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1137                        let entry_pos =
1138                            indirect_address + iblock_header_size + entry_index * entry_size;
1139                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1140                            Error::InvalidData(
1141                                "fractal heap direct entry size exceeds platform usize".into(),
1142                            )
1143                        })?;
1144                        let entry = storage.read_range(entry_pos, entry_len)?;
1145                        let mut cursor = Cursor::new(entry.as_ref());
1146                        let block_address = cursor.read_offset(offset_size)?;
1147                        if Cursor::is_undefined_offset(block_address, offset_size) {
1148                            return Err(Error::UndefinedAddress);
1149                        }
1150                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1151                            (
1152                                Some(cursor.read_length(length_size)?),
1153                                cursor.read_u32_le()?,
1154                            )
1155                        } else {
1156                            (None, 0)
1157                        };
1158                        return Ok(DirectBlockLocation {
1159                            address: block_address,
1160                            block_offset_in_heap: running_offset,
1161                            block_size,
1162                            filtered_size,
1163                            filter_mask,
1164                        });
1165                    }
1166
1167                    let direct_count = self.max_direct_block_rows() * u64::from(self.table_width);
1168                    let indirect_index =
1169                        entry_index.checked_sub(direct_count).ok_or_else(|| {
1170                            Error::InvalidData(
1171                                "fractal heap indirect entry precedes direct entries".into(),
1172                            )
1173                        })?;
1174                    let entry_addr_pos = indirect_address
1175                        + iblock_header_size
1176                        + direct_count * self.direct_block_entry_size(offset_size, length_size)
1177                        + indirect_index * u64::from(offset_size);
1178                    let entry = storage.read_range(entry_addr_pos, usize::from(offset_size))?;
1179                    let mut cursor = Cursor::new(entry.as_ref());
1180                    let block_address = cursor.read_offset(offset_size)?;
1181                    if Cursor::is_undefined_offset(block_address, offset_size) {
1182                        return Err(Error::UndefinedAddress);
1183                    }
1184
1185                    let sub_rows = self.rows_for_block_size(block_size);
1186                    return self.find_direct_block_via_indirect_storage(
1187                        block_address,
1188                        heap_offset - running_offset,
1189                        storage,
1190                        offset_size,
1191                        length_size,
1192                        sub_rows,
1193                    );
1194                }
1195                running_offset = block_end;
1196            }
1197        }
1198
1199        Err(Error::InvalidData(format!(
1200            "fractal heap offset {} not found in doubling table",
1201            heap_offset
1202        )))
1203    }
1204
1205    /// Compute the block size for a given row in the doubling table.
1206    fn block_size_for_row(&self, row: u64) -> u64 {
1207        if row == 0 {
1208            self.starting_block_size
1209        } else {
1210            self.starting_block_size * (1u64 << (row - 1))
1211        }
1212    }
1213
1214    /// Compute how many rows of the doubling table fit in a block of the
1215    /// given total size.
1216    fn rows_for_block_size(&self, total_size: u64) -> u16 {
1217        let mut rows = 0u16;
1218        let mut accum = 0u64;
1219        let width = self.table_width as u64;
1220        loop {
1221            let bs = self.block_size_for_row(rows as u64);
1222            let row_total = bs * width;
1223            if accum + row_total > total_size {
1224                break;
1225            }
1226            accum += row_total;
1227            rows += 1;
1228            if rows > 1000 {
1229                break; // safety
1230            }
1231        }
1232        rows
1233    }
1234
1235    fn max_direct_block_rows(&self) -> u64 {
1236        let mut rows = 0u64;
1237        loop {
1238            if self.block_size_for_row(rows) > self.max_direct_block_size {
1239                break;
1240            }
1241            rows += 1;
1242            if rows > 1000 {
1243                break;
1244            }
1245        }
1246        rows
1247    }
1248
1249    fn direct_block_entry_size(&self, offset_size: u8, length_size: u8) -> u64 {
1250        let mut size = u64::from(offset_size);
1251        if self.io_filters_len > 0 {
1252            size += u64::from(length_size) + 4;
1253        }
1254        size
1255    }
1256
1257    /// Size in bytes of an unfiltered direct block header.
1258    fn direct_block_header_size(&self, offset_size: u8) -> usize {
1259        // Signature(4) + Version(1) + Heap header address(offset_size) +
1260        // Block offset within heap (max_heap_size bits, rounded up to bytes) +
1261        // optional Checksum(4).
1262        let offset_bytes = (self.max_heap_size as usize).div_ceil(8);
1263        let checksum_bytes = if self.direct_blocks_are_checksummed() {
1264            4
1265        } else {
1266            0
1267        };
1268        4 + 1 + offset_size as usize + offset_bytes + checksum_bytes
1269    }
1270
1271    fn direct_blocks_are_checksummed(&self) -> bool {
1272        self.flags & 0x02 != 0
1273    }
1274
1275    fn direct_block_checksum_pos(&self, offset_size: u8) -> Option<usize> {
1276        if self.direct_blocks_are_checksummed() {
1277            Some(self.direct_block_header_size(offset_size) - 4)
1278        } else {
1279            None
1280        }
1281    }
1282
1283    fn verify_direct_block_bytes(&self, block: &[u8], offset_size: u8) -> Result<()> {
1284        if block.len() < self.direct_block_header_size(offset_size) {
1285            return Err(Error::InvalidData(format!(
1286                "fractal heap direct block has {} bytes, expected at least {}",
1287                block.len(),
1288                self.direct_block_header_size(offset_size)
1289            )));
1290        }
1291        if block[..4] != _FHDB_SIGNATURE {
1292            return Err(Error::InvalidData(
1293                "invalid fractal heap direct block signature".into(),
1294            ));
1295        }
1296        let version = block[4];
1297        if version != 0 {
1298            return Err(Error::UnsupportedFractalHeapVersion(version));
1299        }
1300        if let Some(checksum_pos) = self.direct_block_checksum_pos(offset_size) {
1301            let stored_checksum = u32::from_le_bytes(
1302                block[checksum_pos..checksum_pos + 4]
1303                    .try_into()
1304                    .expect("direct block checksum slice has four bytes"),
1305            );
1306            let mut checksum_data = block.to_vec();
1307            checksum_data[checksum_pos..checksum_pos + 4].fill(0);
1308            let computed = jenkins_lookup3(&checksum_data);
1309            if computed != stored_checksum {
1310                return Err(Error::ChecksumMismatch {
1311                    expected: stored_checksum,
1312                    actual: computed,
1313                });
1314            }
1315        }
1316        Ok(())
1317    }
1318}
1319
/// Object category encoded in bits 4-5 of a heap ID's first byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HeapIdKind {
    /// Type value 0: the ID holds an offset and length within managed space.
    Managed,
    /// Type value 1: a "huge" object.
    Huge,
    /// Type value 2: a "tiny" object whose bytes are stored in the ID itself.
    Tiny,
}
1326
/// Minimum number of bytes (1 through 8) needed to hold `value` as an
/// unsigned integer. Zero still takes one byte.
fn bytes_needed_to_encode(value: u64) -> usize {
    // Number of significant bits, rounded up to whole bytes.
    let significant_bits = u64::BITS - value.leading_zeros();
    significant_bits.div_ceil(8).max(1) as usize
}
1346
1347#[cfg(test)]
1348mod tests {
1349    use super::*;
1350    use crate::storage::{Storage, StorageBuffer};
1351    use flate2::write::ZlibEncoder;
1352    use flate2::Compression;
1353    use std::io::Write;
1354    use std::sync::{Arc, Mutex};
1355
1356    fn base_heap() -> FractalHeap {
1357        FractalHeap {
1358            heap_id_len: 8,
1359            io_filters_len: 0,
1360            flags: 0x02,
1361            max_managed_object_size: 128,
1362            next_huge_id: 0,
1363            btree_huge_objects_address: u64::MAX,
1364            free_space_managed_address: 0,
1365            managed_space_amount: 0,
1366            managed_alloc_amount: 0,
1367            managed_iter_offset: 0,
1368            managed_objects_count: 0,
1369            huge_objects_size: 0,
1370            huge_objects_count: 0,
1371            tiny_objects_size: 0,
1372            tiny_objects_count: 0,
1373            table_width: 4,
1374            starting_block_size: 256,
1375            max_direct_block_size: 4096,
1376            max_heap_size: 16,
1377            starting_row_root_indirect: 0,
1378            root_block_address: 0,
1379            current_rows_in_root_indirect: 0,
1380            io_filter_size: None,
1381            io_filter_mask: None,
1382            io_filter_info: Vec::new(),
1383        }
1384    }
1385
1386    fn deflate_filter_info() -> Vec<u8> {
1387        let mut data = vec![0x02, 0x01];
1388        data.extend_from_slice(&1u16.to_le_bytes());
1389        data.extend_from_slice(&0u16.to_le_bytes());
1390        data.extend_from_slice(&1u16.to_le_bytes());
1391        data.extend_from_slice(&6u32.to_le_bytes());
1392        data
1393    }
1394
1395    fn zlib_compress(bytes: &[u8]) -> Vec<u8> {
1396        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1397        encoder.write_all(bytes).unwrap();
1398        encoder.finish().unwrap()
1399    }
1400
1401    fn filtered_heap_with_info(filter_info: Vec<u8>) -> FractalHeap {
1402        let mut heap = base_heap();
1403        heap.io_filters_len = filter_info.len() as u16;
1404        heap.io_filter_info = filter_info;
1405        heap
1406    }
1407
1408    fn direct_block_with_object(heap: &FractalHeap, block_size: usize, obj: &[u8]) -> Vec<u8> {
1409        let offset_size = 8;
1410        let mut block = vec![0u8; block_size];
1411        block[0..4].copy_from_slice(b"FHDB");
1412        block[4] = 0;
1413        let obj_offset = heap.direct_block_header_size(offset_size);
1414        block[obj_offset..obj_offset + obj.len()].copy_from_slice(obj);
1415        if let Some(checksum_pos) = heap.direct_block_checksum_pos(offset_size) {
1416            let mut checksum_data = block.clone();
1417            checksum_data[checksum_pos..checksum_pos + 4].fill(0);
1418            let checksum = jenkins_lookup3(&checksum_data);
1419            block[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1420        }
1421        block
1422    }
1423
1424    struct CountingStorage {
1425        data: Vec<u8>,
1426        reads: Arc<Mutex<Vec<(u64, usize)>>>,
1427    }
1428
1429    impl Storage for CountingStorage {
1430        fn len(&self) -> u64 {
1431            self.data.len() as u64
1432        }
1433
1434        fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
1435            self.reads.lock().unwrap().push((offset, len));
1436            let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
1437            let end = start
1438                .checked_add(len)
1439                .ok_or(Error::OffsetOutOfBounds(offset))?;
1440            if end > self.data.len() {
1441                return Err(Error::UnexpectedEof {
1442                    offset,
1443                    needed: len as u64,
1444                    available: self.len().saturating_sub(offset),
1445                });
1446            }
1447            Ok(StorageBuffer::from_vec(self.data[start..end].to_vec()))
1448        }
1449    }
1450
1451    #[test]
1452    fn test_block_size_for_row() {
1453        let heap = FractalHeap {
1454            heap_id_len: 8,
1455            io_filters_len: 0,
1456            flags: 0x02,
1457            max_managed_object_size: 0,
1458            next_huge_id: 0,
1459            btree_huge_objects_address: 0,
1460            free_space_managed_address: 0,
1461            managed_space_amount: 0,
1462            managed_alloc_amount: 0,
1463            managed_iter_offset: 0,
1464            managed_objects_count: 0,
1465            huge_objects_size: 0,
1466            huge_objects_count: 0,
1467            tiny_objects_size: 0,
1468            tiny_objects_count: 0,
1469            table_width: 4,
1470            starting_block_size: 256,
1471            max_direct_block_size: 4096,
1472            max_heap_size: 16,
1473            starting_row_root_indirect: 0,
1474            root_block_address: 0,
1475            current_rows_in_root_indirect: 0,
1476            io_filter_size: None,
1477            io_filter_mask: None,
1478            io_filter_info: Vec::new(),
1479        };
1480
1481        assert_eq!(heap.block_size_for_row(0), 256);
1482        assert_eq!(heap.block_size_for_row(1), 256); // 256 * 2^0
1483        assert_eq!(heap.block_size_for_row(2), 512); // 256 * 2^1
1484        assert_eq!(heap.block_size_for_row(3), 1024); // 256 * 2^2
1485    }
1486
1487    #[test]
1488    fn test_get_tiny_object() {
1489        let heap = base_heap();
1490        let heap_id = [0x20 | 3, b't', b'i', b'n', b'y'];
1491        let result = heap.get_object(&heap_id, &[], 8, 8).unwrap();
1492        assert_eq!(result, b"tiny");
1493    }
1494
1495    #[test]
1496    fn test_get_huge_direct_object() {
1497        let heap = base_heap();
1498        let mut file_data = vec![0u8; 128];
1499        file_data[64..68].copy_from_slice(b"huge");
1500
1501        let mut heap_id = Vec::new();
1502        heap_id.push(0x10);
1503        heap_id.extend_from_slice(&64u64.to_le_bytes());
1504        heap_id.extend_from_slice(&4u64.to_le_bytes());
1505
1506        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1507        assert_eq!(result, b"huge");
1508    }
1509
1510    #[test]
1511    fn test_get_filtered_huge_direct_object() {
1512        let heap = filtered_heap_with_info(deflate_filter_info());
1513        let payload = b"filtered huge payload";
1514        let compressed = zlib_compress(payload);
1515        let address = 64u64;
1516        let mut file_data = vec![0u8; address as usize + compressed.len()];
1517        file_data[address as usize..].copy_from_slice(&compressed);
1518
1519        let mut heap_id = Vec::new();
1520        heap_id.push(0x10);
1521        heap_id.extend_from_slice(&address.to_le_bytes());
1522        heap_id.extend_from_slice(&(compressed.len() as u64).to_le_bytes());
1523        heap_id.extend_from_slice(&0u32.to_le_bytes());
1524        heap_id.extend_from_slice(&(payload.len() as u64).to_le_bytes());
1525
1526        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1527        assert_eq!(result, payload);
1528    }
1529
1530    #[test]
1531    fn test_get_filtered_managed_object_direct_root() {
1532        let mut heap = filtered_heap_with_info(deflate_filter_info());
1533        let block_address = 1000u64;
1534        let obj_data = b"filtered managed";
1535        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
1536        let compressed = zlib_compress(&block);
1537        heap.root_block_address = block_address;
1538        heap.io_filter_size = Some(compressed.len() as u64);
1539        heap.io_filter_mask = Some(0);
1540
1541        let file_size = block_address as usize + compressed.len();
1542        let mut file_data = vec![0u8; file_size];
1543        file_data[block_address as usize..].copy_from_slice(&compressed);
1544
1545        let obj_offset = heap.direct_block_header_size(8) as u16;
1546        let mut heap_id = vec![0x00];
1547        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
1548        heap_id.push(obj_data.len() as u8);
1549
1550        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1551        assert_eq!(result, obj_data);
1552    }
1553
1554    #[test]
1555    fn test_get_filtered_managed_object_from_indirect_block() {
1556        let mut heap = filtered_heap_with_info(deflate_filter_info());
1557        let indirect_address = 512u64;
1558        let block_address = 1000u64;
1559        let obj_data = b"filtered child";
1560        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
1561        let compressed = zlib_compress(&block);
1562
1563        heap.root_block_address = indirect_address;
1564        heap.current_rows_in_root_indirect = 1;
1565
1566        let offset_bytes = usize::from(heap.max_heap_size).div_ceil(8);
1567        let iblock_header_size = 4 + 1 + 8 + offset_bytes;
1568        let direct_entry_size = 8 + 8 + 4;
1569        let mut indirect = vec![0u8; iblock_header_size + direct_entry_size * 4];
1570        indirect[0..4].copy_from_slice(b"FHIB");
1571        indirect[4] = 0;
1572        let entry_pos = iblock_header_size;
1573        indirect[entry_pos..entry_pos + 8].copy_from_slice(&block_address.to_le_bytes());
1574        indirect[entry_pos + 8..entry_pos + 16]
1575            .copy_from_slice(&(compressed.len() as u64).to_le_bytes());
1576        indirect[entry_pos + 16..entry_pos + 20].copy_from_slice(&0u32.to_le_bytes());
1577
1578        let file_size = block_address as usize + compressed.len();
1579        let mut file_data = vec![0u8; file_size];
1580        file_data[indirect_address as usize..indirect_address as usize + indirect.len()]
1581            .copy_from_slice(&indirect);
1582        file_data[block_address as usize..].copy_from_slice(&compressed);
1583        let storage = crate::storage::BytesStorage::new(file_data);
1584
1585        let obj_offset = heap.direct_block_header_size(8) as u16;
1586        let mut heap_id = vec![0x00];
1587        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
1588        heap_id.push(obj_data.len() as u8);
1589
1590        let result = heap.get_object_storage(&heap_id, &storage, 8, 8).unwrap();
1591        assert_eq!(result, obj_data);
1592    }
1593
1594    #[test]
1595    fn test_direct_block_header_size() {
1596        let heap = FractalHeap {
1597            heap_id_len: 8,
1598            io_filters_len: 0,
1599            flags: 0x02,
1600            max_managed_object_size: 0,
1601            next_huge_id: 0,
1602            btree_huge_objects_address: 0,
1603            free_space_managed_address: 0,
1604            managed_space_amount: 0,
1605            managed_alloc_amount: 0,
1606            managed_iter_offset: 0,
1607            managed_objects_count: 0,
1608            huge_objects_size: 0,
1609            huge_objects_count: 0,
1610            tiny_objects_size: 0,
1611            tiny_objects_count: 0,
1612            table_width: 4,
1613            starting_block_size: 256,
1614            max_direct_block_size: 4096,
1615            max_heap_size: 16,
1616            starting_row_root_indirect: 0,
1617            root_block_address: 0,
1618            current_rows_in_root_indirect: 0,
1619            io_filter_size: None,
1620            io_filter_mask: None,
1621            io_filter_info: Vec::new(),
1622        };
1623
1624        // sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
1625        assert_eq!(heap.direct_block_header_size(8), 19);
1626
1627        // With 4-byte offsets: sig(4) + ver(1) + addr(4) + offset_bytes(2) + checksum(4) = 15
1628        assert_eq!(heap.direct_block_header_size(4), 15);
1629    }
1630
1631    #[test]
1632    fn test_get_managed_object_direct_root() {
1633        // Set up a fractal heap where the root is a direct block.
1634        let offset_size: u8 = 8;
1635        let max_heap_size: u16 = 16;
1636        let starting_block_size: u64 = 256;
1637
1638        // Direct block header size: sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
1639        // (no I/O filters => checksum present)
1640        let db_header_size = 19usize;
1641
1642        // Place the direct block at file offset 1000.
1643        let block_address: u64 = 1000;
1644
1645        let heap = FractalHeap {
1646            heap_id_len: 8,
1647            io_filters_len: 0,
1648            flags: 0x02,
1649            max_managed_object_size: 128,
1650            next_huge_id: 0,
1651            btree_huge_objects_address: u64::MAX,
1652            free_space_managed_address: 0,
1653            managed_space_amount: starting_block_size,
1654            managed_alloc_amount: starting_block_size,
1655            managed_iter_offset: 0,
1656            managed_objects_count: 1,
1657            huge_objects_size: 0,
1658            huge_objects_count: 0,
1659            tiny_objects_size: 0,
1660            tiny_objects_count: 0,
1661            table_width: 4,
1662            starting_block_size,
1663            max_direct_block_size: 4096,
1664            max_heap_size,
1665            starting_row_root_indirect: 0,
1666            root_block_address: block_address,
1667            current_rows_in_root_indirect: 0,
1668            io_filter_size: None,
1669            io_filter_mask: None,
1670            io_filter_info: Vec::new(),
1671        };
1672
1673        // Build file data with the direct block.
1674        let file_size = block_address as usize + starting_block_size as usize + 100;
1675        let mut file_data = vec![0u8; file_size];
1676
1677        // Write direct block header at block_address.
1678        let ba = block_address as usize;
1679        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
1680        file_data[ba + 4] = 0; // version
1681                               // heap header address (8 bytes) — doesn't matter for this test
1682                               // block offset (2 bytes) — 0
1683
1684        // Write object data at its direct-block offset, after the block header.
1685        let obj_data = b"test object data";
1686        let obj_start = ba + db_header_size;
1687        file_data[obj_start..obj_start + obj_data.len()].copy_from_slice(obj_data);
1688        let checksum_pos = ba + db_header_size - 4;
1689        let mut checksum_data = file_data[ba..ba + starting_block_size as usize].to_vec();
1690        checksum_data[checksum_pos - ba..checksum_pos - ba + 4].fill(0);
1691        let checksum = jenkins_lookup3(&checksum_data);
1692        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1693
1694        // Build heap ID for managed object at its direct-block offset, length=16.
1695        // Type nibble = 0, offset = direct block header size (16 bits), length = 16.
1696        let heap_id = [0x00, db_header_size as u8, 0x00, 0x10];
1697
1698        let result = heap
1699            .get_managed_object(&heap_id, &file_data, offset_size, 8)
1700            .unwrap();
1701        assert_eq!(result, obj_data);
1702    }
1703
1704    #[test]
1705    fn test_get_object_storage_cached_reads_direct_block_once() {
1706        let offset_size: u8 = 8;
1707        let max_heap_size: u16 = 16;
1708        let starting_block_size: u64 = 256;
1709        let db_header_size = 19usize;
1710        let block_address: u64 = 1000;
1711
1712        let heap = FractalHeap {
1713            heap_id_len: 8,
1714            io_filters_len: 0,
1715            flags: 0x02,
1716            max_managed_object_size: 128,
1717            next_huge_id: 0,
1718            btree_huge_objects_address: u64::MAX,
1719            free_space_managed_address: 0,
1720            managed_space_amount: starting_block_size,
1721            managed_alloc_amount: starting_block_size,
1722            managed_iter_offset: 0,
1723            managed_objects_count: 2,
1724            huge_objects_size: 0,
1725            huge_objects_count: 0,
1726            tiny_objects_size: 0,
1727            tiny_objects_count: 0,
1728            table_width: 4,
1729            starting_block_size,
1730            max_direct_block_size: 4096,
1731            max_heap_size,
1732            starting_row_root_indirect: 0,
1733            root_block_address: block_address,
1734            current_rows_in_root_indirect: 0,
1735            io_filter_size: None,
1736            io_filter_mask: None,
1737            io_filter_info: Vec::new(),
1738        };
1739
1740        let file_size = block_address as usize + starting_block_size as usize;
1741        let mut file_data = vec![0u8; file_size];
1742        let ba = block_address as usize;
1743        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
1744        file_data[ba + 4] = 0;
1745
1746        let obj1_offset = db_header_size;
1747        let obj2_offset = db_header_size + 16;
1748        file_data[ba + obj1_offset..ba + obj1_offset + 4].copy_from_slice(b"one!");
1749        file_data[ba + obj2_offset..ba + obj2_offset + 4].copy_from_slice(b"two!");
1750
1751        let checksum_pos = ba + db_header_size - 4;
1752        let mut checksum_data = file_data[ba..ba + starting_block_size as usize].to_vec();
1753        checksum_data[checksum_pos - ba..checksum_pos - ba + 4].fill(0);
1754        let checksum = jenkins_lookup3(&checksum_data);
1755        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1756
1757        let reads = Arc::new(Mutex::new(Vec::new()));
1758        let storage = CountingStorage {
1759            data: file_data,
1760            reads: reads.clone(),
1761        };
1762        let mut cache = FractalHeapDirectBlockCache::default();
1763        let heap_id1 = [0x00, obj1_offset as u8, 0x00, 0x04];
1764        let heap_id2 = [0x00, obj2_offset as u8, 0x00, 0x04];
1765
1766        assert_eq!(
1767            heap.get_object_storage_cached(&heap_id1, &storage, offset_size, 8, &mut cache)
1768                .unwrap(),
1769            b"one!"
1770        );
1771        assert_eq!(
1772            heap.get_object_storage_cached(&heap_id2, &storage, offset_size, 8, &mut cache)
1773                .unwrap(),
1774            b"two!"
1775        );
1776
1777        let direct_block_reads = reads
1778            .lock()
1779            .unwrap()
1780            .iter()
1781            .filter(|&&(offset, len)| {
1782                offset == block_address && len == starting_block_size as usize
1783            })
1784            .count();
1785        assert_eq!(direct_block_reads, 1);
1786    }
1787}