Skip to main content

hdf5_reader/
fractal_heap.rs

1//! HDF5 Fractal Heap (FRHP).
2//!
3//! Fractal heaps are the storage mechanism for link messages and attribute
4//! messages in new-style (v2) groups and datasets. They use a doubling-table
5//! scheme with direct and indirect blocks. Objects are addressed by a
6//! heap ID that encodes the offset and length within the heap.
7//!
//! This module parses the heap header and provides object extraction for
//! managed, tiny, and huge object IDs, including filtered direct blocks and
//! filtered huge objects when the heap carries a filter pipeline.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use crate::checksum::jenkins_lookup3;
15use crate::error::{Error, Result};
16use crate::filters::{self, FilterRegistry};
17use crate::io::Cursor;
18use crate::messages::filter_pipeline::FilterPipelineMessage;
19use crate::storage::Storage;
20
/// Signature bytes for a fractal heap header: ASCII `FRHP`.
///
/// Compared against the first four bytes read by `FractalHeap::parse`.
const FRHP_SIGNATURE: [u8; 4] = *b"FRHP";

/// Signature bytes for a direct block: ASCII `FHDB`.
///
/// The leading underscore silences the unused-constant lint; nothing in the
/// visible part of this file references it.
const _FHDB_SIGNATURE: [u8; 4] = *b"FHDB";

/// Signature bytes for an indirect block: ASCII `FHIB`.
const FHIB_SIGNATURE: [u8; 4] = *b"FHIB";
29
/// Parsed fractal heap header.
///
/// Produced by [`FractalHeap::parse`]. Fields are stored widened to `u64`
/// where the on-disk width depends on the file's `offset_size` /
/// `length_size`. The on-disk "managed free-space amount" field is read by
/// the parser but not retained here.
#[derive(Debug, Clone)]
pub struct FractalHeap {
    /// Size in bytes of heap IDs used to reference objects.
    pub heap_id_len: u16,
    /// Size in bytes of I/O filter info (0 if none).
    pub io_filters_len: u16,
    /// Heap status flags.
    pub flags: u8,
    /// Maximum size of a managed object (larger objects become "huge").
    /// Stored on disk as a 32-bit value and widened here.
    pub max_managed_object_size: u64,
    /// Next huge object ID to assign.
    pub next_huge_id: u64,
    /// Address of the B-tree v2 used for huge objects.
    pub btree_huge_objects_address: u64,
    /// Address of the free-space manager for managed blocks.
    pub free_space_managed_address: u64,
    /// Total managed space in bytes.
    pub managed_space_amount: u64,
    /// Total managed allocated space in bytes.
    pub managed_alloc_amount: u64,
    /// Iterator offset for managed free-space.
    pub managed_iter_offset: u64,
    /// Number of managed objects.
    pub managed_objects_count: u64,
    /// Total size of huge objects in bytes.
    pub huge_objects_size: u64,
    /// Number of huge objects.
    pub huge_objects_count: u64,
    /// Total size of tiny objects in bytes.
    pub tiny_objects_size: u64,
    /// Number of tiny objects.
    pub tiny_objects_count: u64,
    /// Width of the doubling table (number of direct blocks per row).
    pub table_width: u16,
    /// Size in bytes of the starting (smallest) direct block.
    pub starting_block_size: u64,
    /// Maximum direct block size before switching to indirect blocks.
    pub max_direct_block_size: u64,
    /// Log2 of the maximum managed heap size (used for heap ID encoding).
    pub max_heap_size: u16,
    /// Starting row of the root indirect block (for doubling table).
    // NOTE(review): the HDF5 format specification names this field
    // "Starting # of Rows in Root Indirect Block" — confirm whether it is a
    // row index or a row count before relying on this description.
    pub starting_row_root_indirect: u16,
    /// Address of the root block (direct or indirect).
    pub root_block_address: u64,
    /// Current number of rows in the root indirect block.
    pub current_rows_in_root_indirect: u16,
    /// Filtered root direct block size (present only when io_filters_len > 0).
    pub io_filter_size: Option<u64>,
    /// Filter mask for root direct block (present only when io_filters_len > 0).
    pub io_filter_mask: Option<u32>,
    /// Encoded filter pipeline for heap blocks/huge objects.
    /// Holds `io_filters_len` bytes; empty when `io_filters_len` is 0.
    pub io_filter_info: Vec<u8>,
}
84
/// Cache of verified direct blocks for repeated object lookups in one heap.
///
/// A block is inserted only after it has been loaded and has passed
/// `FractalHeap::verify_direct_block_bytes`, so a hit skips both steps.
/// Blocks are handed out as `Arc` clones, so a hit does not copy block bytes.
#[derive(Debug, Default)]
pub struct FractalHeapDirectBlockCache {
    // Loaded (and, for filtered blocks, decoded) block bytes keyed by
    // `(address, block_size, filtered_size, filter_mask)`.
    blocks: HashMap<DirectBlockCacheKey, Arc<Vec<u8>>>,
}
90
/// Cache key for a direct block, in tuple order:
/// `(address, block_size, filtered_size, filter_mask)`.
type DirectBlockCacheKey = (u64, u64, Option<u64>, u32);
92
/// Where a direct block lives and how to read it.
#[derive(Debug, Clone, Copy)]
struct DirectBlockLocation {
    // File address the block bytes are read from.
    address: u64,
    // Heap-space offset of the block's first byte; subtracted from an
    // object's heap offset to get the object's position inside the block.
    block_offset_in_heap: u64,
    // Size of the block once loaded (after filters, if any, are reversed).
    block_size: u64,
    // On-disk size when the block is filtered; `None` means the block is
    // stored unfiltered and `block_size` bytes are read as-is.
    filtered_size: Option<u64>,
    // Filter mask handed to the filter pipeline for a filtered block.
    filter_mask: u32,
}
101
/// Where a huge object lives and how to read it.
#[derive(Debug, Clone, Copy)]
struct HugeObjectLocation {
    // File address of the object's bytes.
    address: u64,
    // Number of bytes to read from the file (the filtered length for
    // filtered objects).
    disk_length: u64,
    // Filter mask handed to the filter pipeline; 0 for unfiltered objects.
    filter_mask: u32,
    // Expected length after the filters are reversed; `None` means the object
    // is unfiltered and the on-disk bytes are returned unchanged.
    memory_length: Option<u64>,
}
109
110impl FractalHeapDirectBlockCache {
111    fn get_verified_block_storage(
112        &mut self,
113        heap: &FractalHeap,
114        location: DirectBlockLocation,
115        storage: &dyn Storage,
116        offset_size: u8,
117        filter_registry: Option<&FilterRegistry>,
118    ) -> Result<Arc<Vec<u8>>> {
119        let key = (
120            location.address,
121            location.block_size,
122            location.filtered_size,
123            location.filter_mask,
124        );
125        if let Some(block) = self.blocks.get(&key) {
126            return Ok(block.clone());
127        }
128
129        let block =
130            heap.load_direct_block_storage(location, storage, offset_size, filter_registry)?;
131        heap.verify_direct_block_bytes(&block, offset_size)?;
132        let block = Arc::new(block);
133        self.blocks.insert(key, block.clone());
134        Ok(block)
135    }
136}
137
138impl FractalHeap {
139    /// Parse a fractal heap header at the current cursor position.
140    ///
141    /// Format:
142    /// - Signature: `FRHP` (4 bytes)
143    /// - Version: 0 (1 byte)
144    /// - Heap ID length (u16 LE)
145    /// - I/O filters encoded length (u16 LE)
146    /// - Flags (u8)
147    /// - Max managed object size (u32 LE)
148    /// - Next huge object ID (`length_size` bytes)
149    /// - B-tree address for huge objects (`offset_size` bytes)
150    /// - Managed free-space amount (`length_size` bytes)
151    /// - Free-space manager address (`offset_size` bytes)
152    /// - Managed space amount (`length_size` bytes)
153    /// - Managed alloc amount (`length_size` bytes)
154    /// - Managed free-space iterator offset (`length_size` bytes)
155    /// - Managed objects count (`length_size` bytes)
156    /// - Huge objects size (`length_size` bytes)
157    /// - Huge objects count (`length_size` bytes)
158    /// - Tiny objects size (`length_size` bytes)
159    /// - Tiny objects count (`length_size` bytes)
160    /// - Table width (u16 LE)
161    /// - Starting block size (`length_size` bytes)
162    /// - Maximum direct block size (`length_size` bytes)
163    /// - Max heap size (u16 LE)
164    /// - Starting row of root indirect block (u16 LE)
165    /// - Root block address (`offset_size` bytes)
166    /// - Current rows in root indirect block (u16 LE)
167    /// - If io_filters_len > 0: filtered root direct block size (`length_size`), filter mask (u32 LE)
168    /// - Checksum (u32 LE)
169    pub fn parse(cursor: &mut Cursor, offset_size: u8, length_size: u8) -> Result<Self> {
170        let start = cursor.position();
171
172        let sig = cursor.read_bytes(4)?;
173        if sig != FRHP_SIGNATURE {
174            return Err(Error::InvalidFractalHeapSignature);
175        }
176
177        let version = cursor.read_u8()?;
178        if version != 0 {
179            return Err(Error::UnsupportedFractalHeapVersion(version));
180        }
181
182        let heap_id_len = cursor.read_u16_le()?;
183        let io_filters_len = cursor.read_u16_le()?;
184        let flags = cursor.read_u8()?;
185
186        let max_managed_object_size = cursor.read_u32_le()? as u64;
187        let next_huge_id = cursor.read_length(length_size)?;
188        let btree_huge_objects_address = cursor.read_offset(offset_size)?;
189        let _managed_free_space_amount = cursor.read_length(length_size)?;
190        let free_space_managed_address = cursor.read_offset(offset_size)?;
191        let managed_space_amount = cursor.read_length(length_size)?;
192        let managed_alloc_amount = cursor.read_length(length_size)?;
193        let managed_iter_offset = cursor.read_length(length_size)?;
194        let managed_objects_count = cursor.read_length(length_size)?;
195        let huge_objects_size = cursor.read_length(length_size)?;
196        let huge_objects_count = cursor.read_length(length_size)?;
197        let tiny_objects_size = cursor.read_length(length_size)?;
198        let tiny_objects_count = cursor.read_length(length_size)?;
199
200        let table_width = cursor.read_u16_le()?;
201        let starting_block_size = cursor.read_length(length_size)?;
202        let max_direct_block_size = cursor.read_length(length_size)?;
203        let max_heap_size = cursor.read_u16_le()?;
204        let starting_row_root_indirect = cursor.read_u16_le()?;
205        let root_block_address = cursor.read_offset(offset_size)?;
206        let current_rows_in_root_indirect = cursor.read_u16_le()?;
207
208        let (io_filter_size, io_filter_mask) = if io_filters_len > 0 {
209            let size = cursor.read_length(length_size)?;
210            let mask = cursor.read_u32_le()?;
211            (Some(size), Some(mask))
212        } else {
213            (None, None)
214        };
215        let io_filter_info = if io_filters_len > 0 {
216            cursor.read_bytes(usize::from(io_filters_len))?.to_vec()
217        } else {
218            Vec::new()
219        };
220
221        // Verify checksum.
222        let checksum_end = cursor.position();
223        let stored_checksum = cursor.read_u32_le()?;
224        let computed = jenkins_lookup3(&cursor.data()[start as usize..checksum_end as usize]);
225        if computed != stored_checksum {
226            return Err(Error::ChecksumMismatch {
227                expected: stored_checksum,
228                actual: computed,
229            });
230        }
231
232        Ok(FractalHeap {
233            heap_id_len,
234            io_filters_len,
235            flags,
236            max_managed_object_size,
237            next_huge_id,
238            btree_huge_objects_address,
239            free_space_managed_address,
240            managed_space_amount,
241            managed_alloc_amount,
242            managed_iter_offset,
243            managed_objects_count,
244            huge_objects_size,
245            huge_objects_count,
246            tiny_objects_size,
247            tiny_objects_count,
248            table_width,
249            starting_block_size,
250            max_direct_block_size,
251            max_heap_size,
252            starting_row_root_indirect,
253            root_block_address,
254            current_rows_in_root_indirect,
255            io_filter_size,
256            io_filter_mask,
257            io_filter_info,
258        })
259    }
260
261    /// Parse a fractal heap header from random-access storage.
262    pub fn parse_at_storage(
263        storage: &dyn Storage,
264        address: u64,
265        offset_size: u8,
266        length_size: u8,
267    ) -> Result<Self> {
268        let max_header_len = 256usize;
269        let available = storage.len().saturating_sub(address);
270        let len = usize::try_from(available.min(max_header_len as u64)).map_err(|_| {
271            Error::InvalidData("fractal heap header exceeds platform usize capacity".into())
272        })?;
273        let bytes = storage.read_range(address, len)?;
274        let mut cursor = Cursor::new(bytes.as_ref());
275        Self::parse(&mut cursor, offset_size, length_size)
276    }
277
278    /// Extract any fractal heap object given a heap ID.
279    pub fn get_object(
280        &self,
281        heap_id: &[u8],
282        file_data: &[u8],
283        offset_size: u8,
284        length_size: u8,
285    ) -> Result<Vec<u8>> {
286        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
287    }
288
289    /// Extract any fractal heap object using a caller-provided filter registry
290    /// for filtered managed and huge objects.
291    pub fn get_object_with_registry(
292        &self,
293        heap_id: &[u8],
294        file_data: &[u8],
295        offset_size: u8,
296        length_size: u8,
297        filter_registry: Option<&FilterRegistry>,
298    ) -> Result<Vec<u8>> {
299        match self.heap_id_kind(heap_id)? {
300            HeapIdKind::Managed => self.get_managed_object_impl(
301                heap_id,
302                file_data,
303                offset_size,
304                length_size,
305                filter_registry,
306            ),
307            HeapIdKind::Huge => self.get_huge_object(
308                heap_id,
309                file_data,
310                offset_size,
311                length_size,
312                filter_registry,
313            ),
314            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
315        }
316    }
317
318    /// Extract any fractal heap object from random-access storage.
319    pub fn get_object_storage(
320        &self,
321        heap_id: &[u8],
322        storage: &dyn Storage,
323        offset_size: u8,
324        length_size: u8,
325    ) -> Result<Vec<u8>> {
326        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
327    }
328
329    /// Extract any fractal heap object from random-access storage using a
330    /// caller-provided filter registry for filtered managed and huge objects.
331    pub fn get_object_storage_with_registry(
332        &self,
333        heap_id: &[u8],
334        storage: &dyn Storage,
335        offset_size: u8,
336        length_size: u8,
337        filter_registry: Option<&FilterRegistry>,
338    ) -> Result<Vec<u8>> {
339        let mut cache = FractalHeapDirectBlockCache::default();
340        self.get_object_storage_cached_with_registry(
341            heap_id,
342            storage,
343            offset_size,
344            length_size,
345            &mut cache,
346            filter_registry,
347        )
348    }
349
350    /// Extract any fractal heap object from storage, reusing verified direct
351    /// blocks across managed object lookups.
352    pub fn get_object_storage_cached(
353        &self,
354        heap_id: &[u8],
355        storage: &dyn Storage,
356        offset_size: u8,
357        length_size: u8,
358        direct_block_cache: &mut FractalHeapDirectBlockCache,
359    ) -> Result<Vec<u8>> {
360        self.get_object_storage_cached_with_registry(
361            heap_id,
362            storage,
363            offset_size,
364            length_size,
365            direct_block_cache,
366            None,
367        )
368    }
369
370    /// Extract any fractal heap object from storage while reusing verified
371    /// direct blocks and a caller-provided filter registry.
372    pub fn get_object_storage_cached_with_registry(
373        &self,
374        heap_id: &[u8],
375        storage: &dyn Storage,
376        offset_size: u8,
377        length_size: u8,
378        direct_block_cache: &mut FractalHeapDirectBlockCache,
379        filter_registry: Option<&FilterRegistry>,
380    ) -> Result<Vec<u8>> {
381        match self.heap_id_kind(heap_id)? {
382            HeapIdKind::Managed => self.get_managed_object_storage_cached_impl(
383                heap_id,
384                storage,
385                offset_size,
386                length_size,
387                direct_block_cache,
388                filter_registry,
389            ),
390            HeapIdKind::Huge => self.get_huge_object_storage(
391                heap_id,
392                storage,
393                offset_size,
394                length_size,
395                filter_registry,
396            ),
397            HeapIdKind::Tiny => self.decode_tiny_object(heap_id),
398        }
399    }
400
401    /// Extract a fractal heap object. Kept for existing callers; now also
402    /// handles tiny and huge IDs.
403    pub fn get_managed_object(
404        &self,
405        heap_id: &[u8],
406        file_data: &[u8],
407        offset_size: u8,
408        length_size: u8,
409    ) -> Result<Vec<u8>> {
410        self.get_object_with_registry(heap_id, file_data, offset_size, length_size, None)
411    }
412
413    /// Extract a fractal heap object from random-access storage. Kept for
414    /// existing callers; now also handles tiny and huge IDs.
415    pub fn get_managed_object_storage(
416        &self,
417        heap_id: &[u8],
418        storage: &dyn Storage,
419        offset_size: u8,
420        length_size: u8,
421    ) -> Result<Vec<u8>> {
422        self.get_object_storage_with_registry(heap_id, storage, offset_size, length_size, None)
423    }
424
425    fn get_managed_object_impl(
426        &self,
427        heap_id: &[u8],
428        file_data: &[u8],
429        offset_size: u8,
430        length_size: u8,
431        filter_registry: Option<&FilterRegistry>,
432    ) -> Result<Vec<u8>> {
433        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
434
435        if obj_length == 0 {
436            return Ok(Vec::new());
437        }
438
439        let location = self.find_direct_block(heap_offset, file_data, offset_size, length_size)?;
440        let block = self.load_direct_block(file_data, location, filter_registry)?;
441        self.verify_direct_block_bytes(&block, offset_size)?;
442        let offset_in_block = heap_offset
443            .checked_sub(location.block_offset_in_heap)
444            .ok_or_else(|| {
445                Error::InvalidData("fractal heap object precedes direct block".into())
446            })?;
447        let data_start = usize::try_from(offset_in_block)
448            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
449        let len = usize::try_from(obj_length).map_err(|_| {
450            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
451        })?;
452        let data_end = data_start
453            .checked_add(len)
454            .ok_or(Error::OffsetOutOfBounds(location.address))?;
455
456        if data_end > block.len() {
457            return Err(Error::UnexpectedEof {
458                offset: location
459                    .address
460                    .checked_add(offset_in_block)
461                    .ok_or(Error::OffsetOutOfBounds(location.address))?,
462                needed: obj_length,
463                available: block.len().saturating_sub(data_start) as u64,
464            });
465        }
466
467        Ok(block[data_start..data_end].to_vec())
468    }
469
470    fn get_managed_object_storage_cached_impl(
471        &self,
472        heap_id: &[u8],
473        storage: &dyn Storage,
474        offset_size: u8,
475        length_size: u8,
476        direct_block_cache: &mut FractalHeapDirectBlockCache,
477        filter_registry: Option<&FilterRegistry>,
478    ) -> Result<Vec<u8>> {
479        let (heap_offset, obj_length) = self.decode_managed_heap_id(heap_id)?;
480        if obj_length == 0 {
481            return Ok(Vec::new());
482        }
483
484        let location =
485            self.find_direct_block_storage(heap_offset, storage, offset_size, length_size)?;
486        let block = direct_block_cache.get_verified_block_storage(
487            self,
488            location,
489            storage,
490            offset_size,
491            filter_registry,
492        )?;
493        let offset_in_block = heap_offset
494            .checked_sub(location.block_offset_in_heap)
495            .ok_or_else(|| {
496                Error::InvalidData("fractal heap object precedes direct block".into())
497            })?;
498        let start = usize::try_from(offset_in_block)
499            .map_err(|_| Error::InvalidData("fractal heap object offset exceeds usize".into()))?;
500        let len = usize::try_from(obj_length).map_err(|_| {
501            Error::InvalidData("fractal heap object exceeds platform usize capacity".into())
502        })?;
503        let end = start
504            .checked_add(len)
505            .ok_or(Error::OffsetOutOfBounds(location.address))?;
506        if end > block.len() {
507            let data_start = location
508                .address
509                .checked_add(offset_in_block)
510                .ok_or(Error::OffsetOutOfBounds(location.address))?;
511            return Err(Error::UnexpectedEof {
512                offset: data_start,
513                needed: obj_length,
514                available: block.len().saturating_sub(start) as u64,
515            });
516        }
517        Ok(block[start..end].to_vec())
518    }
519
520    fn get_huge_object(
521        &self,
522        heap_id: &[u8],
523        file_data: &[u8],
524        offset_size: u8,
525        length_size: u8,
526        filter_registry: Option<&FilterRegistry>,
527    ) -> Result<Vec<u8>> {
528        let location = self.resolve_huge_object_location(
529            heap_id,
530            Some(file_data),
531            None,
532            offset_size,
533            length_size,
534        )?;
535        let start = usize::try_from(location.address)
536            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
537        let len = usize::try_from(location.disk_length).map_err(|_| {
538            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
539        })?;
540        let end = start
541            .checked_add(len)
542            .ok_or(Error::OffsetOutOfBounds(location.address))?;
543        if end > file_data.len() {
544            return Err(Error::UnexpectedEof {
545                offset: location.address,
546                needed: location.disk_length,
547                available: file_data.len().saturating_sub(start) as u64,
548            });
549        }
550        self.decode_huge_object_bytes(&file_data[start..end], location, filter_registry)
551    }
552
553    fn get_huge_object_storage(
554        &self,
555        heap_id: &[u8],
556        storage: &dyn Storage,
557        offset_size: u8,
558        length_size: u8,
559        filter_registry: Option<&FilterRegistry>,
560    ) -> Result<Vec<u8>> {
561        let location = self.resolve_huge_object_location(
562            heap_id,
563            None,
564            Some(storage),
565            offset_size,
566            length_size,
567        )?;
568        let len = usize::try_from(location.disk_length).map_err(|_| {
569            Error::InvalidData("huge fractal heap object exceeds platform usize capacity".into())
570        })?;
571        let bytes = storage.read_range(location.address, len)?;
572        self.decode_huge_object_bytes(bytes.as_ref(), location, filter_registry)
573    }
574
575    fn decode_huge_object_bytes(
576        &self,
577        bytes: &[u8],
578        location: HugeObjectLocation,
579        filter_registry: Option<&FilterRegistry>,
580    ) -> Result<Vec<u8>> {
581        if let Some(memory_length) = location.memory_length {
582            self.apply_heap_filters(
583                bytes,
584                location.filter_mask,
585                memory_length,
586                "filtered fractal heap huge object",
587                filter_registry,
588            )
589        } else {
590            Ok(bytes.to_vec())
591        }
592    }
593
594    fn load_direct_block(
595        &self,
596        file_data: &[u8],
597        location: DirectBlockLocation,
598        filter_registry: Option<&FilterRegistry>,
599    ) -> Result<Vec<u8>> {
600        let read_len = location.filtered_size.unwrap_or(location.block_size);
601        let start = usize::try_from(location.address)
602            .map_err(|_| Error::OffsetOutOfBounds(location.address))?;
603        let len = usize::try_from(read_len).map_err(|_| {
604            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
605        })?;
606        let end = start
607            .checked_add(len)
608            .ok_or(Error::OffsetOutOfBounds(location.address))?;
609        if end > file_data.len() {
610            return Err(Error::UnexpectedEof {
611                offset: location.address,
612                needed: read_len,
613                available: file_data.len().saturating_sub(start) as u64,
614            });
615        }
616        let block = if location.filtered_size.is_some() {
617            self.apply_heap_filters(
618                &file_data[start..end],
619                location.filter_mask,
620                location.block_size,
621                "filtered fractal heap direct block",
622                filter_registry,
623            )?
624        } else {
625            file_data[start..end].to_vec()
626        };
627        let expected = usize::try_from(location.block_size).map_err(|_| {
628            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
629        })?;
630        if block.len() != expected {
631            return Err(Error::InvalidData(format!(
632                "fractal heap direct block has {} bytes, expected {} bytes",
633                block.len(),
634                expected
635            )));
636        }
637        Ok(block)
638    }
639
640    fn load_direct_block_storage(
641        &self,
642        location: DirectBlockLocation,
643        storage: &dyn Storage,
644        _offset_size: u8,
645        filter_registry: Option<&FilterRegistry>,
646    ) -> Result<Vec<u8>> {
647        let read_len = location.filtered_size.unwrap_or(location.block_size);
648        let len = usize::try_from(read_len).map_err(|_| {
649            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
650        })?;
651        let bytes = storage.read_range(location.address, len)?;
652        let block = if location.filtered_size.is_some() {
653            self.apply_heap_filters(
654                bytes.as_ref(),
655                location.filter_mask,
656                location.block_size,
657                "filtered fractal heap direct block",
658                filter_registry,
659            )?
660        } else {
661            bytes.to_vec()
662        };
663        let expected = usize::try_from(location.block_size).map_err(|_| {
664            Error::InvalidData("fractal heap direct block size exceeds platform usize".into())
665        })?;
666        if block.len() != expected {
667            return Err(Error::InvalidData(format!(
668                "fractal heap direct block has {} bytes, expected {} bytes",
669                block.len(),
670                expected
671            )));
672        }
673        Ok(block)
674    }
675
676    fn apply_heap_filters(
677        &self,
678        bytes: &[u8],
679        filter_mask: u32,
680        expected_len: u64,
681        context: &str,
682        filter_registry: Option<&FilterRegistry>,
683    ) -> Result<Vec<u8>> {
684        let pipeline = self.filter_pipeline()?;
685        let expected = usize::try_from(expected_len).map_err(|_| {
686            Error::InvalidData(format!("{context} size exceeds platform usize capacity"))
687        })?;
688        let filter_output_limit = expected.checked_add(1).ok_or_else(|| {
689            Error::InvalidData(format!(
690                "{context} filter output limit exceeds platform usize capacity"
691            ))
692        })?;
693        let decoded = filters::apply_pipeline_with_limit(
694            bytes,
695            &pipeline.filters,
696            filter_mask,
697            1,
698            filter_registry,
699            Some(filter_output_limit),
700        )?;
701        if decoded.len() != expected {
702            return Err(Error::InvalidData(format!(
703                "{context} decoded to {} bytes, expected {} bytes",
704                decoded.len(),
705                expected
706            )));
707        }
708        Ok(decoded)
709    }
710
711    fn filter_pipeline(&self) -> Result<FilterPipelineMessage> {
712        if self.io_filters_len == 0 {
713            return Err(Error::InvalidData(
714                "fractal heap object is marked filtered but the heap has no filter pipeline".into(),
715            ));
716        }
717        let mut cursor = Cursor::new(&self.io_filter_info);
718        crate::messages::filter_pipeline::parse(&mut cursor, 0, 0, self.io_filter_info.len())
719    }
720
721    fn resolve_huge_object_location(
722        &self,
723        heap_id: &[u8],
724        file_data: Option<&[u8]>,
725        storage: Option<&dyn Storage>,
726        offset_size: u8,
727        length_size: u8,
728    ) -> Result<HugeObjectLocation> {
729        let direct_unfiltered_len = 1 + usize::from(offset_size) + usize::from(length_size);
730        let direct_filtered_len = direct_unfiltered_len + 4 + usize::from(length_size);
731
732        if self.io_filters_len > 0 && heap_id.len() >= direct_filtered_len {
733            let mut cursor = Cursor::new(&heap_id[1..]);
734            let address = cursor.read_offset(offset_size)?;
735            let disk_length = cursor.read_length(length_size)?;
736            let filter_mask = cursor.read_u32_le()?;
737            let memory_length = cursor.read_length(length_size)?;
738            return Ok(HugeObjectLocation {
739                address,
740                disk_length,
741                filter_mask,
742                memory_length: Some(memory_length),
743            });
744        }
745
746        if self.io_filters_len == 0 && heap_id.len() >= direct_unfiltered_len {
747            let mut cursor = Cursor::new(&heap_id[1..]);
748            let address = cursor.read_offset(offset_size)?;
749            let disk_length = cursor.read_length(length_size)?;
750            return Ok(HugeObjectLocation {
751                address,
752                disk_length,
753                filter_mask: 0,
754                memory_length: None,
755            });
756        }
757
758        if heap_id.len() < 1 + usize::from(length_size) {
759            return Err(Error::InvalidData(
760                "huge fractal heap ID is too short".into(),
761            ));
762        }
763        if Cursor::is_undefined_offset(self.btree_huge_objects_address, offset_size) {
764            return Err(Error::UndefinedAddress);
765        }
766
767        let mut key_cursor = Cursor::new(&heap_id[1..]);
768        let object_id = key_cursor.read_length(length_size)?;
769
770        let header = if let Some(storage) = storage {
771            crate::btree_v2::BTreeV2Header::parse_at_storage(
772                storage,
773                self.btree_huge_objects_address,
774                offset_size,
775                length_size,
776            )?
777        } else {
778            let data = file_data.expect("file_data must exist when storage is None");
779            let mut cursor = Cursor::new(data);
780            cursor.set_position(self.btree_huge_objects_address);
781            crate::btree_v2::BTreeV2Header::parse(&mut cursor, offset_size, length_size)?
782        };
783
784        let records = if let Some(storage) = storage {
785            crate::btree_v2::collect_btree_v2_records_storage(
786                storage,
787                &header,
788                offset_size,
789                length_size,
790                None,
791                &[],
792                None,
793            )?
794        } else {
795            crate::btree_v2::collect_btree_v2_records(
796                file_data.expect("file_data must exist when storage is None"),
797                &header,
798                offset_size,
799                length_size,
800                None,
801                &[],
802                None,
803            )?
804        };
805
806        for record in records {
807            match record {
808                crate::btree_v2::BTreeV2Record::HugeIndirectNonFiltered {
809                    address,
810                    length,
811                    object_id: record_id,
812                } if record_id == object_id => {
813                    return Ok(HugeObjectLocation {
814                        address,
815                        disk_length: length,
816                        filter_mask: 0,
817                        memory_length: None,
818                    })
819                }
820                crate::btree_v2::BTreeV2Record::HugeIndirectFiltered {
821                    object_id: record_id,
822                    address,
823                    filtered_length,
824                    filter_mask,
825                    memory_length,
826                } if record_id == object_id => {
827                    return Ok(HugeObjectLocation {
828                        address,
829                        disk_length: filtered_length,
830                        filter_mask,
831                        memory_length: Some(memory_length),
832                    });
833                }
834                _ => {}
835            }
836        }
837
838        Err(Error::InvalidData(format!(
839            "huge fractal heap object ID {} not found",
840            object_id
841        )))
842    }
843
844    fn decode_tiny_object(&self, heap_id: &[u8]) -> Result<Vec<u8>> {
845        let extended = self.heap_id_len > 18;
846        let (data_start, len) = if extended {
847            if heap_id.len() < 2 {
848                return Err(Error::InvalidData(
849                    "extended tiny heap ID is too short".into(),
850                ));
851            }
852            let encoded = (u16::from(heap_id[0] & 0x0F) << 8) | u16::from(heap_id[1]);
853            (2usize, usize::from(encoded) + 1)
854        } else {
855            (1usize, usize::from(heap_id[0] & 0x0F) + 1)
856        };
857        let data_end = data_start
858            .checked_add(len)
859            .ok_or_else(|| Error::InvalidData("tiny heap object length overflows".into()))?;
860        if data_end > heap_id.len() {
861            return Err(Error::InvalidData(format!(
862                "tiny heap object needs {} bytes, heap ID has {}",
863                data_end,
864                heap_id.len()
865            )));
866        }
867        Ok(heap_id[data_start..data_end].to_vec())
868    }
869
870    fn heap_id_kind(&self, heap_id: &[u8]) -> Result<HeapIdKind> {
871        if heap_id.is_empty() {
872            return Err(Error::InvalidData("empty fractal heap ID".into()));
873        }
874        let version = heap_id[0] >> 6;
875        if version != 0 {
876            return Err(Error::InvalidData(format!(
877                "unsupported fractal heap ID version {}",
878                version
879            )));
880        }
881        match (heap_id[0] >> 4) & 0x03 {
882            0 => Ok(HeapIdKind::Managed),
883            1 => Ok(HeapIdKind::Huge),
884            2 => Ok(HeapIdKind::Tiny),
885            other => Err(Error::InvalidData(format!(
886                "unknown fractal heap ID type {}",
887                other
888            ))),
889        }
890    }
891
892    fn decode_managed_heap_id(&self, heap_id: &[u8]) -> Result<(u64, u64)> {
893        let (offset_bytes, length_bytes) = self.managed_id_widths();
894        let needed = 1 + offset_bytes + length_bytes;
895        if heap_id.len() < needed {
896            return Err(Error::InvalidData(format!(
897                "managed fractal heap ID too short: need {} bytes, have {}",
898                needed,
899                heap_id.len()
900            )));
901        }
902        let mut cursor = Cursor::new(&heap_id[1..needed]);
903        let heap_offset = cursor.read_uvar(offset_bytes)?;
904        let obj_length = cursor.read_uvar(length_bytes)?;
905        Ok((heap_offset, obj_length))
906    }
907
908    fn managed_id_widths(&self) -> (usize, usize) {
909        let offset_bytes = usize::from(self.max_heap_size).div_ceil(8).max(1);
910        let max_len = self.max_direct_block_size.min(self.max_managed_object_size);
911        let length_bytes = bytes_needed_to_encode(max_len).max(1);
912        (offset_bytes, length_bytes)
913    }
914
915    /// Find the direct block containing a given heap offset.
916    ///
917    /// Returns (block_file_address, block_offset_within_heap, block_size).
918    fn find_direct_block(
919        &self,
920        heap_offset: u64,
921        file_data: &[u8],
922        offset_size: u8,
923        length_size: u8,
924    ) -> Result<DirectBlockLocation> {
925        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
926            return Err(Error::UndefinedAddress);
927        }
928
929        if self.current_rows_in_root_indirect == 0 {
930            // Root block is a direct block.
931            // The entire managed space is in this one block.
932            Ok(DirectBlockLocation {
933                address: self.root_block_address,
934                block_offset_in_heap: 0,
935                block_size: self.starting_block_size,
936                filtered_size: self.io_filter_size,
937                filter_mask: self.io_filter_mask.unwrap_or(0),
938            })
939        } else {
940            // Root block is an indirect block — traverse the doubling table.
941            self.find_direct_block_via_indirect(
942                self.root_block_address,
943                heap_offset,
944                file_data,
945                offset_size,
946                length_size,
947                self.current_rows_in_root_indirect,
948            )
949        }
950    }
951
952    fn find_direct_block_storage(
953        &self,
954        heap_offset: u64,
955        storage: &dyn Storage,
956        offset_size: u8,
957        length_size: u8,
958    ) -> Result<DirectBlockLocation> {
959        if Cursor::is_undefined_offset(self.root_block_address, offset_size) {
960            return Err(Error::UndefinedAddress);
961        }
962
963        if self.current_rows_in_root_indirect == 0 {
964            Ok(DirectBlockLocation {
965                address: self.root_block_address,
966                block_offset_in_heap: 0,
967                block_size: self.starting_block_size,
968                filtered_size: self.io_filter_size,
969                filter_mask: self.io_filter_mask.unwrap_or(0),
970            })
971        } else {
972            self.find_direct_block_via_indirect_storage(
973                self.root_block_address,
974                heap_offset,
975                storage,
976                offset_size,
977                length_size,
978                self.current_rows_in_root_indirect,
979            )
980        }
981    }
982
983    /// Traverse an indirect block to find the direct block for a given offset.
984    fn find_direct_block_via_indirect(
985        &self,
986        indirect_address: u64,
987        heap_offset: u64,
988        file_data: &[u8],
989        offset_size: u8,
990        length_size: u8,
991        nrows: u16,
992    ) -> Result<DirectBlockLocation> {
993        // Validate FHIB signature
994        let addr = indirect_address as usize;
995        if addr + 4 > file_data.len() {
996            return Err(Error::OffsetOutOfBounds(indirect_address));
997        }
998        if file_data[addr..addr + 4] != FHIB_SIGNATURE {
999            return Err(Error::InvalidData(format!(
1000                "expected FHIB signature at offset {:#x}, got {:?}",
1001                indirect_address,
1002                &file_data[addr..addr + 4]
1003            )));
1004        }
1005
1006        // The doubling table has `table_width` entries per row.
1007        // Row 0 and 1 have blocks of size `starting_block_size`.
1008        // Row r (for r >= 1) has blocks of size `starting_block_size * 2^(r-1)`.
1009        //
1010        // We iterate through the rows to find which block contains the
1011        // target offset, then read the block address from the indirect block.
1012
1013        let width = self.table_width as u64;
1014        let mut running_offset: u64 = 0;
1015
1016        for row in 0..nrows as u64 {
1017            let block_size = self.block_size_for_row(row);
1018            let is_direct = block_size <= self.max_direct_block_size;
1019
1020            for col in 0..width {
1021                let block_end = running_offset + block_size;
1022                if heap_offset >= running_offset && heap_offset < block_end {
1023                    // This is the block we want. Read its address from the
1024                    // indirect block.
1025                    let entry_index = row * width + col;
1026
1027                    // Indirect block layout: signature(4) + version(1) +
1028                    // heap_header_addr(offset_size) + block_offset(max_heap_size/8 rounded up)
1029                    // Then direct-block entries, optionally with filtered size
1030                    // and mask, followed by child indirect-block addresses.
1031                    let iblock_header_size =
1032                        4 + 1 + offset_size as u64 + (self.max_heap_size as u64).div_ceil(8);
1033
1034                    if is_direct {
1035                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1036                        let entry_pos =
1037                            indirect_address + iblock_header_size + entry_index * entry_size;
1038                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1039                            Error::InvalidData(
1040                                "fractal heap direct entry size exceeds platform usize".into(),
1041                            )
1042                        })?;
1043                        if entry_pos as usize + entry_len > file_data.len() {
1044                            return Err(Error::OffsetOutOfBounds(entry_pos));
1045                        }
1046                        let mut cursor = Cursor::new(file_data);
1047                        cursor.set_position(entry_pos);
1048                        let block_address = cursor.read_offset(offset_size)?;
1049                        if Cursor::is_undefined_offset(block_address, offset_size) {
1050                            return Err(Error::UndefinedAddress);
1051                        }
1052                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1053                            (
1054                                Some(cursor.read_length(length_size)?),
1055                                cursor.read_u32_le()?,
1056                            )
1057                        } else {
1058                            (None, 0)
1059                        };
1060                        return Ok(DirectBlockLocation {
1061                            address: block_address,
1062                            block_offset_in_heap: running_offset,
1063                            block_size,
1064                            filtered_size,
1065                            filter_mask,
1066                        });
1067                    } else {
1068                        // Need to recurse into a sub-indirect block.
1069                        let direct_count =
1070                            self.max_direct_block_rows() * u64::from(self.table_width);
1071                        let indirect_index =
1072                            entry_index.checked_sub(direct_count).ok_or_else(|| {
1073                                Error::InvalidData(
1074                                    "fractal heap indirect entry precedes direct entries".into(),
1075                                )
1076                            })?;
1077                        let entry_pos = indirect_address
1078                            + iblock_header_size
1079                            + direct_count * self.direct_block_entry_size(offset_size, length_size)
1080                            + indirect_index * u64::from(offset_size);
1081                        if entry_pos as usize + offset_size as usize > file_data.len() {
1082                            return Err(Error::OffsetOutOfBounds(entry_pos));
1083                        }
1084                        let mut cursor = Cursor::new(file_data);
1085                        cursor.set_position(entry_pos);
1086                        let block_address = cursor.read_offset(offset_size)?;
1087                        if Cursor::is_undefined_offset(block_address, offset_size) {
1088                            return Err(Error::UndefinedAddress);
1089                        }
1090                        // Determine how many rows the sub-indirect has.
1091                        let sub_rows = self.rows_for_block_size(block_size);
1092                        return self.find_direct_block_via_indirect(
1093                            block_address,
1094                            heap_offset - running_offset,
1095                            file_data,
1096                            offset_size,
1097                            length_size,
1098                            sub_rows,
1099                        );
1100                    }
1101                }
1102                running_offset = block_end;
1103            }
1104        }
1105
1106        Err(Error::InvalidData(format!(
1107            "fractal heap offset {} not found in doubling table",
1108            heap_offset
1109        )))
1110    }
1111
1112    fn find_direct_block_via_indirect_storage(
1113        &self,
1114        indirect_address: u64,
1115        heap_offset: u64,
1116        storage: &dyn Storage,
1117        offset_size: u8,
1118        length_size: u8,
1119        nrows: u16,
1120    ) -> Result<DirectBlockLocation> {
1121        let sig = storage.read_range(indirect_address, 4)?;
1122        if sig.as_ref() != FHIB_SIGNATURE {
1123            return Err(Error::InvalidData(format!(
1124                "expected FHIB signature at offset {:#x}, got {:?}",
1125                indirect_address,
1126                sig.as_ref()
1127            )));
1128        }
1129
1130        let width = self.table_width as u64;
1131        let mut running_offset = 0u64;
1132
1133        for row in 0..u64::from(nrows) {
1134            let block_size = self.block_size_for_row(row);
1135            let is_direct = block_size <= self.max_direct_block_size;
1136
1137            for col in 0..width {
1138                let block_end = running_offset + block_size;
1139                if heap_offset >= running_offset && heap_offset < block_end {
1140                    let entry_index = row * width + col;
1141                    let iblock_header_size = 4
1142                        + 1
1143                        + u64::from(offset_size)
1144                        + (u64::from(self.max_heap_size)).div_ceil(8);
1145
1146                    if is_direct {
1147                        let entry_size = self.direct_block_entry_size(offset_size, length_size);
1148                        let entry_pos =
1149                            indirect_address + iblock_header_size + entry_index * entry_size;
1150                        let entry_len = usize::try_from(entry_size).map_err(|_| {
1151                            Error::InvalidData(
1152                                "fractal heap direct entry size exceeds platform usize".into(),
1153                            )
1154                        })?;
1155                        let entry = storage.read_range(entry_pos, entry_len)?;
1156                        let mut cursor = Cursor::new(entry.as_ref());
1157                        let block_address = cursor.read_offset(offset_size)?;
1158                        if Cursor::is_undefined_offset(block_address, offset_size) {
1159                            return Err(Error::UndefinedAddress);
1160                        }
1161                        let (filtered_size, filter_mask) = if self.io_filters_len > 0 {
1162                            (
1163                                Some(cursor.read_length(length_size)?),
1164                                cursor.read_u32_le()?,
1165                            )
1166                        } else {
1167                            (None, 0)
1168                        };
1169                        return Ok(DirectBlockLocation {
1170                            address: block_address,
1171                            block_offset_in_heap: running_offset,
1172                            block_size,
1173                            filtered_size,
1174                            filter_mask,
1175                        });
1176                    }
1177
1178                    let direct_count = self.max_direct_block_rows() * u64::from(self.table_width);
1179                    let indirect_index =
1180                        entry_index.checked_sub(direct_count).ok_or_else(|| {
1181                            Error::InvalidData(
1182                                "fractal heap indirect entry precedes direct entries".into(),
1183                            )
1184                        })?;
1185                    let entry_addr_pos = indirect_address
1186                        + iblock_header_size
1187                        + direct_count * self.direct_block_entry_size(offset_size, length_size)
1188                        + indirect_index * u64::from(offset_size);
1189                    let entry = storage.read_range(entry_addr_pos, usize::from(offset_size))?;
1190                    let mut cursor = Cursor::new(entry.as_ref());
1191                    let block_address = cursor.read_offset(offset_size)?;
1192                    if Cursor::is_undefined_offset(block_address, offset_size) {
1193                        return Err(Error::UndefinedAddress);
1194                    }
1195
1196                    let sub_rows = self.rows_for_block_size(block_size);
1197                    return self.find_direct_block_via_indirect_storage(
1198                        block_address,
1199                        heap_offset - running_offset,
1200                        storage,
1201                        offset_size,
1202                        length_size,
1203                        sub_rows,
1204                    );
1205                }
1206                running_offset = block_end;
1207            }
1208        }
1209
1210        Err(Error::InvalidData(format!(
1211            "fractal heap offset {} not found in doubling table",
1212            heap_offset
1213        )))
1214    }
1215
1216    /// Compute the block size for a given row in the doubling table.
1217    fn block_size_for_row(&self, row: u64) -> u64 {
1218        if row == 0 {
1219            self.starting_block_size
1220        } else {
1221            self.starting_block_size * (1u64 << (row - 1))
1222        }
1223    }
1224
1225    /// Compute how many rows of the doubling table fit in a block of the
1226    /// given total size.
1227    fn rows_for_block_size(&self, total_size: u64) -> u16 {
1228        let mut rows = 0u16;
1229        let mut accum = 0u64;
1230        let width = self.table_width as u64;
1231        loop {
1232            let bs = self.block_size_for_row(rows as u64);
1233            let row_total = bs * width;
1234            if accum + row_total > total_size {
1235                break;
1236            }
1237            accum += row_total;
1238            rows += 1;
1239            if rows > 1000 {
1240                break; // safety
1241            }
1242        }
1243        rows
1244    }
1245
1246    fn max_direct_block_rows(&self) -> u64 {
1247        let mut rows = 0u64;
1248        loop {
1249            if self.block_size_for_row(rows) > self.max_direct_block_size {
1250                break;
1251            }
1252            rows += 1;
1253            if rows > 1000 {
1254                break;
1255            }
1256        }
1257        rows
1258    }
1259
1260    fn direct_block_entry_size(&self, offset_size: u8, length_size: u8) -> u64 {
1261        let mut size = u64::from(offset_size);
1262        if self.io_filters_len > 0 {
1263            size += u64::from(length_size) + 4;
1264        }
1265        size
1266    }
1267
1268    /// Size in bytes of an unfiltered direct block header.
1269    fn direct_block_header_size(&self, offset_size: u8) -> usize {
1270        // Signature(4) + Version(1) + Heap header address(offset_size) +
1271        // Block offset within heap (max_heap_size bits, rounded up to bytes) +
1272        // optional Checksum(4).
1273        let offset_bytes = (self.max_heap_size as usize).div_ceil(8);
1274        let checksum_bytes = if self.direct_blocks_are_checksummed() {
1275            4
1276        } else {
1277            0
1278        };
1279        4 + 1 + offset_size as usize + offset_bytes + checksum_bytes
1280    }
1281
1282    fn direct_blocks_are_checksummed(&self) -> bool {
1283        self.flags & 0x02 != 0
1284    }
1285
1286    fn direct_block_checksum_pos(&self, offset_size: u8) -> Option<usize> {
1287        if self.direct_blocks_are_checksummed() {
1288            Some(self.direct_block_header_size(offset_size) - 4)
1289        } else {
1290            None
1291        }
1292    }
1293
1294    fn verify_direct_block_bytes(&self, block: &[u8], offset_size: u8) -> Result<()> {
1295        if block.len() < self.direct_block_header_size(offset_size) {
1296            return Err(Error::InvalidData(format!(
1297                "fractal heap direct block has {} bytes, expected at least {}",
1298                block.len(),
1299                self.direct_block_header_size(offset_size)
1300            )));
1301        }
1302        if block[..4] != _FHDB_SIGNATURE {
1303            return Err(Error::InvalidData(
1304                "invalid fractal heap direct block signature".into(),
1305            ));
1306        }
1307        let version = block[4];
1308        if version != 0 {
1309            return Err(Error::UnsupportedFractalHeapVersion(version));
1310        }
1311        if let Some(checksum_pos) = self.direct_block_checksum_pos(offset_size) {
1312            let stored_checksum = u32::from_le_bytes(
1313                block[checksum_pos..checksum_pos + 4]
1314                    .try_into()
1315                    .expect("direct block checksum slice has four bytes"),
1316            );
1317            let mut checksum_data = block.to_vec();
1318            checksum_data[checksum_pos..checksum_pos + 4].fill(0);
1319            let computed = jenkins_lookup3(&checksum_data);
1320            if computed != stored_checksum {
1321                return Err(Error::ChecksumMismatch {
1322                    expected: stored_checksum,
1323                    actual: computed,
1324                });
1325            }
1326        }
1327        Ok(())
1328    }
1329}
1330
/// Kind of object a fractal heap ID refers to, as selected by bits 4-5 of
/// the ID's first byte.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HeapIdKind {
    /// Type value 0.
    Managed,
    /// Type value 1.
    Huge,
    /// Type value 2.
    Tiny,
}
1337
/// Minimum number of whole bytes (1 through 8) able to hold `value`.
///
/// Zero still needs one byte.
fn bytes_needed_to_encode(value: u64) -> usize {
    // Number of bits up to and including the highest set bit (0 for zero).
    let significant_bits = u64::BITS - value.leading_zeros();
    // At most 8, so widening to `usize` is lossless.
    significant_bits.div_ceil(8).max(1) as usize
}
1357
1358#[cfg(test)]
1359mod tests {
1360    use super::*;
1361    use crate::storage::{Storage, StorageBuffer};
1362    use flate2::write::ZlibEncoder;
1363    use flate2::Compression;
1364    use std::io::Write;
1365    use std::sync::{Arc, Mutex};
1366
1367    fn base_heap() -> FractalHeap {
1368        FractalHeap {
1369            heap_id_len: 8,
1370            io_filters_len: 0,
1371            flags: 0x02,
1372            max_managed_object_size: 128,
1373            next_huge_id: 0,
1374            btree_huge_objects_address: u64::MAX,
1375            free_space_managed_address: 0,
1376            managed_space_amount: 0,
1377            managed_alloc_amount: 0,
1378            managed_iter_offset: 0,
1379            managed_objects_count: 0,
1380            huge_objects_size: 0,
1381            huge_objects_count: 0,
1382            tiny_objects_size: 0,
1383            tiny_objects_count: 0,
1384            table_width: 4,
1385            starting_block_size: 256,
1386            max_direct_block_size: 4096,
1387            max_heap_size: 16,
1388            starting_row_root_indirect: 0,
1389            root_block_address: 0,
1390            current_rows_in_root_indirect: 0,
1391            io_filter_size: None,
1392            io_filter_mask: None,
1393            io_filter_info: Vec::new(),
1394        }
1395    }
1396
1397    fn deflate_filter_info() -> Vec<u8> {
1398        let mut data = vec![0x02, 0x01];
1399        data.extend_from_slice(&1u16.to_le_bytes());
1400        data.extend_from_slice(&0u16.to_le_bytes());
1401        data.extend_from_slice(&1u16.to_le_bytes());
1402        data.extend_from_slice(&6u32.to_le_bytes());
1403        data
1404    }
1405
1406    fn zlib_compress(bytes: &[u8]) -> Vec<u8> {
1407        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1408        encoder.write_all(bytes).unwrap();
1409        encoder.finish().unwrap()
1410    }
1411
1412    fn filtered_heap_with_info(filter_info: Vec<u8>) -> FractalHeap {
1413        let mut heap = base_heap();
1414        heap.io_filters_len = filter_info.len() as u16;
1415        heap.io_filter_info = filter_info;
1416        heap
1417    }
1418
1419    fn direct_block_with_object(heap: &FractalHeap, block_size: usize, obj: &[u8]) -> Vec<u8> {
1420        let offset_size = 8;
1421        let mut block = vec![0u8; block_size];
1422        block[0..4].copy_from_slice(b"FHDB");
1423        block[4] = 0;
1424        let obj_offset = heap.direct_block_header_size(offset_size);
1425        block[obj_offset..obj_offset + obj.len()].copy_from_slice(obj);
1426        if let Some(checksum_pos) = heap.direct_block_checksum_pos(offset_size) {
1427            let mut checksum_data = block.clone();
1428            checksum_data[checksum_pos..checksum_pos + 4].fill(0);
1429            let checksum = jenkins_lookup3(&checksum_data);
1430            block[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1431        }
1432        block
1433    }
1434
1435    struct CountingStorage {
1436        data: Vec<u8>,
1437        reads: Arc<Mutex<Vec<(u64, usize)>>>,
1438    }
1439
1440    impl Storage for CountingStorage {
1441        fn len(&self) -> u64 {
1442            self.data.len() as u64
1443        }
1444
1445        fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
1446            self.reads.lock().unwrap().push((offset, len));
1447            let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
1448            let end = start
1449                .checked_add(len)
1450                .ok_or(Error::OffsetOutOfBounds(offset))?;
1451            if end > self.data.len() {
1452                return Err(Error::UnexpectedEof {
1453                    offset,
1454                    needed: len as u64,
1455                    available: self.len().saturating_sub(offset),
1456                });
1457            }
1458            Ok(StorageBuffer::from_vec(self.data[start..end].to_vec()))
1459        }
1460    }
1461
1462    #[test]
1463    fn block_size_for_row_scales_by_table_width() {
1464        let heap = FractalHeap {
1465            heap_id_len: 8,
1466            io_filters_len: 0,
1467            flags: 0x02,
1468            max_managed_object_size: 0,
1469            next_huge_id: 0,
1470            btree_huge_objects_address: 0,
1471            free_space_managed_address: 0,
1472            managed_space_amount: 0,
1473            managed_alloc_amount: 0,
1474            managed_iter_offset: 0,
1475            managed_objects_count: 0,
1476            huge_objects_size: 0,
1477            huge_objects_count: 0,
1478            tiny_objects_size: 0,
1479            tiny_objects_count: 0,
1480            table_width: 4,
1481            starting_block_size: 256,
1482            max_direct_block_size: 4096,
1483            max_heap_size: 16,
1484            starting_row_root_indirect: 0,
1485            root_block_address: 0,
1486            current_rows_in_root_indirect: 0,
1487            io_filter_size: None,
1488            io_filter_mask: None,
1489            io_filter_info: Vec::new(),
1490        };
1491
1492        assert_eq!(heap.block_size_for_row(0), 256);
1493        assert_eq!(heap.block_size_for_row(1), 256); // 256 * 2^0
1494        assert_eq!(heap.block_size_for_row(2), 512); // 256 * 2^1
1495        assert_eq!(heap.block_size_for_row(3), 1024); // 256 * 2^2
1496    }
1497
1498    #[test]
1499    fn get_tiny_object() {
1500        let heap = base_heap();
1501        let heap_id = [0x20 | 3, b't', b'i', b'n', b'y'];
1502        let result = heap.get_object(&heap_id, &[], 8, 8).unwrap();
1503        assert_eq!(result, b"tiny");
1504    }
1505
1506    #[test]
1507    fn get_huge_direct_object() {
1508        let heap = base_heap();
1509        let mut file_data = vec![0u8; 128];
1510        file_data[64..68].copy_from_slice(b"huge");
1511
1512        let mut heap_id = Vec::new();
1513        heap_id.push(0x10);
1514        heap_id.extend_from_slice(&64u64.to_le_bytes());
1515        heap_id.extend_from_slice(&4u64.to_le_bytes());
1516
1517        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1518        assert_eq!(result, b"huge");
1519    }
1520
1521    #[test]
1522    fn get_filtered_huge_direct_object() {
1523        let heap = filtered_heap_with_info(deflate_filter_info());
1524        let payload = b"filtered huge payload";
1525        let compressed = zlib_compress(payload);
1526        let address = 64u64;
1527        let mut file_data = vec![0u8; address as usize + compressed.len()];
1528        file_data[address as usize..].copy_from_slice(&compressed);
1529
1530        let mut heap_id = Vec::new();
1531        heap_id.push(0x10);
1532        heap_id.extend_from_slice(&address.to_le_bytes());
1533        heap_id.extend_from_slice(&(compressed.len() as u64).to_le_bytes());
1534        heap_id.extend_from_slice(&0u32.to_le_bytes());
1535        heap_id.extend_from_slice(&(payload.len() as u64).to_le_bytes());
1536
1537        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1538        assert_eq!(result, payload);
1539    }
1540
1541    #[test]
1542    fn get_filtered_managed_object_direct_root() {
1543        let mut heap = filtered_heap_with_info(deflate_filter_info());
1544        let block_address = 1000u64;
1545        let obj_data = b"filtered managed";
1546        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
1547        let compressed = zlib_compress(&block);
1548        heap.root_block_address = block_address;
1549        heap.io_filter_size = Some(compressed.len() as u64);
1550        heap.io_filter_mask = Some(0);
1551
1552        let file_size = block_address as usize + compressed.len();
1553        let mut file_data = vec![0u8; file_size];
1554        file_data[block_address as usize..].copy_from_slice(&compressed);
1555
1556        let obj_offset = heap.direct_block_header_size(8) as u16;
1557        let mut heap_id = vec![0x00];
1558        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
1559        heap_id.push(obj_data.len() as u8);
1560
1561        let result = heap.get_object(&heap_id, &file_data, 8, 8).unwrap();
1562        assert_eq!(result, obj_data);
1563    }
1564
1565    #[test]
1566    fn get_filtered_managed_object_from_indirect_block() {
1567        let mut heap = filtered_heap_with_info(deflate_filter_info());
1568        let indirect_address = 512u64;
1569        let block_address = 1000u64;
1570        let obj_data = b"filtered child";
1571        let block = direct_block_with_object(&heap, heap.starting_block_size as usize, obj_data);
1572        let compressed = zlib_compress(&block);
1573
1574        heap.root_block_address = indirect_address;
1575        heap.current_rows_in_root_indirect = 1;
1576
1577        let offset_bytes = usize::from(heap.max_heap_size).div_ceil(8);
1578        let iblock_header_size = 4 + 1 + 8 + offset_bytes;
1579        let direct_entry_size = 8 + 8 + 4;
1580        let mut indirect = vec![0u8; iblock_header_size + direct_entry_size * 4];
1581        indirect[0..4].copy_from_slice(b"FHIB");
1582        indirect[4] = 0;
1583        let entry_pos = iblock_header_size;
1584        indirect[entry_pos..entry_pos + 8].copy_from_slice(&block_address.to_le_bytes());
1585        indirect[entry_pos + 8..entry_pos + 16]
1586            .copy_from_slice(&(compressed.len() as u64).to_le_bytes());
1587        indirect[entry_pos + 16..entry_pos + 20].copy_from_slice(&0u32.to_le_bytes());
1588
1589        let file_size = block_address as usize + compressed.len();
1590        let mut file_data = vec![0u8; file_size];
1591        file_data[indirect_address as usize..indirect_address as usize + indirect.len()]
1592            .copy_from_slice(&indirect);
1593        file_data[block_address as usize..].copy_from_slice(&compressed);
1594        let storage = crate::storage::BytesStorage::new(file_data);
1595
1596        let obj_offset = heap.direct_block_header_size(8) as u16;
1597        let mut heap_id = vec![0x00];
1598        heap_id.extend_from_slice(&obj_offset.to_le_bytes());
1599        heap_id.push(obj_data.len() as u8);
1600
1601        let result = heap.get_object_storage(&heap_id, &storage, 8, 8).unwrap();
1602        assert_eq!(result, obj_data);
1603    }
1604
1605    #[test]
1606    fn direct_block_header_size_includes_optional_fields() {
1607        let heap = FractalHeap {
1608            heap_id_len: 8,
1609            io_filters_len: 0,
1610            flags: 0x02,
1611            max_managed_object_size: 0,
1612            next_huge_id: 0,
1613            btree_huge_objects_address: 0,
1614            free_space_managed_address: 0,
1615            managed_space_amount: 0,
1616            managed_alloc_amount: 0,
1617            managed_iter_offset: 0,
1618            managed_objects_count: 0,
1619            huge_objects_size: 0,
1620            huge_objects_count: 0,
1621            tiny_objects_size: 0,
1622            tiny_objects_count: 0,
1623            table_width: 4,
1624            starting_block_size: 256,
1625            max_direct_block_size: 4096,
1626            max_heap_size: 16,
1627            starting_row_root_indirect: 0,
1628            root_block_address: 0,
1629            current_rows_in_root_indirect: 0,
1630            io_filter_size: None,
1631            io_filter_mask: None,
1632            io_filter_info: Vec::new(),
1633        };
1634
1635        // sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
1636        assert_eq!(heap.direct_block_header_size(8), 19);
1637
1638        // With 4-byte offsets: sig(4) + ver(1) + addr(4) + offset_bytes(2) + checksum(4) = 15
1639        assert_eq!(heap.direct_block_header_size(4), 15);
1640    }
1641
1642    #[test]
1643    fn get_managed_object_direct_root() {
1644        // Set up a fractal heap where the root is a direct block.
1645        let offset_size: u8 = 8;
1646        let max_heap_size: u16 = 16;
1647        let starting_block_size: u64 = 256;
1648
1649        // Direct block header size: sig(4) + ver(1) + addr(8) + offset_bytes(2) + checksum(4) = 19
1650        // (no I/O filters => checksum present)
1651        let db_header_size = 19usize;
1652
1653        // Place the direct block at file offset 1000.
1654        let block_address: u64 = 1000;
1655
1656        let heap = FractalHeap {
1657            heap_id_len: 8,
1658            io_filters_len: 0,
1659            flags: 0x02,
1660            max_managed_object_size: 128,
1661            next_huge_id: 0,
1662            btree_huge_objects_address: u64::MAX,
1663            free_space_managed_address: 0,
1664            managed_space_amount: starting_block_size,
1665            managed_alloc_amount: starting_block_size,
1666            managed_iter_offset: 0,
1667            managed_objects_count: 1,
1668            huge_objects_size: 0,
1669            huge_objects_count: 0,
1670            tiny_objects_size: 0,
1671            tiny_objects_count: 0,
1672            table_width: 4,
1673            starting_block_size,
1674            max_direct_block_size: 4096,
1675            max_heap_size,
1676            starting_row_root_indirect: 0,
1677            root_block_address: block_address,
1678            current_rows_in_root_indirect: 0,
1679            io_filter_size: None,
1680            io_filter_mask: None,
1681            io_filter_info: Vec::new(),
1682        };
1683
1684        // Build file data with the direct block.
1685        let file_size = block_address as usize + starting_block_size as usize + 100;
1686        let mut file_data = vec![0u8; file_size];
1687
1688        // Write direct block header at block_address.
1689        let ba = block_address as usize;
1690        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
1691        file_data[ba + 4] = 0; // version
1692                               // heap header address (8 bytes) — doesn't matter for this test
1693                               // block offset (2 bytes) — 0
1694
1695        // Write object data at its direct-block offset, after the block header.
1696        let obj_data = b"test object data";
1697        let obj_start = ba + db_header_size;
1698        file_data[obj_start..obj_start + obj_data.len()].copy_from_slice(obj_data);
1699        let checksum_pos = ba + db_header_size - 4;
1700        let mut checksum_data = file_data[ba..ba + starting_block_size as usize].to_vec();
1701        checksum_data[checksum_pos - ba..checksum_pos - ba + 4].fill(0);
1702        let checksum = jenkins_lookup3(&checksum_data);
1703        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1704
1705        // Build heap ID for managed object at its direct-block offset, length=16.
1706        // Type nibble = 0, offset = direct block header size (16 bits), length = 16.
1707        let heap_id = [0x00, db_header_size as u8, 0x00, 0x10];
1708
1709        let result = heap
1710            .get_managed_object(&heap_id, &file_data, offset_size, 8)
1711            .unwrap();
1712        assert_eq!(result, obj_data);
1713    }
1714
1715    #[test]
1716    fn get_object_storage_cached_reads_direct_block_once() {
1717        let offset_size: u8 = 8;
1718        let max_heap_size: u16 = 16;
1719        let starting_block_size: u64 = 256;
1720        let db_header_size = 19usize;
1721        let block_address: u64 = 1000;
1722
1723        let heap = FractalHeap {
1724            heap_id_len: 8,
1725            io_filters_len: 0,
1726            flags: 0x02,
1727            max_managed_object_size: 128,
1728            next_huge_id: 0,
1729            btree_huge_objects_address: u64::MAX,
1730            free_space_managed_address: 0,
1731            managed_space_amount: starting_block_size,
1732            managed_alloc_amount: starting_block_size,
1733            managed_iter_offset: 0,
1734            managed_objects_count: 2,
1735            huge_objects_size: 0,
1736            huge_objects_count: 0,
1737            tiny_objects_size: 0,
1738            tiny_objects_count: 0,
1739            table_width: 4,
1740            starting_block_size,
1741            max_direct_block_size: 4096,
1742            max_heap_size,
1743            starting_row_root_indirect: 0,
1744            root_block_address: block_address,
1745            current_rows_in_root_indirect: 0,
1746            io_filter_size: None,
1747            io_filter_mask: None,
1748            io_filter_info: Vec::new(),
1749        };
1750
1751        let file_size = block_address as usize + starting_block_size as usize;
1752        let mut file_data = vec![0u8; file_size];
1753        let ba = block_address as usize;
1754        file_data[ba..ba + 4].copy_from_slice(b"FHDB");
1755        file_data[ba + 4] = 0;
1756
1757        let obj1_offset = db_header_size;
1758        let obj2_offset = db_header_size + 16;
1759        file_data[ba + obj1_offset..ba + obj1_offset + 4].copy_from_slice(b"one!");
1760        file_data[ba + obj2_offset..ba + obj2_offset + 4].copy_from_slice(b"two!");
1761
1762        let checksum_pos = ba + db_header_size - 4;
1763        let mut checksum_data = file_data[ba..ba + starting_block_size as usize].to_vec();
1764        checksum_data[checksum_pos - ba..checksum_pos - ba + 4].fill(0);
1765        let checksum = jenkins_lookup3(&checksum_data);
1766        file_data[checksum_pos..checksum_pos + 4].copy_from_slice(&checksum.to_le_bytes());
1767
1768        let reads = Arc::new(Mutex::new(Vec::new()));
1769        let storage = CountingStorage {
1770            data: file_data,
1771            reads: reads.clone(),
1772        };
1773        let mut cache = FractalHeapDirectBlockCache::default();
1774        let heap_id1 = [0x00, obj1_offset as u8, 0x00, 0x04];
1775        let heap_id2 = [0x00, obj2_offset as u8, 0x00, 0x04];
1776
1777        assert_eq!(
1778            heap.get_object_storage_cached(&heap_id1, &storage, offset_size, 8, &mut cache)
1779                .unwrap(),
1780            b"one!"
1781        );
1782        assert_eq!(
1783            heap.get_object_storage_cached(&heap_id2, &storage, offset_size, 8, &mut cache)
1784                .unwrap(),
1785            b"two!"
1786        );
1787
1788        let direct_block_reads = reads
1789            .lock()
1790            .unwrap()
1791            .iter()
1792            .filter(|&&(offset, len)| {
1793                offset == block_address && len == starting_block_size as usize
1794            })
1795            .count();
1796        assert_eq!(direct_block_reads, 1);
1797    }
1798}