Skip to main content

hdf5_reader/
extensible_array.rs

//! HDF5 Extensible Array (EA) chunk index.
//!
//! This is the default chunk index for datasets with one unlimited dimension
//! and `libver='latest'`. It is stored as a header plus three kinds of blocks:
//!
//! - `EAHD` — Extensible Array Header
//! - `EAIB` — Extensible Array Index Block
//! - `EADB` — Extensible Array Data Block
//! - `EASB` — Extensible Array Secondary Block
10
11use crate::checksum::jenkins_lookup3;
12use crate::chunk_index::ChunkEntry;
13use crate::error::{Error, Result};
14use crate::io::Cursor;
15use crate::storage::Storage;
16
/// Four-byte signature expected at the start of an Extensible Array header.
const EAHD_SIGNATURE: [u8; 4] = *b"EAHD";
/// Four-byte signature expected at the start of an Extensible Array index block.
const EAIB_SIGNATURE: [u8; 4] = *b"EAIB";
/// Four-byte signature expected at the start of an Extensible Array data block.
const EADB_SIGNATURE: [u8; 4] = *b"EADB";
/// Four-byte signature expected at the start of an Extensible Array secondary block.
const EASB_SIGNATURE: [u8; 4] = *b"EASB";
21
/// In-memory copy of the fields decoded from an Extensible Array header (`EAHD`).
///
/// Fields appear in the order they are read from disk. Names with a leading
/// underscore are kept for completeness and are not consumed by every code path.
#[derive(Debug)]
struct EaHeader {
    /// Client ID byte; the chunk collector treats the value `1` as "filtered chunks".
    client_id: u8,
    /// Size in bytes of one stored array element.
    element_size: u8,
    /// Raw "max nelmts bits" byte from the header.
    _max_nelmts_bits: u8,
    /// Number of elements stored inline in the index block.
    idx_blk_elmts: u8,
    /// Base element count per data block used when computing the super block layout.
    data_blk_min_elmts: u8,
    /// Base data block pointer count used when computing the super block layout.
    sec_blk_min_data_ptrs: u8,
    /// Raw "max data block page nelmts bits" byte from the header.
    max_dblk_page_nelmts_bits: u8,
    /// Last of the six statistics fields (element count); bounds the super block layout.
    _nelmts: u64,
    /// File address of the index block (`EAIB`).
    index_block_address: u64,
}
35
36/// Parse the Extensible Array Header.
37///
38/// On-disk layout (from H5EA_HEADER_SIZE):
39/// sig(4) + ver(1) + client_id(1) +
40/// element_size(1) + max_nelmts_bits(1) + idx_blk_elmts(1) +
41/// data_blk_min_elmts(1) + sec_blk_min_data_ptrs(1) + max_dblk_page_nelmts_bits(1) +
42/// 6 statistics fields (each length_size) +
43/// index_block_address(offset_size) + checksum(4)
44fn parse_header(data: &[u8], address: u64, offset_size: u8, length_size: u8) -> Result<EaHeader> {
45    let mut cursor = Cursor::new(data);
46    cursor.set_position(address);
47
48    let sig = cursor.read_bytes(4)?;
49    if sig != EAHD_SIGNATURE {
50        return Err(Error::InvalidExtensibleArraySignature {
51            context: "header signature mismatch",
52        });
53    }
54
55    let version = cursor.read_u8()?;
56    if version != 0 {
57        return Err(Error::Other(format!(
58            "unsupported extensible array header version {}",
59            version
60        )));
61    }
62
63    let client_id = cursor.read_u8()?;
64    let element_size = cursor.read_u8()?;
65    let max_nelmts_bits = cursor.read_u8()?;
66    let idx_blk_elmts = cursor.read_u8()?;
67    let data_blk_min_elmts = cursor.read_u8()?;
68    let sec_blk_min_data_ptrs = cursor.read_u8()?;
69    let max_dblk_page_nelmts_bits = cursor.read_u8()?;
70
71    // Statistics (6 fields, each length_size bytes)
72    let _nsuper_blks = cursor.read_length(length_size)?;
73    let _super_blk_size = cursor.read_length(length_size)?;
74    let _ndata_blks = cursor.read_length(length_size)?;
75    let _data_blk_size = cursor.read_length(length_size)?;
76    let _max_idx_set = cursor.read_length(length_size)?;
77    let nelmts = cursor.read_length(length_size)?;
78
79    let index_block_address = cursor.read_offset(offset_size)?;
80
81    // Checksum
82    let header_end = cursor.position();
83    let header_bytes = &data[address as usize..header_end as usize];
84    let stored_checksum = cursor.read_u32_le()?;
85    let computed = jenkins_lookup3(header_bytes);
86    if stored_checksum != computed {
87        return Err(Error::ChecksumMismatch {
88            expected: stored_checksum,
89            actual: computed,
90        });
91    }
92
93    Ok(EaHeader {
94        client_id,
95        element_size,
96        _max_nelmts_bits: max_nelmts_bits,
97        idx_blk_elmts,
98        data_blk_min_elmts,
99        sec_blk_min_data_ptrs,
100        max_dblk_page_nelmts_bits,
101        _nelmts: nelmts,
102        index_block_address,
103    })
104}
105
106fn parse_header_storage(
107    storage: &dyn Storage,
108    address: u64,
109    offset_size: u8,
110    length_size: u8,
111) -> Result<EaHeader> {
112    let header_len = 4
113        + 1
114        + 1
115        + 1
116        + 1
117        + 1
118        + 1
119        + 1
120        + 1
121        + 6 * usize::from(length_size)
122        + usize::from(offset_size)
123        + 4;
124    let bytes = storage.read_range(address, header_len)?;
125    parse_header(bytes.as_ref(), 0, offset_size, length_size)
126}
127
128/// Compute the super block layout.
129///
130/// Returns a vec of (elements_per_data_block, num_data_blocks) for each super block.
131/// Stops generating entries once cumulative capacity exceeds `nelmts`.
132fn compute_super_block_layout(header: &EaHeader) -> Vec<(u64, u64)> {
133    let mut layout = Vec::new();
134    let dblk_min = header.data_blk_min_elmts as u64;
135    let sblk_min = header.sec_blk_min_data_ptrs as u64;
136    let nelmts = header._nelmts;
137    let mut cumulative = header.idx_blk_elmts as u64;
138
139    for sb_idx in 0u32..64 {
140        if cumulative >= nelmts {
141            break;
142        }
143        let elmts_per_dblk = dblk_min * (1u64 << (sb_idx / 2));
144        let num_dblks = sblk_min * (1u64 << (sb_idx.div_ceil(2)));
145        layout.push((elmts_per_dblk, num_dblks));
146        cumulative += elmts_per_dblk * num_dblks;
147    }
148
149    layout
150}
151
152fn checked_add_usize(left: usize, right: usize, context: &str) -> Result<usize> {
153    left.checked_add(right)
154        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
155}
156
157fn checked_mul_usize(left: usize, right: usize, context: &str) -> Result<usize> {
158    left.checked_mul(right)
159        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
160}
161
162fn checked_add_u64(left: u64, right: u64, context: &str) -> Result<u64> {
163    left.checked_add(right)
164        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds u64 capacity")))
165}
166
167fn checked_usize_from_u64(value: u64, context: &str) -> Result<usize> {
168    usize::try_from(value).map_err(|_| {
169        Error::InvalidData(format!(
170            "{context} value {value} exceeds platform usize capacity"
171        ))
172    })
173}
174
175fn checked_u64_from_usize(value: usize, context: &str) -> Result<u64> {
176    u64::try_from(value).map_err(|_| Error::InvalidData(format!("{context} exceeds u64 capacity")))
177}
178
179fn ea_block_header_len(offset_size: u8, sizeof_nelmts: usize) -> Result<usize> {
180    let len = checked_add_usize(4, 1, "extensible array block header length")?;
181    let len = checked_add_usize(len, 1, "extensible array block header length")?;
182    let len = checked_add_usize(
183        len,
184        usize::from(offset_size),
185        "extensible array block header length",
186    )?;
187    checked_add_usize(len, sizeof_nelmts, "extensible array block header length")
188}
189
190fn ea_page_nelmts(max_page_bits: u8) -> Result<usize> {
191    if max_page_bits == 0 {
192        return Ok(0);
193    }
194    1usize.checked_shl(u32::from(max_page_bits)).ok_or_else(|| {
195        Error::InvalidData("extensible array page element count exceeds usize capacity".into())
196    })
197}
198
199fn index_block_len(header: &EaHeader, sb_layout: &[(u64, u64)], offset_size: u8) -> Result<usize> {
200    let offset_size = usize::from(offset_size);
201    let inline_bytes = checked_mul_usize(
202        usize::from(header.idx_blk_elmts),
203        usize::from(header.element_size),
204        "extensible array index block inline entry bytes",
205    )?;
206    let direct_ptrs = checked_mul_usize(
207        2,
208        usize::from(header.sec_blk_min_data_ptrs),
209        "extensible array index block direct data block pointer count",
210    )?;
211    let direct_bytes = checked_mul_usize(
212        direct_ptrs,
213        offset_size,
214        "extensible array index block direct data block address bytes",
215    )?;
216    let secondary_ptrs = sb_layout.len().saturating_sub(direct_ptrs);
217    let secondary_bytes = checked_mul_usize(
218        secondary_ptrs,
219        offset_size,
220        "extensible array index block secondary block address bytes",
221    )?;
222
223    let mut len = 4usize;
224    for part in [
225        1,
226        1,
227        offset_size,
228        inline_bytes,
229        direct_bytes,
230        secondary_bytes,
231        4,
232    ] {
233        len = checked_add_usize(len, part, "extensible array index block length")?;
234    }
235    Ok(len)
236}
237
238fn secondary_page_bitmap_bytes(
239    max_page_bits: u8,
240    elmts_per_dblk: u64,
241    num_dblks: u64,
242) -> Result<usize> {
243    let page_nelmts = ea_page_nelmts(max_page_bits)?;
244    if page_nelmts == 0 {
245        return Ok(0);
246    }
247
248    let elmts_per_dblk =
249        checked_usize_from_u64(elmts_per_dblk, "extensible array data block element count")?;
250    if elmts_per_dblk <= page_nelmts {
251        return Ok(0);
252    }
253
254    let num_dblks = checked_usize_from_u64(
255        num_dblks,
256        "extensible array secondary block data block count",
257    )?;
258    let pages_per_dblk = elmts_per_dblk.div_ceil(page_nelmts);
259    let total_pages = checked_mul_usize(
260        num_dblks,
261        pages_per_dblk,
262        "extensible array secondary block page bitmap bit count",
263    )?;
264    Ok(total_pages.div_ceil(8))
265}
266
267fn secondary_block_len(
268    num_dblk_addrs: usize,
269    offset_size: u8,
270    sizeof_nelmts: usize,
271    page_bitmap_bytes: usize,
272) -> Result<usize> {
273    let offset_bytes = usize::from(offset_size);
274    let addr_bytes = checked_mul_usize(
275        num_dblk_addrs,
276        offset_bytes,
277        "extensible array secondary block data block address bytes",
278    )?;
279
280    let mut len = ea_block_header_len(offset_size, sizeof_nelmts)?;
281    for part in [page_bitmap_bytes, addr_bytes, 4] {
282        len = checked_add_usize(len, part, "extensible array secondary block length")?;
283    }
284    Ok(len)
285}
286
287fn data_block_len_from_bitmap(
288    num_entries: usize,
289    page_nelmts: usize,
290    entry_size: u8,
291    offset_size: u8,
292    sizeof_nelmts: usize,
293    page_bitmap: &[u8],
294) -> Result<usize> {
295    let num_pages = num_entries.div_ceil(page_nelmts);
296    let bitmap_bytes = num_pages.div_ceil(8);
297    let mut len = checked_add_usize(
298        ea_block_header_len(offset_size, sizeof_nelmts)?,
299        bitmap_bytes,
300        "extensible array data block length",
301    )?;
302
303    for page_idx in 0..num_pages {
304        let byte_idx = page_idx / 8;
305        let bit_idx = page_idx % 8;
306        let page_initialized =
307            byte_idx < page_bitmap.len() && (page_bitmap[byte_idx] & (1 << bit_idx)) != 0;
308        if !page_initialized {
309            continue;
310        }
311
312        let entries_in_page = if page_idx == num_pages - 1 {
313            let remainder = num_entries % page_nelmts;
314            if remainder == 0 {
315                page_nelmts
316            } else {
317                remainder
318            }
319        } else {
320            page_nelmts
321        };
322        let entry_bytes = checked_mul_usize(
323            entries_in_page,
324            usize::from(entry_size),
325            "extensible array data block page entry bytes",
326        )?;
327        let page_bytes = checked_add_usize(
328            entry_bytes,
329            4,
330            "extensible array data block page byte length",
331        )?;
332        len = checked_add_usize(len, page_bytes, "extensible array data block length")?;
333    }
334
335    Ok(len)
336}
337
338fn unpaged_data_block_len(
339    num_entries: usize,
340    offset_size: u8,
341    entry_size: u8,
342    sizeof_nelmts: usize,
343) -> Result<usize> {
344    let entry_bytes = checked_mul_usize(
345        num_entries,
346        usize::from(entry_size),
347        "extensible array data block entry bytes",
348    )?;
349    let len = checked_add_usize(
350        ea_block_header_len(offset_size, sizeof_nelmts)?,
351        entry_bytes,
352        "extensible array data block length",
353    )?;
354    checked_add_usize(len, 4, "extensible array data block length")
355}
356
/// One array element exactly as decoded from an index or data block.
struct EaRawEntry {
    /// Chunk address; `u64::MAX` is used for entries on pages that were never
    /// initialized.
    address: u64,
    /// Stored chunk size; `0` when the array is not filtered.
    chunk_size: u64,
    /// Filter mask; `0` when the array is not filtered.
    filter_mask: u32,
}
363
364/// Read `count` entries from the cursor.
365fn read_entries(
366    cursor: &mut Cursor<'_>,
367    count: usize,
368    is_filtered: bool,
369    offset_size: u8,
370    entry_size: u8,
371) -> Result<Vec<EaRawEntry>> {
372    let mut entries = Vec::with_capacity(count);
373    for _ in 0..count {
374        let address = cursor.read_offset(offset_size)?;
375        let (chunk_size, filter_mask) = if is_filtered {
376            let chunk_size_len = entry_size
377                .checked_sub(offset_size)
378                .and_then(|remaining| remaining.checked_sub(4))
379                .ok_or_else(|| Error::InvalidData("invalid extensible array entry size".into()))?;
380            let cs = cursor.read_length(chunk_size_len)?;
381            let fm = cursor.read_u32_le()?;
382            (cs, fm)
383        } else {
384            (0, 0)
385        };
386        entries.push(EaRawEntry {
387            address,
388            chunk_size,
389            filter_mask,
390        });
391    }
392    Ok(entries)
393}
394
395/// Parse a data block and return its entries.
396///
397/// `sizeof_nelmts` is `ceil(max_nelmts_bits / 8)` — used for the block_off field.
398#[allow(clippy::too_many_arguments)]
399fn parse_data_block(
400    data: &[u8],
401    address: u64,
402    num_entries: usize,
403    is_filtered: bool,
404    max_page_bits: u8,
405    offset_size: u8,
406    entry_size: u8,
407    sizeof_nelmts: usize,
408) -> Result<Vec<EaRawEntry>> {
409    let mut cursor = Cursor::new(data);
410    cursor.set_position(address);
411
412    let sig = cursor.read_bytes(4)?;
413    if sig != EADB_SIGNATURE {
414        return Err(Error::InvalidExtensibleArraySignature {
415            context: "data block signature mismatch",
416        });
417    }
418
419    let version = cursor.read_u8()?;
420    if version != 0 {
421        return Err(Error::Other(format!(
422            "unsupported extensible array data block version {}",
423            version
424        )));
425    }
426
427    let _client_id = cursor.read_u8()?;
428    let _header_address = cursor.read_offset(offset_size)?;
429
430    // Block offset: sizeof_nelmts bytes indicating this block's element index offset.
431    cursor.skip(sizeof_nelmts)?;
432
433    // Paging is used only when nelmts exceeds 2^page_bits.
434    let page_nelmts = ea_page_nelmts(max_page_bits)?;
435
436    if page_nelmts > 0 && num_entries > page_nelmts {
437        // Paged data block
438        let num_pages = num_entries.div_ceil(page_nelmts);
439        let bitmap_bytes = num_pages.div_ceil(8);
440        let page_bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
441
442        let mut all_entries = Vec::with_capacity(num_entries);
443        for page_idx in 0..num_pages {
444            let byte_idx = page_idx / 8;
445            let bit_idx = page_idx % 8;
446            let page_initialized =
447                byte_idx < page_bitmap.len() && (page_bitmap[byte_idx] & (1 << bit_idx)) != 0;
448
449            let entries_in_page = if page_idx == num_pages - 1 {
450                let remainder = num_entries % page_nelmts;
451                if remainder == 0 {
452                    page_nelmts
453                } else {
454                    remainder
455                }
456            } else {
457                page_nelmts
458            };
459
460            if page_initialized {
461                let page_entries = read_entries(
462                    &mut cursor,
463                    entries_in_page,
464                    is_filtered,
465                    offset_size,
466                    entry_size,
467                )?;
468                let _page_checksum = cursor.read_u32_le()?;
469                all_entries.extend(page_entries);
470            } else {
471                for _ in 0..entries_in_page {
472                    all_entries.push(EaRawEntry {
473                        address: u64::MAX,
474                        chunk_size: 0,
475                        filter_mask: 0,
476                    });
477                }
478            }
479        }
480        Ok(all_entries)
481    } else {
482        // Non-paged data block
483        let entries = read_entries(
484            &mut cursor,
485            num_entries,
486            is_filtered,
487            offset_size,
488            entry_size,
489        )?;
490        let _checksum = cursor.read_u32_le()?;
491        Ok(entries)
492    }
493}
494
495/// Parse a secondary block and return its data block addresses.
496fn parse_secondary_block(
497    data: &[u8],
498    address: u64,
499    num_dblk_addrs: usize,
500    offset_size: u8,
501    sizeof_nelmts: usize,
502    page_bitmap_bytes: usize,
503) -> Result<Vec<u64>> {
504    let mut cursor = Cursor::new(data);
505    cursor.set_position(address);
506
507    let sig = cursor.read_bytes(4)?;
508    if sig != EASB_SIGNATURE {
509        return Err(Error::InvalidExtensibleArraySignature {
510            context: "secondary block signature mismatch",
511        });
512    }
513
514    let version = cursor.read_u8()?;
515    if version != 0 {
516        return Err(Error::Other(format!(
517            "unsupported extensible array secondary block version {}",
518            version
519        )));
520    }
521
522    let _client_id = cursor.read_u8()?;
523    let _header_address = cursor.read_offset(offset_size)?;
524    cursor.skip(sizeof_nelmts)?;
525
526    if page_bitmap_bytes > 0 {
527        cursor.skip(page_bitmap_bytes)?;
528    }
529
530    let mut addrs = Vec::with_capacity(num_dblk_addrs);
531    for _ in 0..num_dblk_addrs {
532        addrs.push(cursor.read_offset(offset_size)?);
533    }
534
535    // Skip checksum
536    let _checksum = cursor.read_u32_le()?;
537
538    Ok(addrs)
539}
540
541fn parse_secondary_block_storage(
542    storage: &dyn Storage,
543    address: u64,
544    num_dblk_addrs: usize,
545    offset_size: u8,
546    sizeof_nelmts: usize,
547    page_bitmap_bytes: usize,
548) -> Result<Vec<u64>> {
549    let read_len = secondary_block_len(
550        num_dblk_addrs,
551        offset_size,
552        sizeof_nelmts,
553        page_bitmap_bytes,
554    )?;
555    let bytes = storage.read_range(address, read_len)?;
556    parse_secondary_block(
557        bytes.as_ref(),
558        0,
559        num_dblk_addrs,
560        offset_size,
561        sizeof_nelmts,
562        page_bitmap_bytes,
563    )
564}
565
566fn read_entry_at(
567    data: &[u8],
568    position: u64,
569    is_filtered: bool,
570    offset_size: u8,
571    entry_size: u8,
572) -> Result<EaRawEntry> {
573    let mut cursor = Cursor::new(data);
574    cursor.set_position(position);
575    let mut entries = read_entries(&mut cursor, 1, is_filtered, offset_size, entry_size)?;
576    entries
577        .pop()
578        .ok_or_else(|| Error::InvalidData("missing extensible array entry".into()))
579}
580
581fn read_entry_at_storage(
582    storage: &dyn Storage,
583    position: u64,
584    is_filtered: bool,
585    offset_size: u8,
586    entry_size: u8,
587) -> Result<EaRawEntry> {
588    let bytes = storage.read_range(position, usize::from(entry_size))?;
589    let mut cursor = Cursor::new(bytes.as_ref());
590    let mut entries = read_entries(&mut cursor, 1, is_filtered, offset_size, entry_size)?;
591    entries
592        .pop()
593        .ok_or_else(|| Error::InvalidData("missing extensible array entry".into()))
594}
595
/// Enumerate the chunks inside an inclusive box of chunk coordinates.
///
/// For every chunk, the result holds its row-major linear index within the
/// chunk grid (`ceil(dataset_shape / chunk_dims)` per dimension) and its
/// element offsets (`chunk coordinate * chunk_dims`). Chunks are produced in
/// row-major order, last dimension varying fastest.
///
/// `chunk_bounds` is `(first, last)` in chunk coordinates, both inclusive.
/// When it is `None`, the whole grid is enumerated. A zero-dimensional
/// dataset yields the single target `(0, [])`.
fn linear_target_offsets(
    dataset_shape: &[u64],
    chunk_dims: &[u32],
    chunk_bounds: Option<(&[u64], &[u64])>,
) -> Vec<(usize, Vec<u64>)> {
    let rank = dataset_shape.len();
    let grid: Vec<u64> = dataset_shape
        .iter()
        .enumerate()
        .map(|(dim, extent)| extent.div_ceil(u64::from(chunk_dims[dim])))
        .collect();

    if rank == 0 {
        return vec![(0, Vec::new())];
    }

    let (lower, upper): (Vec<u64>, Vec<u64>) = if let Some((first, last)) = chunk_bounds {
        (first.to_vec(), last.to_vec())
    } else {
        let last_in_grid = grid.iter().map(|count| count.saturating_sub(1)).collect();
        (vec![0u64; rank], last_in_grid)
    };

    let mut targets = Vec::new();
    let mut current = lower.clone();
    loop {
        // Row-major linearisation of the current chunk coordinate.
        let linear = current
            .iter()
            .enumerate()
            .fold(0u64, |acc, (dim, idx)| acc * grid[dim] + idx);
        let offsets: Vec<u64> = current
            .iter()
            .enumerate()
            .map(|(dim, idx)| idx * u64::from(chunk_dims[dim]))
            .collect();
        targets.push((linear as usize, offsets));

        // Odometer step: bump the right-most dimension that still has room
        // and reset everything to its right back to the lower bound.
        let Some(dim) = (0..rank).rev().find(|&dim| current[dim] < upper[dim]) else {
            break;
        };
        current[dim] += 1;
        current[dim + 1..rank].copy_from_slice(&lower[dim + 1..rank]);
    }

    targets
}
654
655#[allow(clippy::too_many_arguments)]
656fn read_data_block_entry(
657    data: &[u8],
658    address: u64,
659    num_entries: usize,
660    local_idx: usize,
661    is_filtered: bool,
662    max_page_bits: u8,
663    offset_size: u8,
664    entry_size: u8,
665    sizeof_nelmts: usize,
666) -> Result<EaRawEntry> {
667    let mut cursor = Cursor::new(data);
668    cursor.set_position(address);
669
670    let sig = cursor.read_bytes(4)?;
671    if sig != EADB_SIGNATURE {
672        return Err(Error::InvalidExtensibleArraySignature {
673            context: "data block signature mismatch",
674        });
675    }
676
677    let version = cursor.read_u8()?;
678    if version != 0 {
679        return Err(Error::Other(format!(
680            "unsupported extensible array data block version {}",
681            version
682        )));
683    }
684
685    let _client_id = cursor.read_u8()?;
686    let _header_address = cursor.read_offset(offset_size)?;
687    cursor.skip(sizeof_nelmts)?;
688
689    let page_nelmts = ea_page_nelmts(max_page_bits)?;
690
691    if page_nelmts > 0 && num_entries > page_nelmts {
692        let num_pages = num_entries.div_ceil(page_nelmts);
693        let bitmap_bytes = num_pages.div_ceil(8);
694        let page_bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
695        let data_start = cursor.position();
696
697        let target_page = local_idx / page_nelmts;
698        let within_page = local_idx % page_nelmts;
699        let byte_idx = target_page / 8;
700        let bit_idx = target_page % 8;
701        let page_initialized =
702            byte_idx < page_bitmap.len() && (page_bitmap[byte_idx] & (1 << bit_idx)) != 0;
703        if !page_initialized {
704            return Ok(EaRawEntry {
705                address: u64::MAX,
706                chunk_size: 0,
707                filter_mask: 0,
708            });
709        }
710
711        let mut page_start = data_start;
712        for page_idx in 0..target_page {
713            let entries_in_page = if page_idx == num_pages - 1 {
714                let remainder = num_entries % page_nelmts;
715                if remainder == 0 {
716                    page_nelmts
717                } else {
718                    remainder
719                }
720            } else {
721                page_nelmts
722            };
723            let page_byte_idx = page_idx / 8;
724            let page_bit_idx = page_idx % 8;
725            let initialized = page_byte_idx < page_bitmap.len()
726                && (page_bitmap[page_byte_idx] & (1 << page_bit_idx)) != 0;
727            if initialized {
728                page_start += (entries_in_page * entry_size as usize + 4) as u64;
729            }
730        }
731
732        let position = page_start + (within_page * entry_size as usize) as u64;
733        return read_entry_at(data, position, is_filtered, offset_size, entry_size);
734    }
735
736    let position = cursor.position() + (local_idx * entry_size as usize) as u64;
737    read_entry_at(data, position, is_filtered, offset_size, entry_size)
738}
739
740#[allow(clippy::too_many_arguments)]
741fn read_data_block_entry_storage(
742    storage: &dyn Storage,
743    address: u64,
744    num_entries: usize,
745    local_idx: usize,
746    is_filtered: bool,
747    max_page_bits: u8,
748    offset_size: u8,
749    entry_size: u8,
750    sizeof_nelmts: usize,
751) -> Result<EaRawEntry> {
752    let header_len = 4 + 1 + 1 + usize::from(offset_size) + sizeof_nelmts;
753    let header = storage.read_range(address, header_len)?;
754    let mut cursor = Cursor::new(header.as_ref());
755
756    let sig = cursor.read_bytes(4)?;
757    if sig != EADB_SIGNATURE {
758        return Err(Error::InvalidExtensibleArraySignature {
759            context: "data block signature mismatch",
760        });
761    }
762
763    let version = cursor.read_u8()?;
764    if version != 0 {
765        return Err(Error::Other(format!(
766            "unsupported extensible array data block version {}",
767            version
768        )));
769    }
770
771    let _client_id = cursor.read_u8()?;
772    let _header_address = cursor.read_offset(offset_size)?;
773    cursor.skip(sizeof_nelmts)?;
774
775    let base = checked_add_u64(
776        address,
777        checked_u64_from_usize(header_len, "EA data block header length")?,
778        "EA data block entry base",
779    )?;
780    let page_nelmts = ea_page_nelmts(max_page_bits)?;
781
782    if page_nelmts > 0 && num_entries > page_nelmts {
783        let num_pages = num_entries.div_ceil(page_nelmts);
784        let bitmap_bytes = num_pages.div_ceil(8);
785        let page_bitmap = storage.read_range(base, bitmap_bytes)?;
786        let data_start = checked_add_u64(
787            base,
788            checked_u64_from_usize(bitmap_bytes, "EA bitmap size")?,
789            "EA data block page data start",
790        )?;
791
792        let target_page = local_idx / page_nelmts;
793        let within_page = local_idx % page_nelmts;
794        let byte_idx = target_page / 8;
795        let bit_idx = target_page % 8;
796        let page_initialized =
797            byte_idx < page_bitmap.len() && (page_bitmap[byte_idx] & (1 << bit_idx)) != 0;
798        if !page_initialized {
799            return Ok(EaRawEntry {
800                address: u64::MAX,
801                chunk_size: 0,
802                filter_mask: 0,
803            });
804        }
805
806        let mut page_start = data_start;
807        for page_idx in 0..target_page {
808            let entries_in_page = if page_idx == num_pages - 1 {
809                let remainder = num_entries % page_nelmts;
810                if remainder == 0 {
811                    page_nelmts
812                } else {
813                    remainder
814                }
815            } else {
816                page_nelmts
817            };
818            let page_byte_idx = page_idx / 8;
819            let page_bit_idx = page_idx % 8;
820            let initialized = page_byte_idx < page_bitmap.len()
821                && (page_bitmap[page_byte_idx] & (1 << page_bit_idx)) != 0;
822            if initialized {
823                let entry_bytes = checked_mul_usize(
824                    entries_in_page,
825                    usize::from(entry_size),
826                    "EA page entry bytes",
827                )?;
828                let page_size = checked_add_usize(entry_bytes, 4, "EA page size")?;
829                page_start = checked_add_u64(
830                    page_start,
831                    checked_u64_from_usize(page_size, "EA page size")?,
832                    "EA page start",
833                )?;
834            }
835        }
836
837        let within_page_offset =
838            checked_mul_usize(within_page, usize::from(entry_size), "EA page entry offset")?;
839        let position = checked_add_u64(
840            page_start,
841            checked_u64_from_usize(within_page_offset, "EA page entry offset")?,
842            "EA page entry position",
843        )?;
844        return read_entry_at_storage(storage, position, is_filtered, offset_size, entry_size);
845    }
846
847    let local_offset = checked_mul_usize(local_idx, usize::from(entry_size), "EA entry offset")?;
848    let position = checked_add_u64(
849        base,
850        checked_u64_from_usize(local_offset, "EA entry offset")?,
851        "EA entry position",
852    )?;
853    read_entry_at_storage(storage, position, is_filtered, offset_size, entry_size)
854}
855
856#[allow(clippy::too_many_arguments)]
857fn read_data_block_storage(
858    storage: &dyn Storage,
859    address: u64,
860    num_entries: usize,
861    is_filtered: bool,
862    max_page_bits: u8,
863    offset_size: u8,
864    entry_size: u8,
865    sizeof_nelmts: usize,
866) -> Result<Vec<EaRawEntry>> {
867    let page_nelmts = ea_page_nelmts(max_page_bits)?;
868    let read_len = if page_nelmts > 0 && num_entries > page_nelmts {
869        let header_len = ea_block_header_len(offset_size, sizeof_nelmts)?;
870        let num_pages = num_entries.div_ceil(page_nelmts);
871        let bitmap_bytes = num_pages.div_ceil(8);
872        let bitmap_end = checked_add_usize(
873            header_len,
874            bitmap_bytes,
875            "extensible array data block bitmap range",
876        )?;
877        let prefix = storage.read_range(address, bitmap_end)?;
878        let page_bitmap = &prefix.as_ref()[header_len..bitmap_end];
879        data_block_len_from_bitmap(
880            num_entries,
881            page_nelmts,
882            entry_size,
883            offset_size,
884            sizeof_nelmts,
885            page_bitmap,
886        )?
887    } else {
888        unpaged_data_block_len(num_entries, offset_size, entry_size, sizeof_nelmts)?
889    };
890
891    let block = storage.read_range(address, read_len)?;
892    parse_data_block(
893        block.as_ref(),
894        0,
895        num_entries,
896        is_filtered,
897        max_page_bits,
898        offset_size,
899        entry_size,
900        sizeof_nelmts,
901    )
902}
903
904#[allow(clippy::too_many_arguments)]
905fn collect_extensible_array_chunk_entries_bounded(
906    data: &[u8],
907    header: &EaHeader,
908    offset_size: u8,
909    dataset_shape: &[u64],
910    chunk_dims: &[u32],
911    chunk_bounds: (&[u64], &[u64]),
912    sb_layout: &[(u64, u64)],
913    sizeof_nelmts: usize,
914) -> Result<Vec<ChunkEntry>> {
915    let is_filtered = header.client_id == 1;
916    let targets = linear_target_offsets(dataset_shape, chunk_dims, Some(chunk_bounds));
917
918    let mut cursor = Cursor::new(data);
919    cursor.set_position(header.index_block_address);
920
921    let sig = cursor.read_bytes(4)?;
922    if sig != EAIB_SIGNATURE {
923        return Err(Error::InvalidExtensibleArraySignature {
924            context: "index block signature mismatch",
925        });
926    }
927
928    let version = cursor.read_u8()?;
929    if version != 0 {
930        return Err(Error::Other(format!(
931            "unsupported extensible array index block version {}",
932            version
933        )));
934    }
935
936    let _client_id = cursor.read_u8()?;
937    let _header_address = cursor.read_offset(offset_size)?;
938
939    let num_inline = header.idx_blk_elmts as usize;
940    let inline_start = cursor.position();
941    cursor.skip(num_inline * header.element_size as usize)?;
942
943    let ndblk_addrs = 2 * header.sec_blk_min_data_ptrs as usize;
944    let mut direct_dblk_addrs = Vec::with_capacity(ndblk_addrs);
945    for _ in 0..ndblk_addrs {
946        direct_dblk_addrs.push(cursor.read_offset(offset_size)?);
947    }
948
949    let nsblks = sb_layout.len();
950    let nsblk_addrs = nsblks.saturating_sub(ndblk_addrs);
951    let mut sec_block_addrs = Vec::with_capacity(nsblk_addrs);
952    for _ in 0..nsblk_addrs {
953        sec_block_addrs.push(cursor.read_offset(offset_size)?);
954    }
955
956    let mut secondary_block_cache: Vec<Option<Vec<u64>>> = vec![None; sec_block_addrs.len()];
957    let mut entries = Vec::new();
958
959    for (linear_idx, offsets) in targets {
960        let raw = if linear_idx < num_inline {
961            read_entry_at(
962                data,
963                inline_start + (linear_idx * header.element_size as usize) as u64,
964                is_filtered,
965                offset_size,
966                header.element_size,
967            )?
968        } else {
969            let mut relative_idx = (linear_idx - num_inline) as u64;
970            let mut sb_idx = None;
971            for (candidate_idx, (elmts_per_dblk, num_dblks)) in sb_layout.iter().enumerate() {
972                let capacity = elmts_per_dblk * num_dblks;
973                if relative_idx < capacity {
974                    sb_idx = Some(candidate_idx);
975                    break;
976                }
977                relative_idx -= capacity;
978            }
979
980            let Some(sb_idx) = sb_idx else {
981                continue;
982            };
983            let (elmts_per_dblk, _) = sb_layout[sb_idx];
984            let dblk_idx = (relative_idx / elmts_per_dblk) as usize;
985            let local_idx = (relative_idx % elmts_per_dblk) as usize;
986
987            let dblk_addr = if sb_idx < 2 {
988                let base = sb_layout[..sb_idx]
989                    .iter()
990                    .map(|(_, num_dblks)| *num_dblks as usize)
991                    .sum::<usize>();
992                *direct_dblk_addrs.get(base + dblk_idx).unwrap_or(&u64::MAX)
993            } else {
994                let sec_cache_idx = sb_idx - 2;
995                if secondary_block_cache[sec_cache_idx].is_none() {
996                    let sec_addr = sec_block_addrs
997                        .get(sec_cache_idx)
998                        .copied()
999                        .unwrap_or(u64::MAX);
1000                    if Cursor::is_undefined_offset(sec_addr, offset_size) {
1001                        secondary_block_cache[sec_cache_idx] = Some(Vec::new());
1002                    } else {
1003                        let (_, num_dblks) = sb_layout[sb_idx];
1004                        let page_bitmap_bytes = secondary_page_bitmap_bytes(
1005                            header.max_dblk_page_nelmts_bits,
1006                            elmts_per_dblk,
1007                            num_dblks,
1008                        )?;
1009                        secondary_block_cache[sec_cache_idx] = Some(parse_secondary_block(
1010                            data,
1011                            sec_addr,
1012                            checked_usize_from_u64(
1013                                num_dblks,
1014                                "extensible array secondary block data block count",
1015                            )?,
1016                            offset_size,
1017                            sizeof_nelmts,
1018                            page_bitmap_bytes,
1019                        )?);
1020                    }
1021                }
1022
1023                secondary_block_cache[sec_cache_idx]
1024                    .as_ref()
1025                    .and_then(|addrs| addrs.get(dblk_idx))
1026                    .copied()
1027                    .unwrap_or(u64::MAX)
1028            };
1029
1030            if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1031                continue;
1032            }
1033
1034            read_data_block_entry(
1035                data,
1036                dblk_addr,
1037                elmts_per_dblk as usize,
1038                local_idx,
1039                is_filtered,
1040                header.max_dblk_page_nelmts_bits,
1041                offset_size,
1042                header.element_size,
1043                sizeof_nelmts,
1044            )?
1045        };
1046
1047        if Cursor::is_undefined_offset(raw.address, offset_size) {
1048            continue;
1049        }
1050
1051        entries.push(ChunkEntry {
1052            address: raw.address,
1053            size: raw.chunk_size,
1054            filter_mask: raw.filter_mask,
1055            offsets,
1056        });
1057    }
1058
1059    Ok(entries)
1060}
1061
1062/// Collect chunk entries from an Extensible Array index.
1063///
1064/// Walks the EAHD → EAIB → (EADB / EASB → EADB) hierarchy and converts
1065/// linear entry indices to multi-dimensional chunk offsets.
1066pub fn collect_extensible_array_chunk_entries(
1067    data: &[u8],
1068    header_address: u64,
1069    offset_size: u8,
1070    length_size: u8,
1071    dataset_shape: &[u64],
1072    chunk_dims: &[u32],
1073    chunk_bounds: Option<(&[u64], &[u64])>,
1074) -> Result<Vec<ChunkEntry>> {
1075    let header = parse_header(data, header_address, offset_size, length_size)?;
1076
1077    if Cursor::is_undefined_offset(header.index_block_address, offset_size) {
1078        return Ok(Vec::new());
1079    }
1080
1081    let is_filtered = header.client_id == 1;
1082    let sb_layout = compute_super_block_layout(&header);
1083    let sizeof_nelmts = (header._max_nelmts_bits as usize).div_ceil(8);
1084
1085    if let Some(bounds) = chunk_bounds {
1086        return collect_extensible_array_chunk_entries_bounded(
1087            data,
1088            &header,
1089            offset_size,
1090            dataset_shape,
1091            chunk_dims,
1092            bounds,
1093            &sb_layout,
1094            sizeof_nelmts,
1095        );
1096    }
1097
1098    // Parse the index block.
1099    let mut cursor = Cursor::new(data);
1100    cursor.set_position(header.index_block_address);
1101
1102    let sig = cursor.read_bytes(4)?;
1103    if sig != EAIB_SIGNATURE {
1104        return Err(Error::InvalidExtensibleArraySignature {
1105            context: "index block signature mismatch",
1106        });
1107    }
1108
1109    let version = cursor.read_u8()?;
1110    if version != 0 {
1111        return Err(Error::Other(format!(
1112            "unsupported extensible array index block version {}",
1113            version
1114        )));
1115    }
1116
1117    let _client_id = cursor.read_u8()?;
1118    let _header_address = cursor.read_offset(offset_size)?;
1119
1120    // 1. Inline elements (idx_blk_elmts entries stored directly).
1121    let num_inline = header.idx_blk_elmts as usize;
1122    let inline_entries = read_entries(
1123        &mut cursor,
1124        num_inline,
1125        is_filtered,
1126        offset_size,
1127        header.element_size,
1128    )?;
1129
1130    // 2. Data block addresses stored directly in the index block.
1131    // The number is 2 * sec_blk_min_data_ptrs (from HDF5: EA_IBLOCK_NDBLK_ADDRS).
1132    let ndblk_addrs = 2 * header.sec_blk_min_data_ptrs as usize;
1133    let mut direct_dblk_addrs = Vec::with_capacity(ndblk_addrs);
1134    for _ in 0..ndblk_addrs {
1135        direct_dblk_addrs.push(cursor.read_offset(offset_size)?);
1136    }
1137
1138    // 3. Secondary block addresses for super blocks 2+.
1139    // nsblk_addrs = max(0, nsblks - ndblk_addrs) where nsblks is the total
1140    // number of super blocks needed to cover nelmts.
1141    // compute_super_block_layout already stops once capacity >= nelmts,
1142    // so sb_layout.len() is the total number of super blocks needed.
1143    let nsblks = sb_layout.len();
1144
1145    let nsblk_addrs = nsblks.saturating_sub(ndblk_addrs);
1146    let mut sec_block_addrs = Vec::with_capacity(nsblk_addrs);
1147    for _ in 0..nsblk_addrs {
1148        sec_block_addrs.push(cursor.read_offset(offset_size)?);
1149    }
1150
1151    // Skip checksum at end of index block
1152    let _checksum = cursor.read_u32_le()?;
1153
1154    // Now collect all entries.
1155    let mut all_entries: Vec<EaRawEntry> = Vec::new();
1156
1157    // Inline entries
1158    all_entries.extend(inline_entries);
1159
1160    // Data blocks from direct addresses (super blocks 0-1)
1161    let mut dblk_addr_idx = 0;
1162    for sb_idx_iter in 0..2usize.min(nsblks) {
1163        if sb_idx_iter >= sb_layout.len() {
1164            break;
1165        }
1166        let (elmts_per_dblk, num_dblks) = sb_layout[sb_idx_iter];
1167        for _ in 0..num_dblks {
1168            if dblk_addr_idx >= direct_dblk_addrs.len() {
1169                break;
1170            }
1171            let dblk_addr = direct_dblk_addrs[dblk_addr_idx];
1172            dblk_addr_idx += 1;
1173
1174            if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1175                for _ in 0..elmts_per_dblk {
1176                    all_entries.push(EaRawEntry {
1177                        address: u64::MAX,
1178                        chunk_size: 0,
1179                        filter_mask: 0,
1180                    });
1181                }
1182            } else {
1183                let dblk_entries = parse_data_block(
1184                    data,
1185                    dblk_addr,
1186                    elmts_per_dblk as usize,
1187                    is_filtered,
1188                    header.max_dblk_page_nelmts_bits,
1189                    offset_size,
1190                    header.element_size,
1191                    sizeof_nelmts,
1192                )?;
1193                all_entries.extend(dblk_entries);
1194            }
1195        }
1196    }
1197
1198    // Data blocks from super blocks 2+ (via secondary blocks)
1199    for (sec_idx, &sec_addr) in sec_block_addrs.iter().enumerate() {
1200        let sb_idx_iter = sec_idx + 2;
1201        if sb_idx_iter >= sb_layout.len() {
1202            break;
1203        }
1204        let (elmts_per_dblk, num_dblks) = sb_layout[sb_idx_iter];
1205
1206        if Cursor::is_undefined_offset(sec_addr, offset_size) {
1207            for _ in 0..(elmts_per_dblk * num_dblks) {
1208                all_entries.push(EaRawEntry {
1209                    address: u64::MAX,
1210                    chunk_size: 0,
1211                    filter_mask: 0,
1212                });
1213            }
1214            continue;
1215        }
1216
1217        // Per HDF5 spec III.H "Extensible Array Secondary Block", the secondary
1218        // block contains a page initialization bitmap when data blocks are paged.
1219        // Bitmap size = ceil(num_dblks * pages_per_dblk / 8).
1220        let page_bitmap_bytes = secondary_page_bitmap_bytes(
1221            header.max_dblk_page_nelmts_bits,
1222            elmts_per_dblk,
1223            num_dblks,
1224        )?;
1225        let dblk_addrs = parse_secondary_block(
1226            data,
1227            sec_addr,
1228            checked_usize_from_u64(
1229                num_dblks,
1230                "extensible array secondary block data block count",
1231            )?,
1232            offset_size,
1233            sizeof_nelmts,
1234            page_bitmap_bytes,
1235        )?;
1236
1237        for &dblk_addr in &dblk_addrs {
1238            if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1239                for _ in 0..elmts_per_dblk {
1240                    all_entries.push(EaRawEntry {
1241                        address: u64::MAX,
1242                        chunk_size: 0,
1243                        filter_mask: 0,
1244                    });
1245                }
1246            } else {
1247                let dblk_entries = parse_data_block(
1248                    data,
1249                    dblk_addr,
1250                    elmts_per_dblk as usize,
1251                    is_filtered,
1252                    header.max_dblk_page_nelmts_bits,
1253                    offset_size,
1254                    header.element_size,
1255                    sizeof_nelmts,
1256                )?;
1257                all_entries.extend(dblk_entries);
1258            }
1259        }
1260    }
1261
1262    // Convert linear indices to chunk offsets.
1263    let ndim = dataset_shape.len();
1264    let chunks_per_dim: Vec<u64> = (0..ndim)
1265        .map(|i| dataset_shape[i].div_ceil(chunk_dims[i] as u64))
1266        .collect();
1267
1268    let mut entries = Vec::new();
1269    for (linear_idx, raw) in all_entries.iter().enumerate() {
1270        if Cursor::is_undefined_offset(raw.address, offset_size) {
1271            continue;
1272        }
1273
1274        let mut remaining = linear_idx as u64;
1275        let mut offsets = vec![0u64; ndim];
1276        for d in (0..ndim).rev() {
1277            offsets[d] = (remaining % chunks_per_dim[d]) * chunk_dims[d] as u64;
1278            remaining /= chunks_per_dim[d];
1279        }
1280
1281        if let Some((first_chunk, last_chunk)) = chunk_bounds {
1282            let overlaps = offsets.iter().enumerate().all(|(dim, offset)| {
1283                let chunk_index = *offset / u64::from(chunk_dims[dim]);
1284                chunk_index >= first_chunk[dim] && chunk_index <= last_chunk[dim]
1285            });
1286            if !overlaps {
1287                continue;
1288            }
1289        }
1290
1291        entries.push(ChunkEntry {
1292            address: raw.address,
1293            size: raw.chunk_size,
1294            filter_mask: raw.filter_mask,
1295            offsets,
1296        });
1297    }
1298
1299    Ok(entries)
1300}
1301
1302/// Collect chunk entries from an Extensible Array index using random-access storage.
1303pub fn collect_extensible_array_chunk_entries_storage(
1304    storage: &dyn Storage,
1305    header_address: u64,
1306    offset_size: u8,
1307    length_size: u8,
1308    dataset_shape: &[u64],
1309    chunk_dims: &[u32],
1310    chunk_bounds: Option<(&[u64], &[u64])>,
1311) -> Result<Vec<ChunkEntry>> {
1312    let header = parse_header_storage(storage, header_address, offset_size, length_size)?;
1313
1314    if Cursor::is_undefined_offset(header.index_block_address, offset_size) {
1315        return Ok(Vec::new());
1316    }
1317
1318    let is_filtered = header.client_id == 1;
1319    let sb_layout = compute_super_block_layout(&header);
1320    let sizeof_nelmts = (header._max_nelmts_bits as usize).div_ceil(8);
1321
1322    if let Some(bounds) = chunk_bounds {
1323        let targets = linear_target_offsets(dataset_shape, chunk_dims, Some(bounds));
1324        let index_block = storage.read_range(
1325            header.index_block_address,
1326            index_block_len(&header, &sb_layout, offset_size)?,
1327        )?;
1328        let mut cursor = Cursor::new(index_block.as_ref());
1329        let sig = cursor.read_bytes(4)?;
1330        if sig != EAIB_SIGNATURE {
1331            return Err(Error::InvalidExtensibleArraySignature {
1332                context: "index block signature mismatch",
1333            });
1334        }
1335        let version = cursor.read_u8()?;
1336        if version != 0 {
1337            return Err(Error::Other(format!(
1338                "unsupported extensible array index block version {}",
1339                version
1340            )));
1341        }
1342        let _client_id = cursor.read_u8()?;
1343        let _header_address = cursor.read_offset(offset_size)?;
1344        let num_inline = header.idx_blk_elmts as usize;
1345        let inline_start = cursor.position();
1346        cursor.skip(num_inline * header.element_size as usize)?;
1347
1348        let ndblk_addrs = 2 * header.sec_blk_min_data_ptrs as usize;
1349        let mut direct_dblk_addrs = Vec::with_capacity(ndblk_addrs);
1350        for _ in 0..ndblk_addrs {
1351            direct_dblk_addrs.push(cursor.read_offset(offset_size)?);
1352        }
1353
1354        let nsblks = sb_layout.len();
1355        let nsblk_addrs = nsblks.saturating_sub(ndblk_addrs);
1356        let mut sec_block_addrs = Vec::with_capacity(nsblk_addrs);
1357        for _ in 0..nsblk_addrs {
1358            sec_block_addrs.push(cursor.read_offset(offset_size)?);
1359        }
1360
1361        let mut secondary_block_cache: Vec<Option<Vec<u64>>> = vec![None; sec_block_addrs.len()];
1362        let mut entries = Vec::new();
1363
1364        for (linear_idx, offsets) in targets {
1365            let raw = if linear_idx < num_inline {
1366                let inline_offset = inline_start
1367                    + u64::try_from(linear_idx * usize::from(header.element_size)).map_err(
1368                        |_| {
1369                            Error::InvalidData("EA inline entry offset exceeds u64 capacity".into())
1370                        },
1371                    )?;
1372                let position = header.index_block_address + inline_offset;
1373                read_entry_at_storage(
1374                    storage,
1375                    position,
1376                    is_filtered,
1377                    offset_size,
1378                    header.element_size,
1379                )?
1380            } else {
1381                let mut relative_idx = (linear_idx - num_inline) as u64;
1382                let mut sb_idx = None;
1383                for (candidate_idx, (elmts_per_dblk, num_dblks)) in sb_layout.iter().enumerate() {
1384                    let capacity = elmts_per_dblk * num_dblks;
1385                    if relative_idx < capacity {
1386                        sb_idx = Some(candidate_idx);
1387                        break;
1388                    }
1389                    relative_idx -= capacity;
1390                }
1391
1392                let Some(sb_idx) = sb_idx else {
1393                    continue;
1394                };
1395                let (elmts_per_dblk, _) = sb_layout[sb_idx];
1396                let dblk_idx = (relative_idx / elmts_per_dblk) as usize;
1397                let local_idx = (relative_idx % elmts_per_dblk) as usize;
1398
1399                let dblk_addr = if sb_idx < 2 {
1400                    let base = sb_layout[..sb_idx]
1401                        .iter()
1402                        .map(|(_, num_dblks)| *num_dblks as usize)
1403                        .sum::<usize>();
1404                    *direct_dblk_addrs.get(base + dblk_idx).unwrap_or(&u64::MAX)
1405                } else {
1406                    let sec_cache_idx = sb_idx - 2;
1407                    if secondary_block_cache[sec_cache_idx].is_none() {
1408                        let sec_addr = sec_block_addrs
1409                            .get(sec_cache_idx)
1410                            .copied()
1411                            .unwrap_or(u64::MAX);
1412                        if Cursor::is_undefined_offset(sec_addr, offset_size) {
1413                            secondary_block_cache[sec_cache_idx] = Some(Vec::new());
1414                        } else {
1415                            let (_, num_dblks) = sb_layout[sb_idx];
1416                            let page_bitmap_bytes = secondary_page_bitmap_bytes(
1417                                header.max_dblk_page_nelmts_bits,
1418                                elmts_per_dblk,
1419                                num_dblks,
1420                            )?;
1421                            secondary_block_cache[sec_cache_idx] =
1422                                Some(parse_secondary_block_storage(
1423                                    storage,
1424                                    sec_addr,
1425                                    checked_usize_from_u64(
1426                                        num_dblks,
1427                                        "extensible array secondary block data block count",
1428                                    )?,
1429                                    offset_size,
1430                                    sizeof_nelmts,
1431                                    page_bitmap_bytes,
1432                                )?);
1433                        }
1434                    }
1435
1436                    secondary_block_cache[sec_cache_idx]
1437                        .as_ref()
1438                        .and_then(|addrs| addrs.get(dblk_idx))
1439                        .copied()
1440                        .unwrap_or(u64::MAX)
1441                };
1442
1443                if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1444                    continue;
1445                }
1446
1447                read_data_block_entry_storage(
1448                    storage,
1449                    dblk_addr,
1450                    checked_usize_from_u64(
1451                        elmts_per_dblk,
1452                        "extensible array data block element count",
1453                    )?,
1454                    local_idx,
1455                    is_filtered,
1456                    header.max_dblk_page_nelmts_bits,
1457                    offset_size,
1458                    header.element_size,
1459                    sizeof_nelmts,
1460                )?
1461            };
1462
1463            if Cursor::is_undefined_offset(raw.address, offset_size) {
1464                continue;
1465            }
1466
1467            entries.push(ChunkEntry {
1468                address: raw.address,
1469                size: raw.chunk_size,
1470                filter_mask: raw.filter_mask,
1471                offsets,
1472            });
1473        }
1474
1475        return Ok(entries);
1476    }
1477
1478    let index_block_len = index_block_len(&header, &sb_layout, offset_size)?;
1479    let data = storage.read_range(header.index_block_address, index_block_len)?;
1480    let mut cursor = Cursor::new(data.as_ref());
1481    cursor.set_position(0);
1482
1483    let sig = cursor.read_bytes(4)?;
1484    if sig != EAIB_SIGNATURE {
1485        return Err(Error::InvalidExtensibleArraySignature {
1486            context: "index block signature mismatch",
1487        });
1488    }
1489
1490    let version = cursor.read_u8()?;
1491    if version != 0 {
1492        return Err(Error::Other(format!(
1493            "unsupported extensible array index block version {}",
1494            version
1495        )));
1496    }
1497
1498    let _client_id = cursor.read_u8()?;
1499    let _header_address = cursor.read_offset(offset_size)?;
1500
1501    let num_inline = header.idx_blk_elmts as usize;
1502    let inline_entries = read_entries(
1503        &mut cursor,
1504        num_inline,
1505        is_filtered,
1506        offset_size,
1507        header.element_size,
1508    )?;
1509
1510    let ndblk_addrs = 2 * header.sec_blk_min_data_ptrs as usize;
1511    let mut direct_dblk_addrs = Vec::with_capacity(ndblk_addrs);
1512    for _ in 0..ndblk_addrs {
1513        direct_dblk_addrs.push(cursor.read_offset(offset_size)?);
1514    }
1515
1516    let nsblks = sb_layout.len();
1517    let nsblk_addrs = nsblks.saturating_sub(ndblk_addrs);
1518    let mut sec_block_addrs = Vec::with_capacity(nsblk_addrs);
1519    for _ in 0..nsblk_addrs {
1520        sec_block_addrs.push(cursor.read_offset(offset_size)?);
1521    }
1522    let _checksum = cursor.read_u32_le()?;
1523
1524    let mut all_entries: Vec<EaRawEntry> = Vec::new();
1525    all_entries.extend(inline_entries);
1526
1527    let mut dblk_addr_idx = 0;
1528    for sb_idx_iter in 0..2usize.min(nsblks) {
1529        if sb_idx_iter >= sb_layout.len() {
1530            break;
1531        }
1532        let (elmts_per_dblk, num_dblks) = sb_layout[sb_idx_iter];
1533        for _ in 0..num_dblks {
1534            if dblk_addr_idx >= direct_dblk_addrs.len() {
1535                break;
1536            }
1537            let dblk_addr = direct_dblk_addrs[dblk_addr_idx];
1538            dblk_addr_idx += 1;
1539
1540            if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1541                for _ in 0..elmts_per_dblk {
1542                    all_entries.push(EaRawEntry {
1543                        address: u64::MAX,
1544                        chunk_size: 0,
1545                        filter_mask: 0,
1546                    });
1547                }
1548            } else {
1549                let dblk_entries = read_data_block_storage(
1550                    storage,
1551                    dblk_addr,
1552                    checked_usize_from_u64(
1553                        elmts_per_dblk,
1554                        "extensible array data block element count",
1555                    )?,
1556                    is_filtered,
1557                    header.max_dblk_page_nelmts_bits,
1558                    offset_size,
1559                    header.element_size,
1560                    sizeof_nelmts,
1561                )?;
1562                all_entries.extend(dblk_entries);
1563            }
1564        }
1565    }
1566
1567    for (sb_idx_iter, &(elmts_per_dblk, num_dblks)) in sb_layout.iter().enumerate().skip(2) {
1568        let sec_idx = sb_idx_iter - 2;
1569        let sec_addr = *sec_block_addrs.get(sec_idx).unwrap_or(&u64::MAX);
1570        if Cursor::is_undefined_offset(sec_addr, offset_size) {
1571            for _ in 0..(elmts_per_dblk * num_dblks) {
1572                all_entries.push(EaRawEntry {
1573                    address: u64::MAX,
1574                    chunk_size: 0,
1575                    filter_mask: 0,
1576                });
1577            }
1578            continue;
1579        }
1580
1581        let page_bitmap_bytes = secondary_page_bitmap_bytes(
1582            header.max_dblk_page_nelmts_bits,
1583            elmts_per_dblk,
1584            num_dblks,
1585        )?;
1586        let dblk_addrs = parse_secondary_block_storage(
1587            storage,
1588            sec_addr,
1589            checked_usize_from_u64(
1590                num_dblks,
1591                "extensible array secondary block data block count",
1592            )?,
1593            offset_size,
1594            sizeof_nelmts,
1595            page_bitmap_bytes,
1596        )?;
1597
1598        for dblk_addr in dblk_addrs {
1599            if Cursor::is_undefined_offset(dblk_addr, offset_size) {
1600                for _ in 0..elmts_per_dblk {
1601                    all_entries.push(EaRawEntry {
1602                        address: u64::MAX,
1603                        chunk_size: 0,
1604                        filter_mask: 0,
1605                    });
1606                }
1607            } else {
1608                let dblk_entries = read_data_block_storage(
1609                    storage,
1610                    dblk_addr,
1611                    checked_usize_from_u64(
1612                        elmts_per_dblk,
1613                        "extensible array data block element count",
1614                    )?,
1615                    is_filtered,
1616                    header.max_dblk_page_nelmts_bits,
1617                    offset_size,
1618                    header.element_size,
1619                    sizeof_nelmts,
1620                )?;
1621                all_entries.extend(dblk_entries);
1622            }
1623        }
1624    }
1625
1626    let ndim = dataset_shape.len();
1627    let chunks_per_dim: Vec<u64> = (0..ndim)
1628        .map(|i| dataset_shape[i].div_ceil(chunk_dims[i] as u64))
1629        .collect();
1630
1631    let mut entries = Vec::new();
1632    for (linear_idx, raw) in all_entries.iter().enumerate() {
1633        if Cursor::is_undefined_offset(raw.address, offset_size) {
1634            continue;
1635        }
1636
1637        let mut remaining = linear_idx as u64;
1638        let mut offsets = vec![0u64; ndim];
1639        for d in (0..ndim).rev() {
1640            offsets[d] = (remaining % chunks_per_dim[d]) * chunk_dims[d] as u64;
1641            remaining /= chunks_per_dim[d];
1642        }
1643
1644        if let Some((first_chunk, last_chunk)) = chunk_bounds {
1645            let overlaps = offsets.iter().enumerate().all(|(dim, offset)| {
1646                let chunk_index = *offset / u64::from(chunk_dims[dim]);
1647                chunk_index >= first_chunk[dim] && chunk_index <= last_chunk[dim]
1648            });
1649            if !overlaps {
1650                continue;
1651            }
1652        }
1653
1654        entries.push(ChunkEntry {
1655            address: raw.address,
1656            size: raw.chunk_size,
1657            filter_mask: raw.filter_mask,
1658            offsets,
1659        });
1660    }
1661
1662    Ok(entries)
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667    use super::*;
1668    use crate::storage::{Storage, StorageBuffer};
1669    use std::sync::Mutex;
1670
1671    const TEST_OFFSET_SIZE: u8 = 8;
1672    const TEST_LENGTH_SIZE: u8 = 8;
1673    const TEST_HEADER_ADDR: u64 = 0;
1674    const TEST_INDEX_ADDR: u64 = 128;
1675    const TEST_SECONDARY_ADDR: u64 = 512;
1676    const TEST_DATA_BLOCK_ADDR: u64 = 768;
1677
1678    struct RecordingStorage {
1679        data: Vec<u8>,
1680        ranges: Mutex<Vec<(u64, usize)>>,
1681    }
1682
1683    impl RecordingStorage {
1684        fn new(data: Vec<u8>) -> Self {
1685            Self {
1686                data,
1687                ranges: Mutex::new(Vec::new()),
1688            }
1689        }
1690
1691        fn ranges(&self) -> Vec<(u64, usize)> {
1692            self.ranges.lock().unwrap().clone()
1693        }
1694
1695        fn clear_ranges(&self) {
1696            self.ranges.lock().unwrap().clear();
1697        }
1698    }
1699
1700    impl Storage for RecordingStorage {
1701        fn len(&self) -> u64 {
1702            self.data.len() as u64
1703        }
1704
1705        fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
1706            let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
1707            let end = start.checked_add(len).ok_or(Error::UnexpectedEof {
1708                offset,
1709                needed: len as u64,
1710                available: self.data.len().saturating_sub(start) as u64,
1711            })?;
1712            if end > self.data.len() {
1713                return Err(Error::UnexpectedEof {
1714                    offset,
1715                    needed: len as u64,
1716                    available: self.data.len().saturating_sub(start) as u64,
1717                });
1718            }
1719            self.ranges.lock().unwrap().push((offset, len));
1720            Ok(StorageBuffer::from_vec(self.data[start..end].to_vec()))
1721        }
1722    }
1723
1724    fn put(data: &mut [u8], offset: u64, bytes: &[u8]) {
1725        let start = offset as usize;
1726        data[start..start + bytes.len()].copy_from_slice(bytes);
1727    }
1728
1729    fn push_u64(buf: &mut Vec<u8>, value: u64) {
1730        buf.extend_from_slice(&value.to_le_bytes());
1731    }
1732
1733    fn push_offset(buf: &mut Vec<u8>, value: u64) {
1734        push_u64(buf, value);
1735    }
1736
1737    fn ea_header_bytes(nelmts: u64) -> Vec<u8> {
1738        let mut buf = Vec::new();
1739        buf.extend_from_slice(&EAHD_SIGNATURE);
1740        buf.push(0); // version
1741        buf.push(0); // unfiltered raw chunk client
1742        buf.push(8); // element size: address only
1743        buf.push(8); // max_nelmts_bits, so sizeof_nelmts is 1
1744        buf.push(0); // inline elements
1745        buf.push(1); // minimum data block elements
1746        buf.push(1); // minimum secondary block data pointers
1747        buf.push(0); // unpaged data blocks
1748        for value in [0, 0, 0, 0, 0, nelmts] {
1749            push_u64(&mut buf, value);
1750        }
1751        push_offset(&mut buf, TEST_INDEX_ADDR);
1752        let checksum = jenkins_lookup3(&buf);
1753        buf.extend_from_slice(&checksum.to_le_bytes());
1754        buf
1755    }
1756
1757    fn index_block_bytes() -> Vec<u8> {
1758        let mut buf = Vec::new();
1759        buf.extend_from_slice(&EAIB_SIGNATURE);
1760        buf.push(0); // version
1761        buf.push(0); // client id
1762        push_offset(&mut buf, TEST_HEADER_ADDR);
1763        push_offset(&mut buf, u64::MAX); // first direct data block
1764        push_offset(&mut buf, u64::MAX); // second direct data block
1765        push_offset(&mut buf, TEST_SECONDARY_ADDR);
1766        buf.extend_from_slice(&0u32.to_le_bytes());
1767        buf
1768    }
1769
1770    fn secondary_block_bytes() -> Vec<u8> {
1771        let mut buf = Vec::new();
1772        buf.extend_from_slice(&EASB_SIGNATURE);
1773        buf.push(0); // version
1774        buf.push(0); // client id
1775        push_offset(&mut buf, TEST_HEADER_ADDR);
1776        buf.push(0); // block offset, sizeof_nelmts = 1
1777        push_offset(&mut buf, TEST_DATA_BLOCK_ADDR);
1778        push_offset(&mut buf, u64::MAX);
1779        buf.extend_from_slice(&0u32.to_le_bytes());
1780        buf
1781    }
1782
1783    fn data_block_bytes() -> Vec<u8> {
1784        let mut buf = Vec::new();
1785        buf.extend_from_slice(&EADB_SIGNATURE);
1786        buf.push(0); // version
1787        buf.push(0); // client id
1788        push_offset(&mut buf, TEST_HEADER_ADDR);
1789        buf.push(0); // block offset, sizeof_nelmts = 1
1790        push_offset(&mut buf, 0x2000);
1791        push_offset(&mut buf, 0x2010);
1792        buf.extend_from_slice(&0u32.to_le_bytes());
1793        buf
1794    }
1795
1796    fn storage_fixture() -> RecordingStorage {
1797        let mut data = vec![0u8; 4096];
1798        put(&mut data, TEST_HEADER_ADDR, &ea_header_bytes(7));
1799        put(&mut data, TEST_INDEX_ADDR, &index_block_bytes());
1800        put(&mut data, TEST_SECONDARY_ADDR, &secondary_block_bytes());
1801        put(&mut data, TEST_DATA_BLOCK_ADDR, &data_block_bytes());
1802        RecordingStorage::new(data)
1803    }
1804
1805    #[test]
1806    fn eahd_bad_signature() {
1807        let mut data = vec![0u8; 64];
1808        data[0..4].copy_from_slice(b"XXXX");
1809        let err = parse_header(&data, 0, 8, 8).unwrap_err();
1810        assert!(matches!(err, Error::InvalidExtensibleArraySignature { .. }));
1811    }
1812
1813    #[test]
1814    fn super_block_layout_matches_spec_rows() {
1815        let header = EaHeader {
1816            client_id: 0,
1817            element_size: 8,
1818            _max_nelmts_bits: 32,
1819            idx_blk_elmts: 2,
1820            data_blk_min_elmts: 2,
1821            sec_blk_min_data_ptrs: 2,
1822            max_dblk_page_nelmts_bits: 0,
1823            _nelmts: 100,
1824            index_block_address: 0,
1825        };
1826        let layout = compute_super_block_layout(&header);
1827        // sb 0: elmts_per_dblk = 2 * 2^0 = 2, num_dblks = 2 * 2^0 = 2  (cap = 4 elements)
1828        assert_eq!(layout[0], (2, 2));
1829        // sb 1: elmts_per_dblk = 2 * 2^0 = 2, num_dblks = 2 * 2^1 = 4  (cap = 8 elements)
1830        assert_eq!(layout[1], (2, 4));
1831        // sb 2: elmts_per_dblk = 2 * 2^1 = 4, num_dblks = 2 * 2^1 = 4  (cap = 16 elements)
1832        assert_eq!(layout[2], (4, 4));
1833        // sb 3: elmts_per_dblk = 2 * 2^1 = 4, num_dblks = 2 * 2^2 = 8  (cap = 32 elements)
1834        assert_eq!(layout[3], (4, 8));
1835    }
1836
1837    #[test]
1838    fn storage_full_scan_reads_exact_secondary_and_data_block_lengths() {
1839        let storage = storage_fixture();
1840
1841        let entries = collect_extensible_array_chunk_entries_storage(
1842            &storage,
1843            TEST_HEADER_ADDR,
1844            TEST_OFFSET_SIZE,
1845            TEST_LENGTH_SIZE,
1846            &[7],
1847            &[1],
1848            None,
1849        )
1850        .unwrap();
1851
1852        assert!(!entries.is_empty());
1853        let ranges = storage.ranges();
1854        assert!(ranges.contains(&(TEST_HEADER_ADDR, 72)));
1855        assert!(ranges.contains(&(TEST_INDEX_ADDR, 42)));
1856        assert!(ranges.contains(&(TEST_SECONDARY_ADDR, 35)));
1857        assert!(ranges.contains(&(TEST_DATA_BLOCK_ADDR, 35)));
1858        assert!(ranges.iter().all(|(_, len)| *len <= 72), "{ranges:?}");
1859    }
1860
1861    #[test]
1862    fn storage_bounded_scan_reads_exact_index_block_length() {
1863        let storage = storage_fixture();
1864        storage.clear_ranges();
1865
1866        let entries = collect_extensible_array_chunk_entries_storage(
1867            &storage,
1868            TEST_HEADER_ADDR,
1869            TEST_OFFSET_SIZE,
1870            TEST_LENGTH_SIZE,
1871            &[7],
1872            &[1],
1873            Some((&[3], &[3])),
1874        )
1875        .unwrap();
1876
1877        assert_eq!(entries.len(), 1);
1878        assert_eq!(entries[0].address, 0x2000);
1879        let ranges = storage.ranges();
1880        assert!(ranges.contains(&(TEST_HEADER_ADDR, 72)));
1881        assert!(ranges.contains(&(TEST_INDEX_ADDR, 42)));
1882        assert!(ranges.contains(&(TEST_SECONDARY_ADDR, 35)));
1883        assert!(ranges.iter().all(|(_, len)| *len <= 72), "{ranges:?}");
1884    }
1885}