Skip to main content

hdf5_reader/
dataset.rs

1use std::mem::MaybeUninit;
2use std::num::NonZeroUsize;
3use std::sync::{Arc, OnceLock};
4
5use lru::LruCache;
6use ndarray::{ArrayD, IxDyn};
7use parking_lot::Mutex;
8#[cfg(feature = "rayon")]
9use rayon::prelude::*;
10use smallvec::SmallVec;
11
12use crate::attribute_api::{
13    collect_attribute_messages_storage, decode_string, read_one_vlen_string_storage,
14    resolve_vlen_bytes_storage, Attribute,
15};
16use crate::cache::{ChunkCache, ChunkCacheStats, ChunkKey};
17use crate::chunk_index;
18use crate::datatype_api::{dtype_element_size, H5Type};
19use crate::error::{ByteOrder, Error, Result};
20use crate::filters::{self, FilterRegistry};
21use crate::io::Cursor;
22use crate::local_heap::LocalHeap;
23use crate::messages::attribute::AttributeMessage;
24use crate::messages::dataspace::{DataspaceMessage, DataspaceType};
25use crate::messages::datatype::{Datatype, StringSize, VarLenKind};
26use crate::messages::external_files::ExternalFilesMessage;
27use crate::messages::fill_value::{FillTime, FillValueMessage};
28use crate::messages::filter_pipeline::FilterPipelineMessage;
29use crate::messages::layout::{ChunkIndexing, DataLayout};
30use crate::messages::HdfMessage;
31use crate::object_header::ObjectHeader;
32use crate::storage::DynStorage;
33use crate::FileContext;
34
// NOTE(review): the consumer of this constant is outside this view; judging by the name it
// caps how large a fully materialised dataset may be before it is no longer kept hot — confirm.
/// Byte threshold of 32 MiB (`32 * 1024 * 1024`).
const HOT_FULL_DATASET_CACHE_MAX_BYTES: usize = 32 * 1024 * 1024;
36
/// Raw pointer plus length describing a flat byte buffer.
///
/// The type is `Copy` and is passed by value to the chunk-copy helpers below.
// NOTE(review): presumably this is the destination buffer that decoded chunks are scattered
// into from worker threads — confirm at the construction site.
#[derive(Clone, Copy)]
struct FlatBufferPtr {
    /// Start of the buffer.
    ptr: *mut u8,
    /// Length associated with `ptr` (presumably in bytes, since the pointee is `u8`).
    len: usize,
}
42
/// Borrowed geometry handed to the whole-chunk copy routine.
///
/// NOTE(review): the consuming routine is outside this view; the field descriptions below are
/// inferred from the names and should be confirmed against it.
#[derive(Clone, Copy)]
struct ChunkCopyLayout<'a> {
    /// Per-dimension offsets of the chunk (presumably dataset coordinates of its first element).
    chunk_offsets: &'a [u64],
    /// Per-dimension extent of one chunk.
    chunk_shape: &'a [u64],
    /// Per-dimension extent of the dataset.
    dataset_shape: &'a [u64],
    /// Strides of the dataset buffer (presumably row-major, in elements).
    dataset_strides: &'a [usize],
    /// Strides of the chunk buffer (presumably row-major, in elements).
    chunk_strides: &'a [usize],
    /// Size of one element (presumably in bytes).
    elem_size: usize,
}
52
/// Borrowed geometry handed to the unit-stride chunk/selection overlap copy routine.
///
/// NOTE(review): the consuming routine is outside this view; the field descriptions below are
/// inferred from the names and should be confirmed against it.
#[derive(Clone, Copy)]
struct UnitStrideCopyLayout<'a> {
    /// Per-dimension offsets of the chunk (presumably dataset coordinates of its first element).
    chunk_offsets: &'a [u64],
    /// Per-dimension extent of one chunk.
    chunk_shape: &'a [u64],
    /// Per-dimension extent of the dataset.
    dataset_shape: &'a [u64],
    /// The normalised selection being read.
    resolved: &'a ResolvedSelection,
    /// Strides of the chunk buffer (presumably row-major, in elements).
    chunk_strides: &'a [usize],
    /// Strides of the result buffer (presumably row-major, in elements).
    result_strides: &'a [usize],
    /// Size of one element (presumably in bytes).
    elem_size: usize,
}
63
/// Borrowed geometry for the direct contiguous-slice read path.
///
/// NOTE(review): the consuming routine is outside this view; the field descriptions below are
/// inferred from the names and should be confirmed against it.
#[derive(Clone, Copy)]
struct ContiguousSliceDirectLayout<'a> {
    /// Strides of the dataset (presumably row-major, in elements).
    dataset_strides: &'a [usize],
    /// Strides of the result buffer (presumably row-major, in elements).
    result_strides: &'a [usize],
    /// Size of one element (presumably in bytes).
    elem_size: usize,
    /// Total size of the result buffer in bytes.
    result_total_bytes: usize,
}
71
/// One slot of external raw-data storage after it has been bound to an opened storage backend.
///
/// NOTE(review): the resolver that builds these is outside this view; field descriptions are
/// inferred from the names.
#[derive(Clone)]
struct ResolvedExternalRawSlot {
    /// Position of this slot within the dataset's logical byte stream.
    logical_offset: u64,
    /// Storage backend the slot's bytes are read from.
    storage: DynStorage,
    /// Offset of the slot's data inside `storage`.
    file_offset: u64,
    /// Number of bytes covered by this slot.
    size: u64,
}
79
/// Inputs needed by [`Dataset::from_parsed_header`] besides the header itself.
pub(crate) struct DatasetParseContext {
    /// Shared per-file state; the parser reads its storage, superblock sizes, filter registry
    /// and chunk cache.
    pub(crate) context: Arc<FileContext>,
}
83
/// Parameters describing which chunk-index entries to collect.
///
/// `iter_chunks` builds this with `chunk_bounds: None` to request every entry.
#[derive(Clone, Copy)]
struct ChunkEntrySelection<'a> {
    /// Dataset shape.
    shape: &'a [u64],
    /// Dataset rank.
    ndim: usize,
    /// Raw element size of the dataset.
    elem_size: usize,
    /// Optional pair of chunk-grid bounds restricting the lookup.
    // NOTE(review): presumably (first_chunk, last_chunk), both inclusive — confirm in the collector.
    chunk_bounds: Option<(&'a [u64], &'a [u64])>,
}
91
// SAFETY (review): `FlatBufferPtr` holds a raw pointer, so `Send`/`Sync` are not derived
// automatically. These impls assert that moving/sharing the handle across threads is sound.
// That can only hold if every user writes to disjoint regions of a buffer that outlives the
// handle — TODO confirm at the call sites, which are outside this view.
unsafe impl Send for FlatBufferPtr {}

unsafe impl Sync for FlatBufferPtr {}
95
/// Thin forwarding methods that let a `FlatBufferPtr` be used as the destination of the
/// pointer-based copy helpers. All of them are only compiled with the `rayon` feature.
impl FlatBufferPtr {
    /// Copies a whole decoded chunk into this buffer by forwarding to
    /// `copy_chunk_to_flat_with_strides_ptr`.
    ///
    /// # Safety
    ///
    /// The callee is outside this view. Presumably `self.ptr` must be valid for writes of
    /// `self.len` bytes and no other thread may write the same region concurrently — confirm
    /// against `copy_chunk_to_flat_with_strides_ptr`.
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_chunk(self, chunk_data: &[u8], layout: ChunkCopyLayout<'_>) -> Result<()> {
        copy_chunk_to_flat_with_strides_ptr(chunk_data, self, layout)
    }

    /// Copies individually selected elements of a chunk into this buffer by forwarding to
    /// `copy_selected_elements_ptr` with this buffer's pointer and length.
    ///
    /// # Safety
    ///
    /// Same presumed requirements as [`FlatBufferPtr::copy_chunk`] — confirm against
    /// `copy_selected_elements_ptr`.
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_selected(
        self,
        chunk_data: &[u8],
        dim_indices: &[Vec<(usize, usize)>],
        chunk_strides: &[usize],
        result_strides: &[usize],
        elem_size: usize,
        ndim: usize,
    ) -> Result<()> {
        copy_selected_elements_ptr(
            chunk_data,
            self.ptr,
            self.len,
            dim_indices,
            chunk_strides,
            result_strides,
            elem_size,
            ndim,
        )
    }

    /// Copies the overlap between a chunk and a unit-stride selection into this buffer by
    /// forwarding to `copy_unit_stride_chunk_overlap_ptr`.
    ///
    /// # Safety
    ///
    /// Same presumed requirements as [`FlatBufferPtr::copy_chunk`] — confirm against
    /// `copy_unit_stride_chunk_overlap_ptr`.
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_unit_stride_chunk_overlap(
        self,
        chunk_data: &[u8],
        layout: UnitStrideCopyLayout<'_>,
    ) -> Result<()> {
        copy_unit_stride_chunk_overlap_ptr(chunk_data, self, layout)
    }
}
136
/// Hyperslab selection for reading slices of datasets.
///
/// Holds one [`SliceInfoElem`] per dataset dimension; a selection whose length differs from
/// the dataset rank is rejected when it is normalised.
#[derive(Debug, Clone)]
pub struct SliceInfo {
    /// Per-dimension selections, in dataset dimension order.
    pub selections: Vec<SliceInfoElem>,
}
142
/// A single dimension's selection.
#[derive(Debug, Clone)]
pub enum SliceInfoElem {
    /// Select a single index (reduces dimensionality).
    ///
    /// The index must be smaller than the dimension size.
    Index(u64),
    /// Select a range with optional step.
    ///
    /// `start` is inclusive and may not exceed the dimension size; `end` is exclusive and is
    /// clamped to the dimension size (so `u64::MAX` means "to the end"); `step` must be
    /// non-zero.
    Slice { start: u64, end: u64, step: u64 },
}
151
/// One dimension of a selection after validation against the dataset shape.
#[derive(Clone, Debug)]
struct ResolvedSelectionDim {
    /// First selected index (inclusive).
    start: u64,
    /// End of the selected range (exclusive), already clamped to the dimension size.
    end: u64,
    /// Distance between consecutive selected indices; always at least 1.
    step: u64,
    /// Number of selected indices in this dimension (1 for an index selection).
    count: usize,
}
159
/// Key for the per-dataset LRU cache of chunk-index lookups.
// NOTE(review): the code that builds keys is outside this view; presumably a key identifies
// one chunk-grid range query against one chunk index — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ChunkEntryCacheKey {
    /// Address of the chunk index the entries were read from.
    index_address: u64,
    /// First chunk-grid coordinate of the query (inline storage for up to 4 dimensions).
    first_chunk: SmallVec<[u64; 4]>,
    /// Last chunk-grid coordinate of the query (inline storage for up to 4 dimensions).
    last_chunk: SmallVec<[u64; 4]>,
}
166
167impl ResolvedSelectionDim {
168    fn chunk_index_range(&self, chunk_extent: u64) -> Option<(u64, u64)> {
169        if self.count == 0 {
170            return None;
171        }
172
173        Some((self.start / chunk_extent, (self.end - 1) / chunk_extent))
174    }
175}
176
/// A complete selection after validation against the dataset shape.
#[derive(Clone, Debug)]
struct ResolvedSelection {
    /// One resolved entry per dataset dimension, including index (collapsed) dimensions.
    dims: Vec<ResolvedSelectionDim>,
    /// Shape of the result: element counts of the slice dimensions only; index selections
    /// contribute no axis.
    result_shape: Vec<usize>,
    /// Product of `result_shape` (1 when there are no slice dimensions).
    result_elements: usize,
}
183
184impl ResolvedSelection {
185    fn result_dims_with_collapsed(&self) -> Vec<usize> {
186        self.dims.iter().map(|dim| dim.count).collect()
187    }
188
189    fn is_unit_stride(&self) -> bool {
190        self.dims.iter().all(|dim| dim.step == 1)
191    }
192}
193
194impl SliceInfo {
195    /// Create a selection that reads everything.
196    pub fn all(ndim: usize) -> Self {
197        SliceInfo {
198            selections: vec![
199                SliceInfoElem::Slice {
200                    start: 0,
201                    end: u64::MAX,
202                    step: 1,
203                };
204                ndim
205            ],
206        }
207    }
208}
209
210fn checked_usize(value: u64, context: &str) -> Result<usize> {
211    usize::try_from(value).map_err(|_| {
212        Error::InvalidData(format!(
213            "{context} value {value} exceeds platform usize capacity"
214        ))
215    })
216}
217
218fn checked_mul_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
219    lhs.checked_mul(rhs)
220        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
221}
222
223fn checked_add_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
224    lhs.checked_add(rhs)
225        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
226}
227
228fn checked_mul_u64(lhs: u64, rhs: u64, context: &str) -> Result<u64> {
229    lhs.checked_mul(rhs)
230        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds u64 capacity")))
231}
232
233fn checked_add_u64(lhs: u64, rhs: u64, context: &str) -> Result<u64> {
234    lhs.checked_add(rhs)
235        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds u64 capacity")))
236}
237
238fn checked_shape_elements_usize(shape: &[u64], context: &str) -> Result<usize> {
239    let mut total = 1usize;
240    for &dim in shape {
241        total = checked_mul_usize(total, checked_usize(dim, context)?, context)?;
242    }
243    Ok(total)
244}
245
246fn full_dataset_chunk_bounds(
247    shape: &[u64],
248    chunk_shape: &[u64],
249) -> Result<Option<(Vec<u64>, Vec<u64>)>> {
250    validate_chunk_shape(shape, chunk_shape)?;
251    if shape.contains(&0) {
252        return Ok(None);
253    }
254
255    let first_chunk = vec![0u64; shape.len()];
256    let last_chunk = shape
257        .iter()
258        .zip(chunk_shape.iter())
259        .map(|(&dim, &chunk)| dim.div_ceil(chunk) - 1)
260        .collect();
261    Ok(Some((first_chunk, last_chunk)))
262}
263
264fn validate_chunk_shape(shape: &[u64], chunk_shape: &[u64]) -> Result<()> {
265    if chunk_shape.len() != shape.len() {
266        return Err(Error::InvalidData(format!(
267            "chunk rank {} does not match dataset rank {}",
268            chunk_shape.len(),
269            shape.len()
270        )));
271    }
272    if let Some((dim, _)) = chunk_shape
273        .iter()
274        .enumerate()
275        .find(|(_, chunk)| **chunk == 0)
276    {
277        return Err(Error::InvalidData(format!(
278            "chunk dimension {dim} has zero extent"
279        )));
280    }
281    Ok(())
282}
283
284fn validate_decoded_chunk_len(
285    entry: &chunk_index::ChunkEntry,
286    chunk_shape: &[u64],
287    elem_size: usize,
288    actual_len: usize,
289) -> Result<()> {
290    let chunk_elements = checked_shape_elements_usize(chunk_shape, "decoded chunk element count")?;
291    let expected_len = checked_mul_usize(chunk_elements, elem_size, "decoded chunk byte length")?;
292    if actual_len != expected_len {
293        return Err(Error::InvalidData(format!(
294            "chunk at offsets {:?} decoded to {} bytes, expected {} bytes",
295            entry.offsets, actual_len, expected_len
296        )));
297    }
298    Ok(())
299}
300
301fn validate_chunk_grid_coverage(
302    entries: &mut [chunk_index::ChunkEntry],
303    shape: &[u64],
304    chunk_shape: &[u64],
305    first_chunk: &[u64],
306    last_chunk: &[u64],
307) -> Result<bool> {
308    validate_chunk_shape(shape, chunk_shape)?;
309    if first_chunk.len() != shape.len() || last_chunk.len() != shape.len() {
310        return Err(Error::InvalidData(format!(
311            "chunk grid bounds rank does not match dataset rank {}",
312            shape.len()
313        )));
314    }
315
316    if shape.contains(&0) {
317        if entries.is_empty() {
318            return Ok(true);
319        }
320        return Err(Error::InvalidData(
321            "chunk index contains entries for an empty dataset".into(),
322        ));
323    }
324
325    for dim in 0..shape.len() {
326        if first_chunk[dim] > last_chunk[dim] {
327            return Err(Error::InvalidData(format!(
328                "invalid chunk grid bounds for dimension {dim}: {} > {}",
329                first_chunk[dim], last_chunk[dim]
330            )));
331        }
332    }
333
334    entries.sort_by(|a, b| a.offsets.cmp(&b.offsets));
335
336    for i in 0..entries.len() {
337        validate_chunk_entry_offsets(&entries[i], shape, chunk_shape, first_chunk, last_chunk)?;
338        if i > 0 && entries[i].offsets == entries[i - 1].offsets {
339            return Err(Error::InvalidData(format!(
340                "duplicate chunk output offsets {:?} (addresses {:#x} and {:#x})",
341                entries[i].offsets,
342                entries[i - 1].address,
343                entries[i].address
344            )));
345        }
346    }
347
348    let mut entry_idx = 0usize;
349    let mut expected = first_chunk.to_vec();
350    loop {
351        let expected_offsets: Vec<u64> = expected
352            .iter()
353            .enumerate()
354            .map(|(dim, chunk_index)| chunk_index * chunk_shape[dim])
355            .collect();
356
357        if entry_idx >= entries.len() || entries[entry_idx].offsets != expected_offsets {
358            return Ok(false);
359        }
360        entry_idx += 1;
361
362        if !advance_chunk_index(&mut expected, first_chunk, last_chunk) {
363            break;
364        }
365    }
366
367    Ok(entry_idx == entries.len())
368}
369
370fn validate_chunk_entry_offsets(
371    entry: &chunk_index::ChunkEntry,
372    shape: &[u64],
373    chunk_shape: &[u64],
374    first_chunk: &[u64],
375    last_chunk: &[u64],
376) -> Result<()> {
377    if entry.offsets.len() != shape.len() {
378        return Err(Error::InvalidData(format!(
379            "chunk at address {:#x} has rank {}, expected {}",
380            entry.address,
381            entry.offsets.len(),
382            shape.len()
383        )));
384    }
385
386    for dim in 0..shape.len() {
387        let offset = entry.offsets[dim];
388        if offset >= shape[dim] {
389            return Err(Error::InvalidData(format!(
390                "chunk at address {:#x} has out-of-bounds offset {} for dimension {} of size {}",
391                entry.address, offset, dim, shape[dim]
392            )));
393        }
394        if offset % chunk_shape[dim] != 0 {
395            return Err(Error::InvalidData(format!(
396                "chunk at address {:#x} has non-grid offset {} for dimension {} with chunk extent {}",
397                entry.address, offset, dim, chunk_shape[dim]
398            )));
399        }
400
401        let chunk_index = offset / chunk_shape[dim];
402        if chunk_index < first_chunk[dim] || chunk_index > last_chunk[dim] {
403            return Err(Error::InvalidData(format!(
404                "chunk at address {:#x} has offset {:?} outside requested chunk grid",
405                entry.address, entry.offsets
406            )));
407        }
408    }
409
410    Ok(())
411}
412
/// Advances `index` to the next chunk-grid coordinate in row-major order (last dimension
/// fastest) within the inclusive window `first_chunk..=last_chunk`.
///
/// Returns `true` after stepping, or `false` (leaving `index` untouched) when `index` is
/// empty or already at the last coordinate.
fn advance_chunk_index(index: &mut [u64], first_chunk: &[u64], last_chunk: &[u64]) -> bool {
    // Find the innermost dimension that still has room to grow.
    let Some(dim) = (0..index.len())
        .rev()
        .find(|&candidate| index[candidate] < last_chunk[candidate])
    else {
        return false;
    };

    index[dim] += 1;

    // Every dimension to the right wraps back to the start of the window.
    let tail = dim + 1;
    if tail < index.len() {
        index[tail..].copy_from_slice(&first_chunk[tail..]);
    }
    true
}
430
431fn row_major_strides(shape: &[u64], context: &str) -> Result<Vec<usize>> {
432    let ndim = shape.len();
433    if ndim == 0 {
434        return Ok(Vec::new());
435    }
436
437    let mut strides = vec![1usize; ndim];
438    for i in (0..ndim - 1).rev() {
439        let next_extent = checked_usize(shape[i + 1], context)?;
440        strides[i] = checked_mul_usize(strides[i + 1], next_extent, context)?;
441    }
442    Ok(strides)
443}
444
/// Reinterprets a fully written `Vec<MaybeUninit<u8>>` as `Vec<u8>` without copying.
///
/// The allocation, length and capacity are reused as-is.
///
/// Contract (not enforced by the signature): every one of the `buffer.len()` bytes must have
/// been initialised by the caller before this is called; otherwise the returned vector
/// exposes uninitialised memory, which is undefined behaviour.
fn assume_init_u8_vec(buffer: Vec<MaybeUninit<u8>>) -> Vec<u8> {
    // `ManuallyDrop` hands ownership of the allocation to the new vector without ever running
    // the old vector's destructor, which is the pattern the std docs recommend over
    // `mem::forget` for this kind of transfer.
    let mut buffer = std::mem::ManuallyDrop::new(buffer);
    let ptr = buffer.as_mut_ptr().cast::<u8>();
    let len = buffer.len();
    let capacity = buffer.capacity();
    // SAFETY: `ptr`, `len` and `capacity` come from a live `Vec` allocated by the global
    // allocator; `MaybeUninit<u8>` has the same size and alignment as `u8`; the original
    // vector is never dropped, so the allocation has exactly one owner. Initialisation of the
    // first `len` bytes is the caller's obligation (see above).
    unsafe { Vec::from_raw_parts(ptr, len, capacity) }
}
452
/// Reinterprets a fully written `Vec<MaybeUninit<T>>` as `Vec<T>` without copying.
///
/// The allocation, length and capacity are reused as-is.
///
/// Contract (not enforced by the signature): every one of the `buffer.len()` elements must
/// hold a valid, initialised `T` before this is called; otherwise using or dropping the
/// returned vector is undefined behaviour.
fn assume_init_vec<T>(buffer: Vec<MaybeUninit<T>>) -> Vec<T> {
    // `ManuallyDrop` hands ownership of the allocation to the new vector without ever running
    // the old vector's destructor, which is the pattern the std docs recommend over
    // `mem::forget` for this kind of transfer.
    let mut buffer = std::mem::ManuallyDrop::new(buffer);
    let ptr = buffer.as_mut_ptr().cast::<T>();
    let len = buffer.len();
    let capacity = buffer.capacity();
    // SAFETY: `ptr`, `len` and `capacity` come from a live `Vec` allocated by the global
    // allocator; `MaybeUninit<T>` is guaranteed to have the same size and alignment as `T`;
    // the original vector is never dropped, so the allocation has exactly one owner.
    // Initialisation of the first `len` elements is the caller's obligation (see above).
    unsafe { Vec::from_raw_parts(ptr, len, capacity) }
}
460
461fn normalize_selection(selection: &SliceInfo, shape: &[u64]) -> Result<ResolvedSelection> {
462    if selection.selections.len() != shape.len() {
463        return Err(Error::InvalidData(format!(
464            "slice has {} dimensions but dataset has {}",
465            selection.selections.len(),
466            shape.len()
467        )));
468    }
469
470    let mut dims = Vec::with_capacity(shape.len());
471    let mut result_shape = Vec::new();
472    let mut result_elements = 1usize;
473
474    for (i, sel) in selection.selections.iter().enumerate() {
475        let dim_size = shape[i];
476        match sel {
477            SliceInfoElem::Index(idx) => {
478                if *idx >= dim_size {
479                    return Err(Error::SliceOutOfBounds {
480                        dim: i,
481                        index: *idx,
482                        size: dim_size,
483                    });
484                }
485                dims.push(ResolvedSelectionDim {
486                    start: *idx,
487                    end: *idx + 1,
488                    step: 1,
489                    count: 1,
490                });
491            }
492            SliceInfoElem::Slice { start, end, step } => {
493                if *step == 0 {
494                    return Err(Error::InvalidData("slice step cannot be 0".into()));
495                }
496                if *start > dim_size {
497                    return Err(Error::SliceOutOfBounds {
498                        dim: i,
499                        index: *start,
500                        size: dim_size,
501                    });
502                }
503
504                let actual_end = if *end == u64::MAX {
505                    dim_size
506                } else {
507                    (*end).min(dim_size)
508                };
509                let count_u64 = if *start >= actual_end {
510                    0
511                } else {
512                    (actual_end - *start).div_ceil(*step)
513                };
514                let count = checked_usize(count_u64, "slice element count")?;
515
516                dims.push(ResolvedSelectionDim {
517                    start: *start,
518                    end: actual_end,
519                    step: *step,
520                    count,
521                });
522                result_shape.push(count);
523                result_elements =
524                    checked_mul_usize(result_elements, count, "slice result element count")?;
525            }
526        }
527    }
528
529    Ok(ResolvedSelection {
530        dims,
531        result_shape,
532        result_elements,
533    })
534}
535
536fn selection_dim_is_full_unit(dim: &ResolvedSelectionDim, dim_size: u64) -> bool {
537    dim.step == 1
538        && dim.start == 0
539        && dim.end == dim_size
540        && u64::try_from(dim.count).ok() == Some(dim_size)
541}
542
543fn selection_covers_full_dataset(resolved: &ResolvedSelection, shape: &[u64]) -> bool {
544    resolved.result_shape.len() == shape.len()
545        && resolved
546            .dims
547            .iter()
548            .zip(shape.iter())
549            .all(|(dim, &dim_size)| selection_dim_is_full_unit(dim, dim_size))
550}
551
552fn contiguous_slice_tail_start(shape: &[u64], resolved: &ResolvedSelection) -> usize {
553    let ndim = shape.len();
554    if ndim == 0 {
555        return 0;
556    }
557
558    let mut tail_start = if resolved.dims[ndim - 1].step == 1 {
559        ndim - 1
560    } else {
561        ndim
562    };
563
564    while tail_start > 0 {
565        let prev = tail_start - 1;
566        let later_dims_are_full =
567            (tail_start..ndim).all(|d| selection_dim_is_full_unit(&resolved.dims[d], shape[d]));
568        if resolved.dims[prev].step == 1 && later_dims_are_full {
569            tail_start = prev;
570        } else {
571            break;
572        }
573    }
574
575    tail_start
576}
577
578fn contiguous_slice_block_elements(
579    resolved: &ResolvedSelection,
580    tail_start: usize,
581) -> Result<usize> {
582    let mut elements = 1usize;
583    for dim in &resolved.dims[tail_start..] {
584        elements = checked_mul_usize(elements, dim.count, "contiguous slice block elements")?;
585    }
586    Ok(elements)
587}
588
589fn result_strides_for_dims(result_dims: &[usize]) -> Result<Vec<usize>> {
590    let ndim = result_dims.len();
591    let mut result_strides = vec![1usize; ndim];
592    for d in (0..ndim.saturating_sub(1)).rev() {
593        result_strides[d] =
594            checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
595    }
596    Ok(result_strides)
597}
598
/// A dataset within an HDF5 file.
///
/// Cloning is cheap for the cache fields: they are `Arc`s, so clones (and datasets rebuilt
/// from the same `DatasetTemplate`) share the same cached state.
#[derive(Clone)]
pub struct Dataset {
    /// Shared per-file state (storage, superblock, caches, filter registry).
    pub(crate) context: Arc<FileContext>,
    /// Dataset name as supplied when the header was parsed.
    pub(crate) name: String,
    /// Address the dataset was parsed from; exposed through `address()`.
    pub(crate) data_address: u64,
    /// Dataspace message (dimensions and optional maximum dimensions).
    pub(crate) dataspace: DataspaceMessage,
    /// Element datatype.
    pub(crate) datatype: Datatype,
    /// Storage layout after normalisation against the dataspace.
    pub(crate) layout: DataLayout,
    /// Fill value from the header message, or derived from a single-element `_FillValue`
    /// attribute when the header message carries no value.
    pub(crate) fill_value: Option<FillValueMessage>,
    /// Filter pipeline message, if the header has one.
    pub(crate) filters: Option<FilterPipelineMessage>,
    /// External files message, if the header has one.
    pub(crate) external_files: Option<ExternalFilesMessage>,
    /// Raw attribute messages collected from the header.
    pub(crate) attributes: Vec<AttributeMessage>,
    /// File-wide chunk cache (taken from `context`).
    pub(crate) chunk_cache: Arc<ChunkCache>,
    /// LRU cache of chunk-index lookups, created with a capacity of 32 entries.
    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
    /// Lazily initialised list of chunk entries (presumably for the whole dataset — confirm
    /// at the initialisation site).
    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
    /// Lazily initialised copy of the dataset's bytes (presumably the fully read dataset —
    /// confirm at the initialisation site).
    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
    /// Lazily initialised list of resolved external raw-data slots.
    external_slots: Arc<OnceLock<Arc<Vec<ResolvedExternalRawSlot>>>>,
    /// Filter registry (taken from `context`).
    pub(crate) filter_registry: Arc<FilterRegistry>,
}
619
/// One decoded chunk from a chunked dataset.
///
/// Produced by [`DatasetChunkIterator`]; all fields are exposed through accessor methods.
pub struct DatasetChunk {
    /// Offsets recorded in the chunk-index entry for this chunk.
    offsets: Vec<u64>,
    /// Chunk shape from the dataset layout (the same for every chunk of a dataset).
    shape: Vec<u64>,
    /// Filter mask recorded in the chunk-index entry.
    filter_mask: u32,
    /// Decoded bytes, shared via `Arc`.
    bytes: Arc<Vec<u8>>,
}
627
628impl DatasetChunk {
629    /// Dataset coordinates of the first element in this chunk.
630    pub fn offsets(&self) -> &[u64] {
631        &self.offsets
632    }
633
634    /// Logical chunk shape from the dataset layout.
635    pub fn shape(&self) -> &[u64] {
636        &self.shape
637    }
638
639    /// HDF5 filter mask recorded for this chunk.
640    pub fn filter_mask(&self) -> u32 {
641        self.filter_mask
642    }
643
644    /// Decoded logical bytes for this chunk.
645    pub fn bytes(&self) -> &[u8] {
646        self.bytes.as_ref()
647    }
648}
649
/// Iterator over decoded chunks in a chunked dataset.
///
/// Created by [`Dataset::iter_chunks`]. The chunk-index entries are collected up front;
/// each chunk's data is loaded when the iterator is advanced.
pub struct DatasetChunkIterator {
    /// Clone of the dataset the chunks belong to; used to load chunk data.
    dataset: Dataset,
    /// Chunk-index entries to visit, in iteration order.
    entries: Vec<chunk_index::ChunkEntry>,
    /// Address of the chunk index, passed along when loading each chunk.
    index_address: u64,
    /// Chunk shape from the dataset layout, widened to `u64`.
    chunk_shape: Vec<u64>,
    /// Raw element size of the dataset.
    elem_size: usize,
    /// Position in `entries` of the next chunk to yield.
    next: usize,
}
659
660impl Iterator for DatasetChunkIterator {
661    type Item = Result<DatasetChunk>;
662
663    fn next(&mut self) -> Option<Self::Item> {
664        let entry = self.entries.get(self.next)?;
665        self.next += 1;
666
667        Some(
668            self.dataset
669                .load_exact_chunk_data(entry, self.index_address, &self.chunk_shape, self.elem_size)
670                .map(|bytes| DatasetChunk {
671                    offsets: entry.offsets.clone(),
672                    shape: self.chunk_shape.clone(),
673                    filter_mask: entry.filter_mask,
674                    bytes,
675                }),
676        )
677    }
678}
679
/// Snapshot of a parsed [`Dataset`] without the per-file context.
///
/// Built by `Dataset::template` and turned back into a `Dataset` by `Dataset::from_template`,
/// which supplies the context, chunk cache and filter registry. The `Arc` cache fields are
/// shared with the dataset the template was taken from, not copied.
pub(crate) struct DatasetTemplate {
    /// Dataset name.
    name: String,
    /// Address the dataset was parsed from.
    data_address: u64,
    /// Dataspace message.
    dataspace: DataspaceMessage,
    /// Element datatype.
    datatype: Datatype,
    /// Storage layout.
    layout: DataLayout,
    /// Fill value, if any.
    fill_value: Option<FillValueMessage>,
    /// Filter pipeline message, if any.
    filters: Option<FilterPipelineMessage>,
    /// External files message, if any.
    external_files: Option<ExternalFilesMessage>,
    /// Raw attribute messages.
    attributes: Vec<AttributeMessage>,
    /// Shared LRU cache of chunk-index lookups.
    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
    /// Shared lazily initialised chunk-entry list.
    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
    /// Shared lazily initialised dataset bytes.
    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
    /// Shared lazily initialised external raw-data slots.
    external_slots: Arc<OnceLock<Arc<Vec<ResolvedExternalRawSlot>>>>,
}
695
696impl Dataset {
697    pub(crate) fn from_template(context: Arc<FileContext>, template: Arc<DatasetTemplate>) -> Self {
698        Dataset {
699            chunk_cache: context.chunk_cache.clone(),
700            filter_registry: context.filter_registry.clone(),
701            context,
702            name: template.name.clone(),
703            data_address: template.data_address,
704            dataspace: template.dataspace.clone(),
705            datatype: template.datatype.clone(),
706            layout: template.layout.clone(),
707            fill_value: template.fill_value.clone(),
708            filters: template.filters.clone(),
709            external_files: template.external_files.clone(),
710            attributes: template.attributes.clone(),
711            chunk_entry_cache: template.chunk_entry_cache.clone(),
712            full_chunk_entries: template.full_chunk_entries.clone(),
713            full_dataset_bytes: template.full_dataset_bytes.clone(),
714            external_slots: template.external_slots.clone(),
715        }
716    }
717
718    pub(crate) fn template(&self) -> Arc<DatasetTemplate> {
719        Arc::new(DatasetTemplate {
720            name: self.name.clone(),
721            data_address: self.data_address,
722            dataspace: self.dataspace.clone(),
723            datatype: self.datatype.clone(),
724            layout: self.layout.clone(),
725            fill_value: self.fill_value.clone(),
726            filters: self.filters.clone(),
727            external_files: self.external_files.clone(),
728            attributes: self.attributes.clone(),
729            chunk_entry_cache: self.chunk_entry_cache.clone(),
730            full_chunk_entries: self.full_chunk_entries.clone(),
731            full_dataset_bytes: self.full_dataset_bytes.clone(),
732            external_slots: self.external_slots.clone(),
733        })
734    }
735
736    pub(crate) fn from_parsed_header(
737        context: DatasetParseContext,
738        address: u64,
739        name: String,
740        header: &ObjectHeader,
741    ) -> Result<Self> {
742        let mut dataspace: Option<DataspaceMessage> = None;
743        let mut datatype: Option<Datatype> = None;
744        let mut layout: Option<DataLayout> = None;
745        let mut fill_value: Option<FillValueMessage> = None;
746        let mut filter_pipeline: Option<FilterPipelineMessage> = None;
747        let mut external_files: Option<ExternalFilesMessage> = None;
748        let attributes = collect_attribute_messages_storage(
749            header,
750            context.context.storage.as_ref(),
751            context.context.superblock.offset_size,
752            context.context.superblock.length_size,
753            Some(context.context.filter_registry.as_ref()),
754        )?;
755
756        for msg in &header.messages {
757            match msg {
758                HdfMessage::Dataspace(ds) => dataspace = Some(ds.clone()),
759                HdfMessage::Datatype(dt) => datatype = Some(dt.datatype.clone()),
760                HdfMessage::DataLayout(dl) => layout = Some(dl.layout.clone()),
761                HdfMessage::FillValue(fv) => fill_value = Some(fv.clone()),
762                HdfMessage::FilterPipeline(fp) => filter_pipeline = Some(fp.clone()),
763                HdfMessage::ExternalFiles(ef) => external_files = Some(ef.clone()),
764                _ => {}
765            }
766        }
767
768        let dataspace =
769            dataspace.ok_or_else(|| Error::InvalidData("dataset missing dataspace".into()))?;
770        let dt = datatype.ok_or_else(|| Error::InvalidData("dataset missing datatype".into()))?;
771        let layout =
772            layout.ok_or_else(|| Error::InvalidData("dataset missing data layout".into()))?;
773        let layout = normalize_layout(layout, &dataspace);
774        let attr_fill_value = attributes
775            .iter()
776            .find(|attr| attr.name == "_FillValue" && attr.dataspace.num_elements() == 1)
777            .map(|attr| FillValueMessage {
778                defined: !attr.raw_data.is_empty(),
779                fill_time: FillTime::IfSet,
780                value: Some(attr.raw_data.clone()),
781            });
782        let fill_value = match fill_value {
783            Some(existing) if existing.value.is_some() => Some(existing),
784            _ => attr_fill_value,
785        };
786
787        Ok(Dataset {
788            context: context.context.clone(),
789            name,
790            data_address: address,
791            dataspace,
792            datatype: dt,
793            layout,
794            fill_value,
795            filters: filter_pipeline,
796            external_files,
797            attributes,
798            chunk_cache: context.context.chunk_cache.clone(),
799            chunk_entry_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap()))),
800            full_chunk_entries: Arc::new(OnceLock::new()),
801            full_dataset_bytes: Arc::new(OnceLock::new()),
802            external_slots: Arc::new(OnceLock::new()),
803            filter_registry: context.context.filter_registry.clone(),
804        })
805    }
806
    /// Dataset name, as supplied when the dataset was parsed.
    pub fn name(&self) -> &str {
        &self.name
    }
811
    /// The object header address used to parse this dataset.
    /// Useful as an opaque identifier or for NC4 data_offset.
    ///
    /// Note that the value is stored in the field named `data_address`.
    pub fn address(&self) -> u64 {
        self.data_address
    }
817
    /// Shape of the dataset: the current extent of each dimension, taken from the dataspace.
    pub fn shape(&self) -> &[u64] {
        &self.dataspace.dims
    }
822
    /// Datatype of the dataset's elements.
    pub fn dtype(&self) -> &Datatype {
        &self.datatype
    }
827
    /// Number of dimensions (the length of the dataspace's dimension list).
    pub fn ndim(&self) -> usize {
        self.dataspace.dims.len()
    }
832
    /// Offset size recorded in the file's superblock.
    fn offset_size(&self) -> u8 {
        self.context.superblock.offset_size
    }
836
    /// Length size recorded in the file's superblock.
    fn length_size(&self) -> u8 {
        self.context.superblock.length_size
    }
840
    /// Maximum dimension sizes, if defined. `u64::MAX` indicates unlimited.
    ///
    /// Returns `None` when the dataspace carries no maximum dimensions.
    pub fn max_dims(&self) -> Option<&[u64]> {
        self.dataspace.max_dims.as_deref()
    }
845
846    /// Chunk dimensions, if the dataset is chunked.
847    pub fn chunks(&self) -> Option<Vec<u32>> {
848        match &self.layout {
849            DataLayout::Chunked { dims, .. } => Some(dims.clone()),
850            _ => None,
851        }
852    }
853
854    /// Iterate over decoded chunks for a chunked dataset.
855    pub fn iter_chunks(&self) -> Result<DatasetChunkIterator> {
856        let DataLayout::Chunked {
857            address,
858            dims,
859            chunk_indexing,
860            ..
861        } = &self.layout
862        else {
863            return Err(Error::InvalidData(format!(
864                "dataset '{}' is not chunked",
865                self.name
866            )));
867        };
868
869        if Cursor::is_undefined_offset(*address, self.offset_size()) {
870            return Ok(DatasetChunkIterator {
871                dataset: self.clone(),
872                entries: Vec::new(),
873                index_address: *address,
874                chunk_shape: dims.iter().map(|&d| d as u64).collect(),
875                elem_size: self.raw_element_size(),
876                next: 0,
877            });
878        }
879
880        let ndim = self.ndim();
881        let shape = &self.dataspace.dims;
882        let elem_size = self.raw_element_size();
883        let chunk_shape: Vec<u64> = dims.iter().map(|&d| d as u64).collect();
884        validate_chunk_shape(shape, &chunk_shape)?;
885        let entries = self.collect_chunk_entries(
886            *address,
887            dims,
888            chunk_indexing.as_ref(),
889            ChunkEntrySelection {
890                shape,
891                ndim,
892                elem_size,
893                chunk_bounds: None,
894            },
895        )?;
896
897        Ok(DatasetChunkIterator {
898            dataset: self.clone(),
899            entries,
900            index_address: *address,
901            chunk_shape,
902            elem_size,
903            next: 0,
904        })
905    }
906
    /// Return current chunk-cache statistics for this dataset's file.
    ///
    /// The chunk cache is taken from the file context, so the numbers are not specific to
    /// this dataset.
    pub fn chunk_cache_stats(&self) -> ChunkCacheStats {
        self.chunk_cache.stats()
    }
911
    /// Fill value, if defined.
    ///
    /// This is either the header's fill-value message (when it carries a value) or a message
    /// synthesised from a single-element `_FillValue` attribute.
    pub fn fill_value(&self) -> Option<&FillValueMessage> {
        self.fill_value.as_ref()
    }
916
917    /// Dataset attributes.
918    pub fn attributes(&self) -> Vec<Attribute> {
919        self.attributes
920            .iter()
921            .map(|a| attribute_from_message_storage(a, self.context.as_ref()))
922            .collect()
923    }
924
925    /// Find an attribute by name.
926    pub fn attribute(&self, name: &str) -> Result<Attribute> {
927        self.attributes
928            .iter()
929            .find(|a| a.name == name)
930            .map(|a| attribute_from_message_storage(a, self.context.as_ref()))
931            .ok_or_else(|| Error::AttributeNotFound(name.to_string()))
932    }
933
934    /// Read a scalar string dataset or single string element.
935    ///
936    /// Use [`Dataset::read_strings`] when the dataset contains multiple strings.
937    pub fn read_string(&self) -> Result<String> {
938        let mut strings = self.read_strings()?;
939        match strings.len() {
940            1 => Ok(strings.swap_remove(0)),
941            0 => Err(Error::InvalidData(format!(
942                "dataset '{}' contains no string elements",
943                self.name
944            ))),
945            count => Err(Error::InvalidData(format!(
946                "dataset '{}' contains {count} string elements; use read_strings()",
947                self.name
948            ))),
949        }
950    }
951
952    /// Read all string elements from a string-typed dataset.
953    pub fn read_strings(&self) -> Result<Vec<String>> {
954        match &self.datatype {
955            Datatype::String {
956                size: StringSize::Fixed(len),
957                encoding,
958                padding,
959            } => {
960                let raw = self.read_raw_bytes()?;
961                let elem_size = *len as usize;
962                let count = checked_usize(self.num_elements(), "dataset string element count")?;
963                let expected_bytes =
964                    checked_mul_usize(count, elem_size, "dataset string byte size")?;
965                if raw.len() < expected_bytes {
966                    return Err(Error::InvalidData(format!(
967                        "dataset '{}' string data too short: need {} bytes, have {}",
968                        self.name,
969                        expected_bytes,
970                        raw.len()
971                    )));
972                }
973
974                let mut strings = Vec::with_capacity(count);
975                for i in 0..count {
976                    let start = i * elem_size;
977                    let end = start + elem_size;
978                    strings.push(decode_string(&raw[start..end], *padding, *encoding)?);
979                }
980                Ok(strings)
981            }
982            Datatype::String {
983                size: StringSize::Variable,
984                encoding,
985                padding,
986            } => {
987                let raw = self.read_raw_bytes()?;
988                let count = checked_usize(self.num_elements(), "dataset string element count")?;
989                let ref_size = 4 + self.offset_size() as usize + 4;
990                let expected_bytes =
991                    checked_mul_usize(count, ref_size, "dataset string reference byte size")?;
992                if raw.len() < expected_bytes {
993                    return Err(Error::InvalidData(format!(
994                        "dataset '{}' vlen string data too short: need {} bytes, have {}",
995                        self.name,
996                        expected_bytes,
997                        raw.len()
998                    )));
999                }
1000
1001                let mut strings = Vec::with_capacity(count);
1002                for i in 0..count {
1003                    let offset = i * ref_size;
1004                    strings.push(read_one_vlen_string_storage(
1005                        &raw,
1006                        offset,
1007                        self.context.storage.as_ref(),
1008                        self.offset_size(),
1009                        self.length_size(),
1010                        *padding,
1011                        *encoding,
1012                    )?);
1013                }
1014                Ok(strings)
1015            }
1016            Datatype::VarLen {
1017                base,
1018                kind: VarLenKind::String,
1019                encoding,
1020                padding,
1021            } => {
1022                if !matches!(base.as_ref(), Datatype::FixedPoint { size: 1, .. }) {
1023                    return Err(Error::TypeMismatch {
1024                        expected: "String dataset".into(),
1025                        actual: format!("{:?}", self.datatype),
1026                    });
1027                }
1028
1029                let raw = self.read_raw_bytes()?;
1030                let count = checked_usize(self.num_elements(), "dataset string element count")?;
1031                let ref_size = 4 + self.offset_size() as usize + 4;
1032                let expected_bytes =
1033                    checked_mul_usize(count, ref_size, "dataset string reference byte size")?;
1034                if raw.len() < expected_bytes {
1035                    return Err(Error::InvalidData(format!(
1036                        "dataset '{}' vlen byte string data too short: need {} bytes, have {}",
1037                        self.name,
1038                        expected_bytes,
1039                        raw.len()
1040                    )));
1041                }
1042
1043                let mut strings = Vec::with_capacity(count);
1044                for i in 0..count {
1045                    let offset = i * ref_size;
1046                    let ref_bytes = &raw[offset..offset + ref_size];
1047                    let value = resolve_vlen_bytes_storage(
1048                        ref_bytes,
1049                        self.context.storage.as_ref(),
1050                        self.offset_size(),
1051                        self.length_size(),
1052                    )
1053                    .unwrap_or_default();
1054                    strings.push(decode_string(&value, *padding, *encoding)?);
1055                }
1056                Ok(strings)
1057            }
1058            _ => Err(Error::TypeMismatch {
1059                expected: "String dataset".into(),
1060                actual: format!("{:?}", self.datatype),
1061            }),
1062        }
1063    }
1064
1065    /// Total number of elements in the dataset.
1066    pub fn num_elements(&self) -> u64 {
1067        if self.dataspace.dims.is_empty() {
1068            match self.dataspace.dataspace_type {
1069                DataspaceType::Scalar => 1,
1070                DataspaceType::Null => 0,
1071                DataspaceType::Simple => 0,
1072            }
1073        } else {
1074            self.dataspace.dims.iter().product()
1075        }
1076    }
1077
1078    /// Read the entire dataset into an n-dimensional array.
1079    pub fn read_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
1080        let result = match &self.layout {
1081            DataLayout::Compact { data } => self.read_compact::<T>(data),
1082            DataLayout::Contiguous { address, size } => self.read_contiguous::<T>(*address, *size),
1083            DataLayout::Chunked {
1084                address,
1085                dims,
1086                element_size,
1087                chunk_indexing,
1088            } => self.read_chunked::<T>(*address, dims, *element_size, chunk_indexing.as_ref()),
1089        };
1090        result.map_err(|e| e.with_context(&self.name))
1091    }
1092
1093    /// Read the entire dataset into a caller-provided typed buffer.
1094    pub fn read_into<T: H5Type>(&self, dst: &mut [T]) -> Result<()> {
1095        let result = (|| {
1096            let element_count = checked_usize(self.num_elements(), "dataset element count")?;
1097            if dst.len() != element_count {
1098                return Err(Error::InvalidData(format!(
1099                    "destination has {} elements, dataset requires {}",
1100                    dst.len(),
1101                    element_count
1102                )));
1103            }
1104
1105            let elem_size = self.raw_element_size();
1106            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1107                let dst_bytes = unsafe {
1108                    std::slice::from_raw_parts_mut(
1109                        dst.as_mut_ptr() as *mut u8,
1110                        checked_mul_usize(dst.len(), elem_size, "destination byte length")?,
1111                    )
1112                };
1113                return self.read_raw_bytes_into_inner(dst_bytes);
1114            }
1115
1116            let array = self.read_array::<T>()?;
1117            let values = array.as_slice_memory_order().ok_or_else(|| {
1118                Error::InvalidData("decoded array is not contiguous in memory order".into())
1119            })?;
1120            dst.clone_from_slice(values);
1121            Ok(())
1122        })();
1123
1124        result.map_err(|e| e.with_context(&self.name))
1125    }
1126
1127    /// Read the entire dataset using internal chunk-level parallelism when possible.
1128    ///
1129    /// Non-chunked datasets fall back to `read_array`.
1130    #[cfg(feature = "rayon")]
1131    pub fn read_array_parallel<T: H5Type>(&self) -> Result<ArrayD<T>> {
1132        match &self.layout {
1133            DataLayout::Chunked {
1134                address,
1135                dims,
1136                element_size,
1137                chunk_indexing,
1138            } => self.read_chunked_parallel::<T>(
1139                *address,
1140                dims,
1141                *element_size,
1142                chunk_indexing.as_ref(),
1143            ),
1144            _ => self.read_array::<T>(),
1145        }
1146    }
1147
1148    /// Read the entire dataset using the provided Rayon thread pool.
1149    ///
1150    /// Non-chunked datasets fall back to `read_array`.
1151    #[cfg(feature = "rayon")]
1152    pub fn read_array_in_pool<T: H5Type>(&self, pool: &rayon::ThreadPool) -> Result<ArrayD<T>> {
1153        match &self.layout {
1154            DataLayout::Chunked {
1155                address,
1156                dims,
1157                element_size,
1158                chunk_indexing,
1159            } => pool.install(|| {
1160                self.read_chunked_parallel::<T>(
1161                    *address,
1162                    dims,
1163                    *element_size,
1164                    chunk_indexing.as_ref(),
1165                )
1166            }),
1167            _ => self.read_array::<T>(),
1168        }
1169    }
1170
1171    /// Read a hyperslab of the dataset using chunk-level parallelism when possible.
1172    ///
1173    /// Chunked datasets decompress overlapping chunks in parallel via Rayon.
1174    /// Non-chunked layouts fall back to `read_slice`.
1175    #[cfg(feature = "rayon")]
1176    pub fn read_slice_parallel<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
1177        let resolved = normalize_selection(selection, &self.dataspace.dims)?;
1178
1179        match &self.layout {
1180            DataLayout::Chunked {
1181                address,
1182                dims,
1183                element_size,
1184                chunk_indexing,
1185            } => self.read_chunked_slice_parallel::<T>(
1186                *address,
1187                dims,
1188                *element_size,
1189                chunk_indexing.as_ref(),
1190                selection,
1191                &resolved,
1192            ),
1193            _ => self.read_slice::<T>(selection),
1194        }
1195    }
1196
1197    /// Read a hyperslab of the dataset.
1198    pub fn read_slice<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
1199        let resolved = normalize_selection(selection, &self.dataspace.dims)?;
1200
1201        match &self.layout {
1202            DataLayout::Contiguous { address, size } => {
1203                self.read_contiguous_slice::<T>(*address, *size, &resolved)
1204            }
1205            DataLayout::Compact { data } => self.read_compact_slice::<T>(data, selection),
1206            DataLayout::Chunked {
1207                address,
1208                dims,
1209                element_size,
1210                chunk_indexing,
1211            } => self.read_chunked_slice::<T>(
1212                *address,
1213                dims,
1214                *element_size,
1215                chunk_indexing.as_ref(),
1216                selection,
1217                &resolved,
1218            ),
1219        }
1220    }
1221
1222    fn read_compact<T: H5Type>(&self, data: &[u8]) -> Result<ArrayD<T>> {
1223        self.validate_allocated_raw_data_len("compact", data.len())?;
1224        self.decode_raw_data::<T>(data)
1225    }
1226
1227    /// Read the dataset as logical element bytes.
1228    ///
1229    /// The returned bytes are after layout handling, chunk decompression, filter
1230    /// decoding, external raw-data resolution, and fill-value materialization.
1231    /// Numeric values remain in the dataset datatype's byte order.
1232    pub fn read_raw_bytes(&self) -> Result<Vec<u8>> {
1233        let result: Result<Vec<u8>> = (|| {
1234            let total_bytes = self.raw_byte_len()?;
1235            let mut output = vec![0u8; total_bytes];
1236            self.read_raw_bytes_into_inner(&mut output)?;
1237            Ok(output)
1238        })();
1239
1240        result.map_err(|e| e.with_context(&self.name))
1241    }
1242
1243    /// Number of logical raw bytes in the entire dataset.
1244    pub fn raw_byte_len(&self) -> Result<usize> {
1245        let elem_size = self.raw_element_size();
1246        let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
1247        checked_mul_usize(total_elements, elem_size, "dataset size in bytes")
1248    }
1249
1250    /// Read logical raw bytes into a caller-provided buffer.
1251    ///
1252    /// The destination length must match [`Dataset::raw_byte_len`]. Numeric
1253    /// values remain in the dataset datatype's byte order.
1254    pub fn read_raw_bytes_into(&self, dst: &mut [u8]) -> Result<()> {
1255        let result: Result<()> = (|| {
1256            let total_bytes = self.raw_byte_len()?;
1257            if dst.len() != total_bytes {
1258                return Err(Error::InvalidData(format!(
1259                    "destination has {} bytes, dataset requires {}",
1260                    dst.len(),
1261                    total_bytes
1262                )));
1263            }
1264            self.read_raw_bytes_into_inner(dst)
1265        })();
1266
1267        result.map_err(|e| e.with_context(&self.name))
1268    }
1269
1270    /// Read logical raw bytes converted to native-endian numeric fields.
1271    pub fn read_native_bytes(&self) -> Result<Vec<u8>> {
1272        let result: Result<Vec<u8>> = (|| {
1273            let total_bytes = self.raw_byte_len()?;
1274            let mut output = vec![0u8; total_bytes];
1275            self.read_raw_bytes_into_inner(&mut output)?;
1276            self.convert_to_native_endian(&mut output)?;
1277            Ok(output)
1278        })();
1279
1280        result.map_err(|e| e.with_context(&self.name))
1281    }
1282
1283    /// Read logical raw bytes into a buffer and convert numeric fields to native endian.
1284    pub fn read_native_bytes_into(&self, dst: &mut [u8]) -> Result<()> {
1285        let result: Result<()> = (|| {
1286            let total_bytes = self.raw_byte_len()?;
1287            if dst.len() != total_bytes {
1288                return Err(Error::InvalidData(format!(
1289                    "destination has {} bytes, dataset requires {}",
1290                    dst.len(),
1291                    total_bytes
1292                )));
1293            }
1294            self.read_raw_bytes_into_inner(dst)?;
1295            self.convert_to_native_endian(dst)
1296        })();
1297
1298        result.map_err(|e| e.with_context(&self.name))
1299    }
1300
1301    fn read_raw_bytes_into_inner(&self, dst: &mut [u8]) -> Result<()> {
1302        match &self.layout {
1303            DataLayout::Compact { data } => {
1304                self.validate_allocated_raw_data_len("compact", data.len())?;
1305                dst.copy_from_slice(data);
1306                Ok(())
1307            }
1308            DataLayout::Contiguous { address, size } => {
1309                self.read_contiguous_bytes_into(*address, *size, dst)
1310            }
1311            DataLayout::Chunked {
1312                address,
1313                dims,
1314                element_size: _,
1315                chunk_indexing,
1316            } => self.read_chunked_bytes_into(*address, dims, chunk_indexing.as_ref(), dst),
1317        }
1318    }
1319
1320    /// Size in bytes of one HDF5 variable-length reference in this file.
1321    pub fn vlen_reference_size(&self) -> usize {
1322        4 + self.offset_size() as usize + 4
1323    }
1324
1325    /// Size in bytes of one logical raw element in this file.
1326    ///
1327    /// Variable-length strings and sequences are stored as global-heap
1328    /// references whose width depends on the file offset size.
1329    pub fn raw_element_size(&self) -> usize {
1330        raw_element_size_for_datatype(&self.datatype, self.vlen_reference_size())
1331    }
1332
1333    /// Resolve one HDF5 variable-length reference into logical element bytes.
1334    ///
1335    /// `base_element_size` is the byte width of one value in the vlen sequence.
1336    /// The sequence length stored in the reference is interpreted as an element
1337    /// count and multiplied by this size.
1338    pub fn resolve_vlen_reference_bytes(
1339        &self,
1340        reference: &[u8],
1341        base_element_size: usize,
1342    ) -> Result<Vec<u8>> {
1343        if reference.len() < self.vlen_reference_size() {
1344            return Err(Error::InvalidData(format!(
1345                "dataset '{}' vlen reference too short: need {} bytes, have {}",
1346                self.name,
1347                self.vlen_reference_size(),
1348                reference.len()
1349            )));
1350        }
1351
1352        let mut cursor = Cursor::new(reference);
1353        let seq_len = cursor.read_u32_le()? as usize;
1354        let heap_addr = cursor.read_offset(self.offset_size())?;
1355        let obj_index = cursor.read_u32_le()? as u16;
1356
1357        if Cursor::is_undefined_offset(heap_addr, self.offset_size()) || obj_index == 0 {
1358            return Ok(Vec::new());
1359        }
1360
1361        let expected_bytes =
1362            checked_mul_usize(seq_len, base_element_size, "vlen sequence byte size")?;
1363        let collection = crate::global_heap::GlobalHeapCollection::parse_at_storage(
1364            self.context.storage.as_ref(),
1365            heap_addr,
1366            self.offset_size(),
1367            self.length_size(),
1368        )?;
1369        let object = collection.get_object(obj_index).ok_or_else(|| {
1370            Error::InvalidData(format!(
1371                "dataset '{}' references missing vlen heap object {}",
1372                self.name, obj_index
1373            ))
1374        })?;
1375        if object.data.len() < expected_bytes {
1376            return Err(Error::InvalidData(format!(
1377                "dataset '{}' vlen heap object too short: need {} bytes, have {}",
1378                self.name,
1379                expected_bytes,
1380                object.data.len()
1381            )));
1382        }
1383
1384        Ok(object.data[..expected_bytes].to_vec())
1385    }
1386
1387    fn read_contiguous<T: H5Type>(&self, address: u64, size: u64) -> Result<ArrayD<T>> {
1388        if self.external_files.is_some() {
1389            let elem_size = self.raw_element_size();
1390            let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
1391            let total_bytes =
1392                checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;
1393            let raw = self.read_external_range(0, total_bytes)?;
1394            return self.decode_raw_data::<T>(&raw);
1395        }
1396
1397        if Cursor::is_undefined_offset(address, self.offset_size()) || size == 0 {
1398            // Dataset with no data written — return fill values
1399            return self.make_fill_array::<T>();
1400        }
1401
1402        let sz = checked_usize(size, "contiguous dataset size")?;
1403        self.validate_allocated_raw_data_len("contiguous", sz)?;
1404        let raw = self.context.read_range(address, sz)?;
1405        self.decode_raw_data::<T>(raw.as_ref())
1406    }
1407
1408    fn read_contiguous_bytes_into(&self, address: u64, size: u64, dst: &mut [u8]) -> Result<()> {
1409        if self.external_files.is_some() {
1410            return self.read_external_range_into(0, dst);
1411        }
1412
1413        if Cursor::is_undefined_offset(address, self.offset_size()) || size == 0 {
1414            self.fill_output_buffer(dst);
1415            return Ok(());
1416        }
1417
1418        let sz = checked_usize(size, "contiguous dataset size")?;
1419        self.validate_allocated_raw_data_len("contiguous", sz)?;
1420        if dst.is_empty() {
1421            return Ok(());
1422        }
1423        let raw = self.context.read_range(address, sz)?;
1424        dst.copy_from_slice(raw.as_ref());
1425        Ok(())
1426    }
1427
1428    fn read_contiguous_logical_range(
1429        &self,
1430        address: u64,
1431        logical_offset: usize,
1432        len: usize,
1433    ) -> Result<Vec<u8>> {
1434        if self.external_files.is_some() {
1435            return self.read_external_range(logical_offset, len);
1436        }
1437
1438        let file_offset = checked_add_u64(
1439            address,
1440            u64::try_from(logical_offset).map_err(|_| {
1441                Error::InvalidData("contiguous logical offset exceeds u64 capacity".to_string())
1442            })?,
1443            "contiguous read file offset",
1444        )?;
1445        Ok(self.context.read_range(file_offset, len)?.to_vec())
1446    }
1447
1448    fn read_external_range(&self, logical_offset: usize, len: usize) -> Result<Vec<u8>> {
1449        let mut output = vec![0u8; len];
1450        self.read_external_range_into(logical_offset, &mut output)?;
1451        Ok(output)
1452    }
1453
1454    fn read_external_range_into(&self, logical_offset: usize, dst: &mut [u8]) -> Result<()> {
1455        self.fill_output_buffer(dst);
1456        if dst.is_empty() {
1457            return Ok(());
1458        }
1459
1460        let request_start = u64::try_from(logical_offset).map_err(|_| {
1461            Error::InvalidData("external dataset offset exceeds u64 capacity".to_string())
1462        })?;
1463        let request_len = u64::try_from(dst.len()).map_err(|_| {
1464            Error::InvalidData("external dataset length exceeds u64 capacity".to_string())
1465        })?;
1466        let request_end = request_start
1467            .checked_add(request_len)
1468            .ok_or_else(|| Error::InvalidData("external dataset range overflows".into()))?;
1469
1470        for slot in self.external_raw_slots()?.iter() {
1471            let slot_end = slot.logical_offset.saturating_add(slot.size);
1472            let overlap_start = request_start.max(slot.logical_offset);
1473            let overlap_end = request_end.min(slot_end);
1474            if overlap_start >= overlap_end {
1475                continue;
1476            }
1477
1478            let read_offset = slot
1479                .file_offset
1480                .checked_add(overlap_start - slot.logical_offset)
1481                .ok_or_else(|| Error::InvalidData("external file read offset overflows".into()))?;
1482            let read_len = checked_usize(overlap_end - overlap_start, "external read length")?;
1483            let dst_start = checked_usize(overlap_start - request_start, "external read dst")?;
1484            let dst_end = checked_add_usize(dst_start, read_len, "external read dst end")?;
1485            let bytes = slot.storage.read_range(read_offset, read_len)?;
1486            dst[dst_start..dst_end].copy_from_slice(bytes.as_ref());
1487        }
1488
1489        Ok(())
1490    }
1491
1492    fn external_raw_slots(&self) -> Result<Arc<Vec<ResolvedExternalRawSlot>>> {
1493        if let Some(slots) = self.external_slots.get() {
1494            return Ok(slots.clone());
1495        }
1496
1497        let slots = Arc::new(self.load_external_raw_slots()?);
1498        let _ = self.external_slots.set(slots.clone());
1499        Ok(self
1500            .external_slots
1501            .get()
1502            .expect("external slot cache must exist after initialization")
1503            .clone())
1504    }
1505
1506    fn load_external_raw_slots(&self) -> Result<Vec<ResolvedExternalRawSlot>> {
1507        let Some(external_files) = self.external_files.as_ref() else {
1508            return Ok(Vec::new());
1509        };
1510
1511        let heap = LocalHeap::parse_at_storage(
1512            self.context.storage.as_ref(),
1513            external_files.heap_address,
1514            self.offset_size(),
1515            self.length_size(),
1516        )?;
1517
1518        let mut logical_offset = 0u64;
1519        let mut slots = Vec::with_capacity(external_files.slots.len());
1520        for slot in &external_files.slots {
1521            let filename =
1522                heap.get_string_storage(slot.name_offset, self.context.storage.as_ref())?;
1523            let storage = self
1524                .context
1525                .resolve_external_file(&filename)?
1526                .ok_or_else(|| {
1527                    Error::Other(format!(
1528                        "external raw data file '{filename}' could not be resolved"
1529                    ))
1530                })?;
1531            let size = if Cursor::is_undefined_offset(slot.size, self.length_size()) {
1532                u64::MAX.saturating_sub(logical_offset)
1533            } else {
1534                slot.size
1535            };
1536
1537            slots.push(ResolvedExternalRawSlot {
1538                logical_offset,
1539                storage,
1540                file_offset: slot.offset,
1541                size,
1542            });
1543
1544            if Cursor::is_undefined_offset(slot.size, self.length_size()) {
1545                break;
1546            }
1547            logical_offset = logical_offset.checked_add(slot.size).ok_or_else(|| {
1548                Error::InvalidData("external raw data logical offset overflows".into())
1549            })?;
1550        }
1551
1552        Ok(slots)
1553    }
1554
1555    fn read_chunked<T: H5Type>(
1556        &self,
1557        index_address: u64,
1558        chunk_dims: &[u32],
1559        _element_size: u32,
1560        chunk_indexing: Option<&ChunkIndexing>,
1561    ) -> Result<ArrayD<T>> {
1562        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
1563            return self.make_fill_array::<T>();
1564        }
1565
1566        let ndim = self.ndim();
1567        let shape = &self.dataspace.dims;
1568        let elem_size = self.raw_element_size();
1569        let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
1570        let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;
1571
1572        if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1573            if let Some(cached_bytes) = self.full_dataset_bytes.get() {
1574                return self.decode_raw_data::<T>(cached_bytes);
1575            }
1576        }
1577
1578        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1579        validate_chunk_shape(shape, &chunk_shape)?;
1580        let dataset_strides = row_major_strides(shape, "dataset stride")?;
1581        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
1582
1583        let mut entries = self.collect_chunk_entries(
1584            index_address,
1585            chunk_dims,
1586            chunk_indexing,
1587            ChunkEntrySelection {
1588                shape,
1589                ndim,
1590                elem_size,
1591                chunk_bounds: None,
1592            },
1593        )?;
1594
1595        let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
1596            Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
1597                &mut entries,
1598                shape,
1599                &chunk_shape,
1600                &first_chunk,
1601                &last_chunk,
1602            )?,
1603            None if entries.is_empty() => true,
1604            None => {
1605                return Err(Error::InvalidData(
1606                    "chunk index contains entries for an empty dataset".into(),
1607                ))
1608            }
1609        };
1610        if full_chunk_coverage {
1611            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1612                let mut result_values: Vec<MaybeUninit<T>> =
1613                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
1614                        .take(total_elements)
1615                        .collect();
1616                let result_ptr = result_values.as_mut_ptr() as *mut u8;
1617                let result_len = checked_mul_usize(
1618                    result_values.len(),
1619                    std::mem::size_of::<T>(),
1620                    "typed dataset size in bytes",
1621                )?;
1622
1623                for entry in &entries {
1624                    let chunk_data =
1625                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1626                    unsafe {
1627                        copy_chunk_to_flat_with_strides_ptr(
1628                            &chunk_data,
1629                            FlatBufferPtr {
1630                                ptr: result_ptr,
1631                                len: result_len,
1632                            },
1633                            ChunkCopyLayout {
1634                                chunk_offsets: &entry.offsets,
1635                                chunk_shape: &chunk_shape,
1636                                dataset_shape: shape,
1637                                dataset_strides: &dataset_strides,
1638                                chunk_strides: &chunk_strides,
1639                                elem_size,
1640                            },
1641                        )?;
1642                    }
1643                }
1644
1645                if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1646                    let mut cached_bytes = vec![0u8; total_bytes];
1647                    unsafe {
1648                        std::ptr::copy_nonoverlapping(
1649                            result_ptr,
1650                            cached_bytes.as_mut_ptr(),
1651                            total_bytes,
1652                        );
1653                    }
1654                    let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
1655                }
1656
1657                let mut result_shape = Vec::with_capacity(shape.len());
1658                for &dim in shape {
1659                    result_shape.push(checked_usize(dim, "dataset dimension")?);
1660                }
1661                let result_values = assume_init_vec(result_values);
1662                return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
1663                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1664            }
1665
1666            let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
1667            let flat_ptr = flat_data.as_mut_ptr() as *mut u8;
1668            let flat_len = flat_data.len();
1669
1670            for entry in &entries {
1671                let chunk_data =
1672                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1673                unsafe {
1674                    copy_chunk_to_flat_with_strides_ptr(
1675                        &chunk_data,
1676                        FlatBufferPtr {
1677                            ptr: flat_ptr,
1678                            len: flat_len,
1679                        },
1680                        ChunkCopyLayout {
1681                            chunk_offsets: &entry.offsets,
1682                            chunk_shape: &chunk_shape,
1683                            dataset_shape: shape,
1684                            dataset_strides: &dataset_strides,
1685                            chunk_strides: &chunk_strides,
1686                            elem_size,
1687                        },
1688                    )?;
1689                }
1690            }
1691
1692            let flat_data = assume_init_u8_vec(flat_data);
1693            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1694                let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
1695            }
1696            return self.decode_raw_data::<T>(&flat_data);
1697        }
1698
1699        let mut flat_data = self.make_output_buffer(total_bytes);
1700        for entry in &entries {
1701            let chunk_data =
1702                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1703            copy_chunk_to_flat_with_strides(
1704                &chunk_data,
1705                &mut flat_data,
1706                ChunkCopyLayout {
1707                    chunk_offsets: &entry.offsets,
1708                    chunk_shape: &chunk_shape,
1709                    dataset_shape: shape,
1710                    dataset_strides: &dataset_strides,
1711                    chunk_strides: &chunk_strides,
1712                    elem_size,
1713                },
1714            )?;
1715        }
1716
1717        self.decode_raw_data::<T>(&flat_data)
1718    }
1719
1720    fn read_chunked_bytes_into(
1721        &self,
1722        index_address: u64,
1723        chunk_dims: &[u32],
1724        chunk_indexing: Option<&ChunkIndexing>,
1725        dst: &mut [u8],
1726    ) -> Result<()> {
1727        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
1728            self.fill_output_buffer(dst);
1729            return Ok(());
1730        }
1731
1732        let ndim = self.ndim();
1733        let shape = &self.dataspace.dims;
1734        let elem_size = self.raw_element_size();
1735
1736        if dst.len() <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1737            if let Some(cached_bytes) = self.full_dataset_bytes.get() {
1738                if cached_bytes.len() == dst.len() {
1739                    dst.copy_from_slice(cached_bytes.as_slice());
1740                    return Ok(());
1741                }
1742            }
1743        }
1744
1745        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1746        validate_chunk_shape(shape, &chunk_shape)?;
1747        let dataset_strides = row_major_strides(shape, "dataset stride")?;
1748        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
1749
1750        let mut entries = self.collect_chunk_entries(
1751            index_address,
1752            chunk_dims,
1753            chunk_indexing,
1754            ChunkEntrySelection {
1755                shape,
1756                ndim,
1757                elem_size,
1758                chunk_bounds: None,
1759            },
1760        )?;
1761
1762        let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
1763            Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
1764                &mut entries,
1765                shape,
1766                &chunk_shape,
1767                &first_chunk,
1768                &last_chunk,
1769            )?,
1770            None if entries.is_empty() => true,
1771            None => {
1772                return Err(Error::InvalidData(
1773                    "chunk index contains entries for an empty dataset".into(),
1774                ))
1775            }
1776        };
1777
1778        self.fill_output_buffer(dst);
1779        for entry in &entries {
1780            let chunk_data =
1781                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1782            copy_chunk_to_flat_with_strides(
1783                &chunk_data,
1784                dst,
1785                ChunkCopyLayout {
1786                    chunk_offsets: &entry.offsets,
1787                    chunk_shape: &chunk_shape,
1788                    dataset_shape: shape,
1789                    dataset_strides: &dataset_strides,
1790                    chunk_strides: &chunk_strides,
1791                    elem_size,
1792                },
1793            )?;
1794        }
1795
1796        if full_chunk_coverage && dst.len() <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1797            let _ = self.full_dataset_bytes.set(Arc::new(dst.to_vec()));
1798        }
1799
1800        Ok(())
1801    }
1802
/// Reads the whole chunked dataset, loading and copying chunks in parallel
/// with Rayon, and decodes the result into an `ArrayD<T>`.
///
/// Three output strategies are used:
/// * full chunk coverage and `T` byte-compatible with the stored type: chunks
///   are copied straight into the typed result vector;
/// * full chunk coverage otherwise: chunks are copied into a raw byte buffer
///   that is then decoded;
/// * partial coverage: the buffer from `make_output_buffer` is used so that
///   regions without a chunk keep whatever that buffer was initialised with.
///
/// Results of at most `HOT_FULL_DATASET_CACHE_MAX_BYTES` are served from the
/// `full_dataset_bytes` cache, and the two full-coverage paths publish to it.
///
/// # Errors
///
/// Propagates size-computation, chunk-index, chunk-loading, copy and decode
/// failures. Returns `Error::InvalidData` when the chunk index contains
/// entries although `full_dataset_chunk_bounds` reports no chunk grid.
#[cfg(feature = "rayon")]
fn read_chunked_parallel<T: H5Type>(
    &self,
    index_address: u64,
    chunk_dims: &[u32],
    _element_size: u32,
    chunk_indexing: Option<&ChunkIndexing>,
) -> Result<ArrayD<T>> {
    if Cursor::is_undefined_offset(index_address, self.offset_size()) {
        return self.make_fill_array::<T>();
    }

    let ndim = self.ndim();
    let shape = &self.dataspace.dims;
    let elem_size = self.raw_element_size();
    let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
    let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;

    if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
        if let Some(cached_bytes) = self.full_dataset_bytes.get() {
            // Same guard as `read_chunked_bytes_into`: only trust a cached
            // buffer whose length matches this dataset's byte size.
            if cached_bytes.len() == total_bytes {
                return self.decode_raw_data::<T>(cached_bytes);
            }
        }
    }

    let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| u64::from(d)).collect();
    validate_chunk_shape(shape, &chunk_shape)?;
    let dataset_strides = row_major_strides(shape, "dataset stride")?;
    let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;

    let mut entries = self.collect_chunk_entries(
        index_address,
        chunk_dims,
        chunk_indexing,
        ChunkEntrySelection {
            shape,
            ndim,
            elem_size,
            chunk_bounds: None,
        },
    )?;

    let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
        Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
            &mut entries,
            shape,
            &chunk_shape,
            &first_chunk,
            &last_chunk,
        )?,
        None if entries.is_empty() => true,
        None => {
            return Err(Error::InvalidData(
                "chunk index contains entries for an empty dataset".into(),
            ))
        }
    };

    // Loads every chunk and copies it into `flat` in parallel. Shared by all
    // three output strategies below, which differ only in the target buffer.
    let copy_chunks_into = |flat: FlatBufferPtr| -> Result<()> {
        entries
            .par_iter()
            .map(|entry| {
                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)
                    // SAFETY: `flat` describes a buffer that the caller keeps
                    // alive until this closure returns. NOTE(review): this
                    // also relies on distinct chunks targeting disjoint byte
                    // ranges — confirm against `FlatBufferPtr::copy_chunk`.
                    .and_then(|data| unsafe {
                        flat.copy_chunk(
                            &data,
                            ChunkCopyLayout {
                                chunk_offsets: &entry.offsets,
                                chunk_shape: &chunk_shape,
                                dataset_shape: shape,
                                dataset_strides: &dataset_strides,
                                chunk_strides: &chunk_strides,
                                elem_size,
                            },
                        )
                    })
            })
            .collect::<std::result::Result<Vec<_>, Error>>()?;
        Ok(())
    };

    if full_chunk_coverage {
        if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
            let mut result_values: Vec<MaybeUninit<T>> =
                std::iter::repeat_with(MaybeUninit::<T>::uninit)
                    .take(total_elements)
                    .collect();
            let flat = FlatBufferPtr {
                ptr: result_values.as_mut_ptr() as *mut u8,
                len: checked_mul_usize(
                    result_values.len(),
                    std::mem::size_of::<T>(),
                    "typed dataset size in bytes",
                )?,
            };

            copy_chunks_into(flat)?;

            let mut result_shape = Vec::with_capacity(shape.len());
            for &dim in shape {
                result_shape.push(checked_usize(dim, "dataset dimension")?);
            }
            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                let mut cached_bytes = vec![0u8; total_bytes];
                // SAFETY: `flat.ptr` points at `result_values`, which is still
                // alive and spans `total_elements * size_of::<T>()` bytes; that
                // equals `total_bytes` because `size_of::<T>() == elem_size`
                // was checked above. `cached_bytes` is a separate allocation
                // of `total_bytes` bytes, so the ranges do not overlap.
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        flat.ptr,
                        cached_bytes.as_mut_ptr(),
                        total_bytes,
                    );
                }
                let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
            }
            let result_values = assume_init_vec(result_values);
            return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
                .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
        }

        let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
        copy_chunks_into(FlatBufferPtr {
            ptr: flat_data.as_mut_ptr() as *mut u8,
            len: flat_data.len(),
        })?;

        let flat_data = assume_init_u8_vec(flat_data);
        if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
            let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
        }
        return self.decode_raw_data::<T>(&flat_data);
    }

    // Partial coverage: start from the pre-initialised output buffer so that
    // bytes no chunk writes to keep their initial contents.
    let mut flat_data = self.make_output_buffer(total_bytes);
    copy_chunks_into(FlatBufferPtr {
        ptr: flat_data.as_mut_ptr(),
        len: flat_data.len(),
    })?;

    self.decode_raw_data::<T>(&flat_data)
}
1975
1976    /// Collect all chunk entries by dispatching on the chunk indexing type.
1977    ///
1978    /// Shared by `read_chunked` and `read_chunked_slice`.
1979    fn collect_chunk_entries(
1980        &self,
1981        index_address: u64,
1982        chunk_dims: &[u32],
1983        chunk_indexing: Option<&ChunkIndexing>,
1984        selection: ChunkEntrySelection<'_>,
1985    ) -> Result<Vec<chunk_index::ChunkEntry>> {
1986        if selection.chunk_bounds.is_none() {
1987            if let Some(cached) = self.full_chunk_entries.get() {
1988                return Ok((**cached).clone());
1989            }
1990        }
1991
1992        let cache_key =
1993            selection
1994                .chunk_bounds
1995                .map(|(first_chunk, last_chunk)| ChunkEntryCacheKey {
1996                    index_address,
1997                    first_chunk: SmallVec::from_slice(first_chunk),
1998                    last_chunk: SmallVec::from_slice(last_chunk),
1999                });
2000
2001        if let Some(ref key) = cache_key {
2002            let mut cache = self.chunk_entry_cache.lock();
2003            if let Some(cached) = cache.get(key) {
2004                return Ok((**cached).clone());
2005            }
2006        }
2007
2008        let entries = match chunk_indexing {
2009            None => {
2010                // V1-V3: B-tree v1 chunk indexing
2011                self.collect_btree_v1_entries(
2012                    index_address,
2013                    selection.ndim,
2014                    chunk_dims,
2015                    selection.chunk_bounds,
2016                )
2017            }
2018            Some(ChunkIndexing::SingleChunk {
2019                filtered_size,
2020                filters,
2021            }) => Ok(vec![chunk_index::single_chunk_entry(
2022                index_address,
2023                *filtered_size,
2024                *filters,
2025                selection.ndim,
2026            )]),
2027            Some(ChunkIndexing::BTreeV2) => chunk_index::collect_v2_chunk_entries_storage(
2028                self.context.storage.as_ref(),
2029                index_address,
2030                self.offset_size(),
2031                self.length_size(),
2032                selection.ndim as u32,
2033                chunk_dims,
2034                selection.chunk_bounds,
2035            ),
2036            Some(ChunkIndexing::Implicit) => Ok(chunk_index::collect_implicit_chunk_entries(
2037                index_address,
2038                selection.shape,
2039                chunk_dims,
2040                selection.elem_size,
2041                selection.chunk_bounds,
2042            )),
2043            Some(ChunkIndexing::FixedArray { .. }) => {
2044                crate::fixed_array::collect_fixed_array_chunk_entries_storage(
2045                    self.context.storage.as_ref(),
2046                    index_address,
2047                    self.offset_size(),
2048                    self.length_size(),
2049                    selection.shape,
2050                    chunk_dims,
2051                    selection.chunk_bounds,
2052                )
2053            }
2054            Some(ChunkIndexing::ExtensibleArray { .. }) => {
2055                crate::extensible_array::collect_extensible_array_chunk_entries_storage(
2056                    self.context.storage.as_ref(),
2057                    index_address,
2058                    self.offset_size(),
2059                    self.length_size(),
2060                    selection.shape,
2061                    chunk_dims,
2062                    selection.chunk_bounds,
2063                )
2064            }
2065        }?;
2066
2067        if let Some(key) = cache_key {
2068            let mut cache = self.chunk_entry_cache.lock();
2069            cache.put(key, Arc::new(entries.clone()));
2070        } else {
2071            let _ = self.full_chunk_entries.set(Arc::new(entries.clone()));
2072        }
2073
2074        Ok(entries)
2075    }
2076
2077    /// Collect chunk entries from a B-tree v1 index.
2078    fn collect_btree_v1_entries(
2079        &self,
2080        btree_address: u64,
2081        ndim: usize,
2082        chunk_dims: &[u32],
2083        chunk_bounds: Option<(&[u64], &[u64])>,
2084    ) -> Result<Vec<chunk_index::ChunkEntry>> {
2085        let leaves = crate::btree_v1::collect_btree_v1_leaves_storage(
2086            self.context.storage.as_ref(),
2087            btree_address,
2088            self.offset_size(),
2089            self.length_size(),
2090            Some(ndim as u32),
2091            chunk_dims,
2092            chunk_bounds,
2093        )?;
2094
2095        let mut entries = Vec::with_capacity(leaves.len());
2096        for (key, chunk_addr) in &leaves {
2097            match key {
2098                crate::btree_v1::BTreeV1Key::RawData {
2099                    chunk_size,
2100                    filter_mask,
2101                    offsets,
2102                } => {
2103                    entries.push(chunk_index::ChunkEntry {
2104                        address: *chunk_addr,
2105                        size: *chunk_size as u64,
2106                        filter_mask: *filter_mask,
2107                        offsets: offsets[..ndim].to_vec(),
2108                    });
2109                }
2110                _ => {
2111                    return Err(Error::InvalidData(
2112                        "expected raw data key in chunk B-tree".into(),
2113                    ))
2114                }
2115            }
2116        }
2117        Ok(entries)
2118    }
2119
2120    fn load_chunk_data(
2121        &self,
2122        entry: &chunk_index::ChunkEntry,
2123        dataset_addr: u64,
2124        chunk_shape: &[u64],
2125        elem_size: usize,
2126    ) -> Result<Arc<Vec<u8>>> {
2127        let cache_key = ChunkKey {
2128            dataset_addr,
2129            chunk_offsets: smallvec::SmallVec::from_slice(&entry.offsets),
2130        };
2131
2132        self.chunk_cache.get_or_insert_with(cache_key, || {
2133            let size = if entry.size > 0 {
2134                checked_usize(entry.size, "encoded chunk size")?
2135            } else {
2136                let chunk_elements =
2137                    checked_shape_elements_usize(chunk_shape, "chunk element count")?;
2138                checked_mul_usize(chunk_elements, elem_size, "chunk byte size")?
2139            };
2140            let raw = self.context.read_range(entry.address, size)?;
2141
2142            if let Some(ref pipeline) = self.filters {
2143                filters::apply_pipeline(
2144                    raw.as_ref(),
2145                    &pipeline.filters,
2146                    entry.filter_mask,
2147                    elem_size,
2148                    Some(&self.filter_registry),
2149                )
2150            } else {
2151                Ok(raw.to_vec())
2152            }
2153        })
2154    }
2155
2156    fn load_exact_chunk_data(
2157        &self,
2158        entry: &chunk_index::ChunkEntry,
2159        dataset_addr: u64,
2160        chunk_shape: &[u64],
2161        elem_size: usize,
2162    ) -> Result<Arc<Vec<u8>>> {
2163        let data = self.load_chunk_data(entry, dataset_addr, chunk_shape, elem_size)?;
2164        validate_decoded_chunk_len(entry, chunk_shape, elem_size, data.len())?;
2165        Ok(data)
2166    }
2167
2168    /// Chunked slice: only read chunks that overlap the selection.
2169    ///
2170    /// Resolves each `SliceInfoElem` to concrete ranges, computes the chunk
2171    /// grid range per dimension, and only decompresses overlapping chunks.
2172    fn read_chunked_slice<T: H5Type>(
2173        &self,
2174        index_address: u64,
2175        chunk_dims: &[u32],
2176        _element_size: u32,
2177        chunk_indexing: Option<&ChunkIndexing>,
2178        _selection: &SliceInfo,
2179        resolved: &ResolvedSelection,
2180    ) -> Result<ArrayD<T>> {
2181        if resolved.result_elements == 0 {
2182            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
2183        }
2184
2185        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
2186            return self
2187                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
2188        }
2189
2190        let ndim = self.ndim();
2191        let shape = &self.dataspace.dims;
2192        let elem_size = dtype_element_size(&self.datatype);
2193        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
2194        validate_chunk_shape(shape, &chunk_shape)?;
2195        let mut first_chunk = vec![0u64; ndim];
2196        let mut last_chunk = vec![0u64; ndim];
2197        for d in 0..ndim {
2198            let (first, last) = resolved.dims[d]
2199                .chunk_index_range(chunk_shape[d])
2200                .expect("zero-sized result handled above");
2201            first_chunk[d] = first;
2202            last_chunk[d] = last;
2203        }
2204
2205        // Collect all chunk entries.
2206        let mut overlapping = self.collect_chunk_entries(
2207            index_address,
2208            chunk_dims,
2209            chunk_indexing,
2210            ChunkEntrySelection {
2211                shape,
2212                ndim,
2213                elem_size,
2214                chunk_bounds: Some((&first_chunk, &last_chunk)),
2215            },
2216        )?;
2217        let fully_covered_grid = validate_chunk_grid_coverage(
2218            &mut overlapping,
2219            shape,
2220            &chunk_shape,
2221            &first_chunk,
2222            &last_chunk,
2223        )?;
2224
2225        let result_total_bytes = checked_mul_usize(
2226            resolved.result_elements,
2227            elem_size,
2228            "slice result size in bytes",
2229        )?;
2230        // Compute result strides (including collapsed dims — they have count=1).
2231        let result_dims = resolved.result_dims_with_collapsed();
2232        let mut result_strides = vec![1usize; ndim];
2233        for d in (0..ndim.saturating_sub(1)).rev() {
2234            result_strides[d] =
2235                checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
2236        }
2237        let mut chunk_strides = vec![1usize; ndim];
2238        for d in (0..ndim.saturating_sub(1)).rev() {
2239            chunk_strides[d] = checked_mul_usize(
2240                chunk_strides[d + 1],
2241                chunk_shape[d + 1] as usize,
2242                "chunk stride",
2243            )?;
2244        }
2245        let use_unit_stride_fast_path = resolved.is_unit_stride();
2246        let fully_covered_unit_stride = use_unit_stride_fast_path && fully_covered_grid;
2247
2248        if fully_covered_unit_stride {
2249            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
2250                let mut result_values: Vec<MaybeUninit<T>> =
2251                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
2252                        .take(resolved.result_elements)
2253                        .collect();
2254                let result_ptr = result_values.as_mut_ptr() as *mut u8;
2255                let result_len = checked_mul_usize(
2256                    result_values.len(),
2257                    std::mem::size_of::<T>(),
2258                    "typed slice result size in bytes",
2259                )?;
2260
2261                for entry in &overlapping {
2262                    let chunk_data =
2263                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2264
2265                    unsafe {
2266                        copy_unit_stride_chunk_overlap_ptr(
2267                            &chunk_data,
2268                            FlatBufferPtr {
2269                                ptr: result_ptr,
2270                                len: result_len,
2271                            },
2272                            UnitStrideCopyLayout {
2273                                chunk_offsets: &entry.offsets,
2274                                chunk_shape: &chunk_shape,
2275                                dataset_shape: shape,
2276                                resolved,
2277                                chunk_strides: &chunk_strides,
2278                                result_strides: &result_strides,
2279                                elem_size,
2280                            },
2281                        )?;
2282                    }
2283                }
2284
2285                let result_values = assume_init_vec(result_values);
2286                return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
2287                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
2288            }
2289
2290            let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
2291            let result_ptr = result_buf.as_mut_ptr() as *mut u8;
2292            let result_len = result_buf.len();
2293
2294            for entry in &overlapping {
2295                let chunk_data =
2296                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2297
2298                unsafe {
2299                    copy_unit_stride_chunk_overlap_ptr(
2300                        &chunk_data,
2301                        FlatBufferPtr {
2302                            ptr: result_ptr,
2303                            len: result_len,
2304                        },
2305                        UnitStrideCopyLayout {
2306                            chunk_offsets: &entry.offsets,
2307                            chunk_shape: &chunk_shape,
2308                            dataset_shape: shape,
2309                            resolved,
2310                            chunk_strides: &chunk_strides,
2311                            result_strides: &result_strides,
2312                            elem_size,
2313                        },
2314                    )?;
2315                }
2316            }
2317
2318            let result_buf = assume_init_u8_vec(result_buf);
2319            return self.decode_buffer_with_shape::<T>(
2320                &result_buf,
2321                resolved.result_elements,
2322                &resolved.result_shape,
2323            );
2324        }
2325
2326        let mut result_buf = self.make_output_buffer(result_total_bytes);
2327
2328        // For each overlapping chunk: decompress and copy matching elements.
2329        for entry in &overlapping {
2330            let chunk_data =
2331                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2332
2333            if use_unit_stride_fast_path {
2334                copy_unit_stride_chunk_overlap(
2335                    &chunk_data,
2336                    &mut result_buf,
2337                    UnitStrideCopyLayout {
2338                        chunk_offsets: &entry.offsets,
2339                        chunk_shape: &chunk_shape,
2340                        dataset_shape: shape,
2341                        resolved,
2342                        chunk_strides: &chunk_strides,
2343                        result_strides: &result_strides,
2344                        elem_size,
2345                    },
2346                )?;
2347                continue;
2348            }
2349
2350            // For each dimension, compute which elements within this chunk fall
2351            // within the selection.
2352            let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
2353            for d in 0..ndim {
2354                let chunk_start = entry.offsets[d];
2355                let chunk_end = (chunk_start + chunk_shape[d]).min(shape[d]);
2356                let dim = &resolved.dims[d];
2357                let sel_start = dim.start;
2358                let sel_end = dim.end;
2359                let sel_step = dim.step;
2360                let mut indices = Vec::new();
2361
2362                // Find first selected index >= chunk_start
2363                let first_sel = if sel_start >= chunk_start {
2364                    sel_start
2365                } else {
2366                    let steps_to_skip = (chunk_start - sel_start).div_ceil(sel_step);
2367                    sel_start + steps_to_skip * sel_step
2368                };
2369
2370                let mut sel_idx = first_sel;
2371                while sel_idx < sel_end && sel_idx < chunk_end {
2372                    let chunk_local = checked_usize(sel_idx - chunk_start, "chunk-local index")?;
2373                    // Compute result-space index for this dimension.
2374                    let result_dim_idx =
2375                        checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
2376                    indices.push((chunk_local, result_dim_idx));
2377                    sel_idx += sel_step;
2378                }
2379
2380                dim_indices.push(indices);
2381            }
2382
2383            // Iterate over the cartesian product of matching indices.
2384            copy_selected_elements(
2385                &chunk_data,
2386                &mut result_buf,
2387                &dim_indices,
2388                &chunk_strides,
2389                &result_strides,
2390                elem_size,
2391                ndim,
2392            )?;
2393        }
2394
2395        self.decode_buffer_with_shape::<T>(
2396            &result_buf,
2397            resolved.result_elements,
2398            &resolved.result_shape,
2399        )
2400    }
2401
/// Parallel variant of `read_chunked_slice`: decompresses overlapping chunks
/// in parallel using Rayon, then copies selected elements into the result buffer.
///
/// Each chunk writes to a disjoint region of the result buffer (chunks don't
/// overlap in output space), so this is safe to parallelize.
///
/// Three copy strategies are used:
/// * unit-stride selection with a fully covered chunk grid and a `T` that is
///   native-copy compatible: chunks are copied straight into an uninitialized
///   `Vec<T>`;
/// * unit-stride selection with a fully covered chunk grid otherwise: chunks
///   are copied into an uninitialized byte buffer that is decoded afterwards;
/// * anything else: the buffer comes from `make_output_buffer`, then each
///   chunk copies its selected elements (block copy for unit stride,
///   per-element gather for strided selections).
///
/// `_element_size` and `_selection` are not used by this implementation.
///
/// # Errors
///
/// Propagates failures from chunk-shape validation, chunk index traversal,
/// chunk loading, size arithmetic, the copy helpers and the final decode.
#[cfg(feature = "rayon")]
fn read_chunked_slice_parallel<T: H5Type>(
    &self,
    index_address: u64,
    chunk_dims: &[u32],
    _element_size: u32,
    chunk_indexing: Option<&ChunkIndexing>,
    _selection: &SliceInfo,
    resolved: &ResolvedSelection,
) -> Result<ArrayD<T>> {
    if resolved.result_elements == 0 {
        return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
    }

    // No chunk index was ever written: the whole selection is fill value.
    if Cursor::is_undefined_offset(index_address, self.offset_size()) {
        return self
            .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
    }

    let ndim = self.ndim();
    let shape = &self.dataspace.dims;
    let elem_size = dtype_element_size(&self.datatype);
    let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
    validate_chunk_shape(shape, &chunk_shape)?;
    let mut first_chunk = vec![0u64; ndim];
    let mut last_chunk = vec![0u64; ndim];
    for d in 0..ndim {
        // Internal invariant: a non-empty result was established by the
        // early return at the top of this function.
        let (first, last) = resolved.dims[d]
            .chunk_index_range(chunk_shape[d])
            .expect("zero-sized result handled above");
        first_chunk[d] = first;
        last_chunk[d] = last;
    }

    // Collect all chunk entries.
    let mut overlapping = self.collect_chunk_entries(
        index_address,
        chunk_dims,
        chunk_indexing,
        ChunkEntrySelection {
            shape,
            ndim,
            elem_size,
            chunk_bounds: Some((&first_chunk, &last_chunk)),
        },
    )?;
    let fully_covered_grid = validate_chunk_grid_coverage(
        &mut overlapping,
        shape,
        &chunk_shape,
        &first_chunk,
        &last_chunk,
    )?;

    // Size of the result buffer in raw bytes.
    let result_total_bytes = checked_mul_usize(
        resolved.result_elements,
        elem_size,
        "slice result size in bytes",
    )?;
    // Compute result strides (including collapsed dims — they have count=1).
    let result_dims = resolved.result_dims_with_collapsed();
    let mut result_strides = vec![1usize; ndim];
    for d in (0..ndim.saturating_sub(1)).rev() {
        result_strides[d] =
            checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
    }
    let mut chunk_strides = vec![1usize; ndim];
    for d in (0..ndim.saturating_sub(1)).rev() {
        chunk_strides[d] = checked_mul_usize(
            chunk_strides[d + 1],
            chunk_shape[d + 1] as usize,
            "chunk stride",
        )?;
    }
    let use_unit_stride_fast_path = resolved.is_unit_stride();
    let fully_covered_unit_stride = use_unit_stride_fast_path && fully_covered_grid;

    if fully_covered_unit_stride {
        if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
            // Typed fast path: write chunk bytes directly into the `Vec<T>`.
            let mut result_values: Vec<MaybeUninit<T>> =
                std::iter::repeat_with(MaybeUninit::<T>::uninit)
                    .take(resolved.result_elements)
                    .collect();
            let flat = FlatBufferPtr {
                ptr: result_values.as_mut_ptr() as *mut u8,
                len: checked_mul_usize(
                    result_values.len(),
                    std::mem::size_of::<T>(),
                    "typed slice result size in bytes",
                )?,
            };

            overlapping
                .par_iter()
                .map(|entry| {
                    let chunk_data = self.load_exact_chunk_data(
                        entry,
                        index_address,
                        &chunk_shape,
                        elem_size,
                    )?;

                    // SAFETY: `flat` points into `result_values`, which stays
                    // alive and is not otherwise accessed until this parallel
                    // loop has finished. Concurrent writes rely on chunks
                    // mapping to disjoint output regions (see the doc comment).
                    unsafe {
                        flat.copy_unit_stride_chunk_overlap(
                            &chunk_data,
                            UnitStrideCopyLayout {
                                chunk_offsets: &entry.offsets,
                                chunk_shape: &chunk_shape,
                                dataset_shape: shape,
                                resolved,
                                chunk_strides: &chunk_strides,
                                result_strides: &result_strides,
                                elem_size,
                            },
                        )?;
                    }

                    Ok(())
                })
                .collect::<std::result::Result<Vec<_>, Error>>()?;

            // NOTE(review): treating the buffer as initialized relies on
            // `fully_covered_grid` meaning that every chunk overlapping the
            // selection was present and copied above — confirm against
            // `validate_chunk_grid_coverage` and `assume_init_vec`.
            let result_values = assume_init_vec(result_values);
            return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
                .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
        }

        // Byte fast path: same copy, but into raw bytes that are decoded later.
        let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
        let flat = FlatBufferPtr {
            ptr: result_buf.as_mut_ptr() as *mut u8,
            len: result_buf.len(),
        };

        overlapping
            .par_iter()
            .map(|entry| {
                let chunk_data =
                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;

                // SAFETY: `flat` points into `result_buf`, which stays alive
                // and is not otherwise accessed until this parallel loop has
                // finished. Concurrent writes rely on chunks mapping to
                // disjoint output regions (see the doc comment).
                unsafe {
                    flat.copy_unit_stride_chunk_overlap(
                        &chunk_data,
                        UnitStrideCopyLayout {
                            chunk_offsets: &entry.offsets,
                            chunk_shape: &chunk_shape,
                            dataset_shape: shape,
                            resolved,
                            chunk_strides: &chunk_strides,
                            result_strides: &result_strides,
                            elem_size,
                        },
                    )?;
                }

                Ok(())
            })
            .collect::<std::result::Result<Vec<_>, Error>>()?;

        // NOTE(review): as above, this assumes full grid coverage implies
        // every byte of `result_buf` was written — confirm.
        let result_buf = assume_init_u8_vec(result_buf);
        return self.decode_buffer_with_shape::<T>(
            &result_buf,
            resolved.result_elements,
            &resolved.result_shape,
        );
    }

    // General path: start from an initialized buffer so positions that no
    // chunk writes keep the contents produced by `make_output_buffer`.
    let mut result_buf = self.make_output_buffer(result_total_bytes);

    let flat = FlatBufferPtr {
        ptr: result_buf.as_mut_ptr(),
        len: result_buf.len(),
    };

    overlapping
        .par_iter()
        .map(|entry| {
            let chunk_data =
                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;

            if use_unit_stride_fast_path {
                // SAFETY: `flat` points into `result_buf`, which stays alive
                // and is not otherwise accessed until this parallel loop has
                // finished. Concurrent writes rely on chunks mapping to
                // disjoint output regions (see the doc comment).
                unsafe {
                    flat.copy_unit_stride_chunk_overlap(
                        &chunk_data,
                        UnitStrideCopyLayout {
                            chunk_offsets: &entry.offsets,
                            chunk_shape: &chunk_shape,
                            dataset_shape: shape,
                            resolved,
                            chunk_strides: &chunk_strides,
                            result_strides: &result_strides,
                            elem_size,
                        },
                    )?;
                }
                return Ok(());
            }

            // For each dimension, compute which elements within this chunk fall
            // within the selection.
            let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
            for d in 0..ndim {
                let chunk_start = entry.offsets[d];
                // Saturate rather than wrap: the sum is clamped to the
                // dataset extent right away.
                let chunk_end = chunk_start.saturating_add(chunk_shape[d]).min(shape[d]);
                let dim = &resolved.dims[d];
                let sel_start = dim.start;
                let sel_end = dim.end;
                let sel_step = dim.step;
                let mut indices = Vec::new();

                // First selected coordinate at or after the chunk start.
                // `None` means that coordinate is not representable in u64,
                // so nothing in this chunk is selected along this dimension.
                let first_sel = if sel_start >= chunk_start {
                    Some(sel_start)
                } else {
                    (chunk_start - sel_start)
                        .div_ceil(sel_step)
                        .checked_mul(sel_step)
                        .and_then(|skipped| sel_start.checked_add(skipped))
                };

                let mut next_sel = first_sel;
                while let Some(sel_idx) = next_sel {
                    if sel_idx >= sel_end || sel_idx >= chunk_end {
                        break;
                    }
                    let chunk_local =
                        checked_usize(sel_idx - chunk_start, "chunk-local index")?;
                    let result_dim_idx =
                        checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
                    indices.push((chunk_local, result_dim_idx));
                    // Stop instead of wrapping if the next step leaves u64.
                    next_sel = sel_idx.checked_add(sel_step);
                }

                dim_indices.push(indices);
            }

            // SAFETY: each chunk writes to disjoint output positions because
            // chunks occupy non-overlapping regions of the dataset grid and
            // the selection maps each dataset coordinate to a unique result index.
            unsafe {
                flat.copy_selected(
                    &chunk_data,
                    &dim_indices,
                    &chunk_strides,
                    &result_strides,
                    elem_size,
                    ndim,
                )?;
            }

            Ok(())
        })
        .collect::<std::result::Result<Vec<_>, Error>>()?;

    self.decode_buffer_with_shape::<T>(
        &result_buf,
        resolved.result_elements,
        &resolved.result_shape,
    )
}
2660
2661    fn read_contiguous_slice<T: H5Type>(
2662        &self,
2663        address: u64,
2664        size: u64,
2665        resolved: &ResolvedSelection,
2666    ) -> Result<ArrayD<T>> {
2667        if resolved.result_elements == 0 {
2668            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
2669        }
2670
2671        if self.external_files.is_none()
2672            && (Cursor::is_undefined_offset(address, self.offset_size()) || size == 0)
2673        {
2674            return self
2675                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
2676        }
2677        if self.external_files.is_none() {
2678            self.validate_allocated_raw_data_len(
2679                "contiguous",
2680                checked_usize(size, "contiguous dataset size")?,
2681            )?;
2682        }
2683
2684        let shape = &self.dataspace.dims;
2685        if selection_covers_full_dataset(resolved, shape) {
2686            return self.read_contiguous::<T>(address, size);
2687        }
2688
2689        let elem_size = self.raw_element_size();
2690        let result_total_bytes = checked_mul_usize(
2691            resolved.result_elements,
2692            elem_size,
2693            "contiguous slice result size in bytes",
2694        )?;
2695        let dataset_strides = row_major_strides(shape, "contiguous dataset stride")?;
2696        let result_dims = resolved.result_dims_with_collapsed();
2697        let result_strides = result_strides_for_dims(&result_dims)?;
2698        let result_buf = self.read_contiguous_slice_bytes_direct(
2699            address,
2700            size,
2701            resolved,
2702            ContiguousSliceDirectLayout {
2703                dataset_strides: &dataset_strides,
2704                result_strides: &result_strides,
2705                elem_size,
2706                result_total_bytes,
2707            },
2708        )?;
2709
2710        self.decode_buffer_with_shape::<T>(
2711            &result_buf,
2712            resolved.result_elements,
2713            &resolved.result_shape,
2714        )
2715    }
2716
2717    fn read_contiguous_slice_bytes_direct(
2718        &self,
2719        address: u64,
2720        size: u64,
2721        resolved: &ResolvedSelection,
2722        layout: ContiguousSliceDirectLayout<'_>,
2723    ) -> Result<Vec<u8>> {
2724        let shape = &self.dataspace.dims;
2725        let ndim = shape.len();
2726        if resolved.dims.len() != ndim
2727            || layout.dataset_strides.len() != ndim
2728            || layout.result_strides.len() != ndim
2729        {
2730            return Err(Error::InvalidData(format!(
2731                "contiguous slice layout rank does not match dataset rank {ndim}"
2732            )));
2733        }
2734
2735        let storage_len = if self.external_files.is_some() {
2736            checked_mul_usize(
2737                checked_usize(self.num_elements(), "dataset element count")?,
2738                layout.elem_size,
2739                "external dataset size",
2740            )?
2741        } else {
2742            checked_usize(size, "contiguous dataset size")?
2743        };
2744        let tail_start = contiguous_slice_tail_start(shape, resolved);
2745        let block_elements = contiguous_slice_block_elements(resolved, tail_start)?;
2746        let block_bytes = checked_mul_usize(
2747            block_elements,
2748            layout.elem_size,
2749            "contiguous slice block size in bytes",
2750        )?;
2751        let mut result_buf = self.make_output_buffer(layout.result_total_bytes);
2752
2753        let prefix_blocks =
2754            resolved.dims[..tail_start]
2755                .iter()
2756                .try_fold(1usize, |acc, dim| -> Result<usize> {
2757                    checked_mul_usize(acc, dim.count, "contiguous slice block count")
2758                })?;
2759        let mut counters = vec![0usize; tail_start];
2760
2761        for _ in 0..prefix_blocks {
2762            let mut source_elem = 0usize;
2763            let mut result_elem = 0usize;
2764
2765            for (d, &counter) in counters.iter().enumerate().take(tail_start) {
2766                let ordinal = u64::try_from(counter).map_err(|_| {
2767                    Error::InvalidData("contiguous slice ordinal exceeds u64".to_string())
2768                })?;
2769                let coord = checked_add_u64(
2770                    resolved.dims[d].start,
2771                    checked_mul_u64(
2772                        ordinal,
2773                        resolved.dims[d].step,
2774                        "contiguous slice coordinate",
2775                    )?,
2776                    "contiguous slice coordinate",
2777                )?;
2778                let coord = checked_usize(coord, "contiguous slice source index")?;
2779                let source_term =
2780                    checked_mul_usize(coord, layout.dataset_strides[d], "contiguous slice source")?;
2781                let result_term = checked_mul_usize(
2782                    counter,
2783                    layout.result_strides[d],
2784                    "contiguous slice result",
2785                )?;
2786                source_elem =
2787                    checked_add_usize(source_elem, source_term, "contiguous slice source")?;
2788                result_elem =
2789                    checked_add_usize(result_elem, result_term, "contiguous slice result")?;
2790            }
2791
2792            for (d, &dataset_stride) in layout
2793                .dataset_strides
2794                .iter()
2795                .enumerate()
2796                .take(ndim)
2797                .skip(tail_start)
2798            {
2799                let coord = checked_usize(resolved.dims[d].start, "contiguous slice source index")?;
2800                let source_term =
2801                    checked_mul_usize(coord, dataset_stride, "contiguous slice source")?;
2802                source_elem =
2803                    checked_add_usize(source_elem, source_term, "contiguous slice source")?;
2804            }
2805
2806            let source_start = checked_mul_usize(
2807                source_elem,
2808                layout.elem_size,
2809                "contiguous slice source byte offset",
2810            )?;
2811            let source_end = checked_add_usize(
2812                source_start,
2813                block_bytes,
2814                "contiguous slice source byte end",
2815            )?;
2816            if source_end > storage_len {
2817                return Err(Error::InvalidData(format!(
2818                    "contiguous slice range {}..{} exceeds dataset storage size {}",
2819                    source_start, source_end, storage_len
2820                )));
2821            }
2822
2823            let dst_start = checked_mul_usize(
2824                result_elem,
2825                layout.elem_size,
2826                "contiguous slice destination byte offset",
2827            )?;
2828            let dst_end = checked_add_usize(
2829                dst_start,
2830                block_bytes,
2831                "contiguous slice destination byte end",
2832            )?;
2833            if dst_end > result_buf.len() {
2834                return Err(Error::InvalidData(format!(
2835                    "contiguous slice destination range {}..{} exceeds result size {}",
2836                    dst_start,
2837                    dst_end,
2838                    result_buf.len()
2839                )));
2840            }
2841
2842            let block = self.read_contiguous_logical_range(address, source_start, block_bytes)?;
2843            if block.len() != block_bytes {
2844                return Err(Error::InvalidData(format!(
2845                    "contiguous slice read returned {} bytes, expected {}",
2846                    block.len(),
2847                    block_bytes
2848                )));
2849            }
2850            result_buf[dst_start..dst_end].copy_from_slice(&block);
2851
2852            let mut carry = true;
2853            for d in (0..tail_start).rev() {
2854                if carry {
2855                    counters[d] += 1;
2856                    if counters[d] < resolved.dims[d].count {
2857                        carry = false;
2858                    } else {
2859                        counters[d] = 0;
2860                    }
2861                }
2862            }
2863        }
2864
2865        Ok(result_buf)
2866    }
2867
2868    fn read_compact_slice<T: H5Type>(
2869        &self,
2870        data: &[u8],
2871        selection: &SliceInfo,
2872    ) -> Result<ArrayD<T>> {
2873        let full = self.read_compact::<T>(data)?;
2874        slice_array(&full, selection, &self.dataspace.dims)
2875    }
2876
2877    fn decode_buffer_with_shape<T: H5Type>(
2878        &self,
2879        raw: &[u8],
2880        n: usize,
2881        shape: &[usize],
2882    ) -> Result<ArrayD<T>> {
2883        let elem_size = self.raw_element_size();
2884        let expected_bytes = checked_mul_usize(n, elem_size, "decoded buffer byte length")?;
2885        if raw.len() != expected_bytes {
2886            return Err(Error::InvalidData(format!(
2887                "decoded buffer has {} bytes, expected {} bytes",
2888                raw.len(),
2889                expected_bytes
2890            )));
2891        }
2892
2893        if let Some(elements) = T::decode_vec(raw, &self.datatype, n) {
2894            let elements = elements?;
2895            return ArrayD::from_shape_vec(IxDyn(shape), elements)
2896                .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
2897        }
2898
2899        let mut elements = Vec::with_capacity(n);
2900        for i in 0..n {
2901            let start = checked_mul_usize(i, elem_size, "decoded element byte offset")?;
2902            let end = checked_mul_usize(i + 1, elem_size, "decoded element end offset")?;
2903            elements.push(T::from_bytes(&raw[start..end], &self.datatype)?);
2904        }
2905
2906        ArrayD::from_shape_vec(IxDyn(shape), elements)
2907            .map_err(|e| Error::InvalidData(format!("array shape error: {e}")))
2908    }
2909
2910    fn decode_raw_data<T: H5Type>(&self, raw: &[u8]) -> Result<ArrayD<T>> {
2911        let n = checked_usize(self.num_elements(), "dataset element count")?;
2912        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
2913        for &dim in &self.dataspace.dims {
2914            shape.push(checked_usize(dim, "dataset dimension")?);
2915        }
2916        self.decode_buffer_with_shape::<T>(raw, n, &shape)
2917    }
2918
2919    fn make_fill_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
2920        let n = checked_usize(self.num_elements(), "dataset element count")?;
2921        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
2922        for &dim in &self.dataspace.dims {
2923            shape.push(checked_usize(dim, "dataset dimension")?);
2924        }
2925        self.make_fill_array_from_shape::<T>(n, &shape)
2926    }
2927
2928    fn make_fill_array_from_shape<T: H5Type>(
2929        &self,
2930        element_count: usize,
2931        shape: &[usize],
2932    ) -> Result<ArrayD<T>> {
2933        let elem_size = dtype_element_size(&self.datatype);
2934        let total_bytes = checked_mul_usize(element_count, elem_size, "fill result size in bytes")?;
2935        let fill = self.make_output_buffer(total_bytes);
2936        self.decode_buffer_with_shape::<T>(&fill, element_count, shape)
2937    }
2938
2939    fn make_output_buffer(&self, total_bytes: usize) -> Vec<u8> {
2940        let mut buf = vec![0u8; total_bytes];
2941        self.fill_output_buffer(&mut buf);
2942        buf
2943    }
2944
2945    fn fill_output_buffer(&self, buf: &mut [u8]) {
2946        buf.fill(0);
2947        if let Some(ref fv) = self.fill_value {
2948            if let Some(ref fill_bytes) = fv.value {
2949                if !fill_bytes.is_empty() {
2950                    for chunk in buf.chunks_exact_mut(fill_bytes.len()) {
2951                        chunk.copy_from_slice(fill_bytes);
2952                    }
2953                }
2954            }
2955        }
2956    }
2957
2958    fn validate_allocated_raw_data_len(&self, storage_kind: &str, actual_len: usize) -> Result<()> {
2959        let expected_len = self.raw_byte_len()?;
2960        if actual_len != expected_len {
2961            return Err(Error::InvalidData(format!(
2962                "{storage_kind} raw data has {actual_len} bytes, expected {expected_len} bytes"
2963            )));
2964        }
2965        Ok(())
2966    }
2967
2968    fn convert_to_native_endian(&self, bytes: &mut [u8]) -> Result<()> {
2969        let count = checked_usize(self.num_elements(), "dataset element count")?;
2970        convert_datatype_to_native_endian(&self.datatype, self.vlen_reference_size(), bytes, count)
2971    }
2972}
2973
2974fn native_byte_order() -> ByteOrder {
2975    if cfg!(target_endian = "little") {
2976        ByteOrder::LittleEndian
2977    } else {
2978        ByteOrder::BigEndian
2979    }
2980}
2981
2982fn convert_datatype_to_native_endian(
2983    dtype: &Datatype,
2984    vlen_reference_size: usize,
2985    bytes: &mut [u8],
2986    count: usize,
2987) -> Result<()> {
2988    match dtype {
2989        Datatype::FixedPoint {
2990            size, byte_order, ..
2991        }
2992        | Datatype::FloatingPoint { size, byte_order }
2993        | Datatype::Bitfield { size, byte_order } => {
2994            swap_elements_to_native(bytes, count, *size as usize, *byte_order)
2995        }
2996        Datatype::Enum { base, .. } => {
2997            convert_datatype_to_native_endian(base, vlen_reference_size, bytes, count)
2998        }
2999        Datatype::Array { base, dims } => {
3000            let array_count = dims.iter().try_fold(1usize, |acc, &dim| {
3001                checked_mul_usize(
3002                    acc,
3003                    checked_usize(dim, "array datatype dimension")?,
3004                    "array datatype element count",
3005                )
3006            })?;
3007            let total_count =
3008                checked_mul_usize(count, array_count, "array datatype total element count")?;
3009            convert_datatype_to_native_endian(base, vlen_reference_size, bytes, total_count)
3010        }
3011        Datatype::Compound { size, fields } => {
3012            let record_size = *size as usize;
3013            let required = checked_mul_usize(count, record_size, "compound byte length")?;
3014            if bytes.len() < required {
3015                return Err(Error::InvalidData(format!(
3016                    "compound native-endian conversion needs {required} bytes, got {}",
3017                    bytes.len()
3018                )));
3019            }
3020
3021            for record in 0..count {
3022                let record_start =
3023                    checked_mul_usize(record, record_size, "compound record byte offset")?;
3024                for field in fields {
3025                    let field_offset = field.byte_offset as usize;
3026                    let field_size =
3027                        raw_element_size_for_datatype(&field.datatype, vlen_reference_size);
3028                    let field_start = checked_add_usize(
3029                        record_start,
3030                        field_offset,
3031                        "compound field byte offset",
3032                    )?;
3033                    let field_end =
3034                        checked_add_usize(field_start, field_size, "compound field byte end")?;
3035                    if field_end > bytes.len() || field_offset + field_size > record_size {
3036                        return Err(Error::InvalidData(format!(
3037                            "compound field '{}' range exceeds record size",
3038                            field.name
3039                        )));
3040                    }
3041                    convert_datatype_to_native_endian(
3042                        &field.datatype,
3043                        vlen_reference_size,
3044                        &mut bytes[field_start..field_end],
3045                        1,
3046                    )?;
3047                }
3048            }
3049            Ok(())
3050        }
3051        Datatype::String { .. }
3052        | Datatype::VarLen { .. }
3053        | Datatype::Opaque { .. }
3054        | Datatype::Reference { .. } => Ok(()),
3055    }
3056}
3057
3058fn swap_elements_to_native(
3059    bytes: &mut [u8],
3060    count: usize,
3061    elem_size: usize,
3062    byte_order: ByteOrder,
3063) -> Result<()> {
3064    let required = checked_mul_usize(count, elem_size, "native-endian byte length")?;
3065    if bytes.len() < required {
3066        return Err(Error::InvalidData(format!(
3067            "native-endian conversion needs {required} bytes, got {}",
3068            bytes.len()
3069        )));
3070    }
3071
3072    if elem_size <= 1 || byte_order == native_byte_order() {
3073        return Ok(());
3074    }
3075
3076    for chunk in bytes[..required].chunks_exact_mut(elem_size) {
3077        chunk.reverse();
3078    }
3079    Ok(())
3080}
3081
3082fn attribute_from_message_storage(message: &AttributeMessage, context: &FileContext) -> Attribute {
3083    let raw_data = match &message.datatype {
3084        Datatype::VarLen {
3085            base,
3086            kind: VarLenKind::String,
3087            ..
3088        } if matches!(base.as_ref(), Datatype::FixedPoint { size: 1, .. })
3089            && message.dataspace.num_elements() == 1 =>
3090        {
3091            resolve_vlen_bytes_storage(
3092                &message.raw_data,
3093                context.storage.as_ref(),
3094                context.superblock.offset_size,
3095                context.superblock.length_size,
3096            )
3097            .unwrap_or_else(|| message.raw_data.clone())
3098        }
3099        _ => message.raw_data.clone(),
3100    };
3101
3102    Attribute {
3103        name: message.name.clone(),
3104        datatype: message.datatype.clone(),
3105        shape: match message.dataspace.dataspace_type {
3106            DataspaceType::Scalar => vec![],
3107            DataspaceType::Null => vec![0],
3108            DataspaceType::Simple => message.dataspace.dims.clone(),
3109        },
3110        raw_data,
3111    }
3112}
3113
3114fn normalize_layout(layout: DataLayout, dataspace: &DataspaceMessage) -> DataLayout {
3115    match layout {
3116        DataLayout::Chunked {
3117            address,
3118            mut dims,
3119            mut element_size,
3120            chunk_indexing,
3121        } if dims.len() == dataspace.dims.len() + 1 => {
3122            if let Some(legacy_element_size) = dims.pop() {
3123                if element_size == 0 {
3124                    element_size = legacy_element_size;
3125                }
3126            }
3127            DataLayout::Chunked {
3128                address,
3129                dims,
3130                element_size,
3131                chunk_indexing,
3132            }
3133        }
3134        other => other,
3135    }
3136}
3137
3138fn raw_element_size_for_datatype(dtype: &Datatype, vlen_reference_size: usize) -> usize {
3139    match dtype {
3140        Datatype::String {
3141            size: StringSize::Variable,
3142            ..
3143        }
3144        | Datatype::VarLen { .. } => vlen_reference_size,
3145        Datatype::Array { base, dims } => {
3146            let base_size = raw_element_size_for_datatype(base, vlen_reference_size);
3147            let count: u64 = dims.iter().product();
3148            base_size * count as usize
3149        }
3150        Datatype::Enum { base, .. } => raw_element_size_for_datatype(base, vlen_reference_size),
3151        Datatype::FixedPoint { size, .. }
3152        | Datatype::FloatingPoint { size, .. }
3153        | Datatype::Bitfield { size, .. }
3154        | Datatype::Reference { size, .. } => *size as usize,
3155        Datatype::String {
3156            size: StringSize::Fixed(len),
3157            ..
3158        } => *len as usize,
3159        Datatype::Compound { size, .. } | Datatype::Opaque { size, .. } => *size as usize,
3160    }
3161}
3162
/// Copy a chunk's data into the flat output buffer at the correct position.
///
/// Test-only convenience wrapper: derives row-major strides for the dataset
/// and the chunk, then defers to `copy_chunk_to_flat_with_strides`.
///
/// # Panics
///
/// Panics when either set of strides cannot be computed.
#[cfg(test)]
fn copy_chunk_to_flat(
    chunk_data: &[u8],
    flat: &mut [u8],
    chunk_offsets: &[u64],
    chunk_shape: &[u64],
    dataset_shape: &[u64],
    elem_size: usize,
) -> Result<()> {
    let strides_for_dataset = row_major_strides(dataset_shape, "dataset stride")
        .expect("dataset strides should fit in usize");
    let strides_for_chunk =
        row_major_strides(chunk_shape, "chunk stride").expect("chunk strides should fit in usize");

    let layout = ChunkCopyLayout {
        chunk_offsets,
        chunk_shape,
        dataset_shape,
        dataset_strides: &strides_for_dataset,
        chunk_strides: &strides_for_chunk,
        elem_size,
    };
    copy_chunk_to_flat_with_strides(chunk_data, flat, layout)
}
3190
3191fn copy_chunk_to_flat_with_strides(
3192    chunk_data: &[u8],
3193    flat: &mut [u8],
3194    layout: ChunkCopyLayout<'_>,
3195) -> Result<()> {
3196    unsafe {
3197        copy_chunk_to_flat_with_strides_ptr(
3198            chunk_data,
3199            FlatBufferPtr {
3200                ptr: flat.as_mut_ptr(),
3201                len: flat.len(),
3202            },
3203            layout,
3204        )
3205    }
3206}
3207
3208#[inline(always)]
3209unsafe fn copy_chunk_to_flat_with_strides_ptr(
3210    chunk_data: &[u8],
3211    flat: FlatBufferPtr,
3212    layout: ChunkCopyLayout<'_>,
3213) -> Result<()> {
3214    let ndim = layout.dataset_shape.len();
3215    if layout.chunk_offsets.len() != ndim
3216        || layout.chunk_shape.len() != ndim
3217        || layout.dataset_strides.len() != ndim
3218        || layout.chunk_strides.len() != ndim
3219    {
3220        return Err(Error::InvalidData(format!(
3221            "chunk copy layout rank does not match dataset rank {ndim}"
3222        )));
3223    }
3224
3225    if ndim == 0 {
3226        if chunk_data.len() < layout.elem_size || flat.len < layout.elem_size {
3227            return Err(Error::InvalidData(format!(
3228                "scalar chunk copy requires {} bytes, got source {} and destination {}",
3229                layout.elem_size,
3230                chunk_data.len(),
3231                flat.len
3232            )));
3233        }
3234        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr, layout.elem_size);
3235        return Ok(());
3236    }
3237
3238    // Total elements in this chunk (clamped to dataset boundaries)
3239    let mut actual_chunk_shape = Vec::with_capacity(ndim);
3240    for i in 0..ndim {
3241        if layout.chunk_offsets[i] >= layout.dataset_shape[i] {
3242            return Err(Error::InvalidData(format!(
3243                "chunk offset {} is outside dimension {} of size {}",
3244                layout.chunk_offsets[i], i, layout.dataset_shape[i]
3245            )));
3246        }
3247        let remaining = layout.dataset_shape[i] - layout.chunk_offsets[i];
3248        actual_chunk_shape.push(checked_usize(
3249            remaining.min(layout.chunk_shape[i]),
3250            "actual chunk extent",
3251        )?);
3252    }
3253
3254    let row_elems = *actual_chunk_shape.last().unwrap_or(&1);
3255    let row_bytes = checked_mul_usize(row_elems, layout.elem_size, "chunk row bytes")?;
3256    let mut dataset_origin = 0usize;
3257    for (d, offset) in layout.chunk_offsets.iter().enumerate() {
3258        let offset = checked_usize(*offset, "chunk offset")?;
3259        let term = checked_mul_usize(offset, layout.dataset_strides[d], "chunk origin")?;
3260        dataset_origin = checked_add_usize(dataset_origin, term, "chunk origin")?;
3261    }
3262
3263    if ndim == 1 {
3264        let dst_start = checked_mul_usize(dataset_origin, layout.elem_size, "chunk dst offset")?;
3265        let dst_end = checked_add_usize(dst_start, row_bytes, "chunk dst end")?;
3266        if row_bytes > chunk_data.len() || dst_end > flat.len {
3267            return Err(Error::InvalidData(format!(
3268                "chunk copy out of bounds: source row needs {} bytes from {} bytes, destination range {}..{} exceeds {} bytes",
3269                row_bytes,
3270                chunk_data.len(),
3271                dst_start,
3272                dst_end,
3273                flat.len
3274            )));
3275        }
3276        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr.add(dst_start), row_bytes);
3277        return Ok(());
3278    }
3279
3280    let outer_dims = &actual_chunk_shape[..ndim - 1];
3281    let total_rows = checked_product_usize(outer_dims, "chunk row count")?;
3282    let mut outer_idx = vec![0usize; ndim - 1];
3283
3284    for _ in 0..total_rows {
3285        let mut chunk_row = 0usize;
3286        let mut dataset_row = dataset_origin;
3287        for (d, outer) in outer_idx.iter().copied().enumerate() {
3288            let chunk_term = checked_mul_usize(outer, layout.chunk_strides[d], "chunk row")?;
3289            let dataset_term = checked_mul_usize(outer, layout.dataset_strides[d], "dataset row")?;
3290            chunk_row = checked_add_usize(chunk_row, chunk_term, "chunk row")?;
3291            dataset_row = checked_add_usize(dataset_row, dataset_term, "dataset row")?;
3292        }
3293
3294        let src_start = checked_mul_usize(chunk_row, layout.elem_size, "chunk src offset")?;
3295        let dst_start = checked_mul_usize(dataset_row, layout.elem_size, "chunk dst offset")?;
3296        let src_end = checked_add_usize(src_start, row_bytes, "chunk src end")?;
3297        let dst_end = checked_add_usize(dst_start, row_bytes, "chunk dst end")?;
3298        if src_end > chunk_data.len() || dst_end > flat.len {
3299            return Err(Error::InvalidData(format!(
3300                "chunk copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3301                src_start,
3302                src_end,
3303                chunk_data.len(),
3304                dst_start,
3305                dst_end,
3306                flat.len
3307            )));
3308        }
3309        std::ptr::copy_nonoverlapping(
3310            chunk_data.as_ptr().add(src_start),
3311            flat.ptr.add(dst_start),
3312            row_bytes,
3313        );
3314
3315        let mut carry = true;
3316        for d in (0..outer_idx.len()).rev() {
3317            if carry {
3318                outer_idx[d] += 1;
3319                if outer_idx[d] < outer_dims[d] {
3320                    carry = false;
3321                } else {
3322                    outer_idx[d] = 0;
3323                }
3324            }
3325        }
3326    }
3327
3328    Ok(())
3329}
3330
3331fn checked_product_usize(values: &[usize], context: &str) -> Result<usize> {
3332    let mut product = 1usize;
3333    for &value in values {
3334        product = checked_mul_usize(product, value, context)?;
3335    }
3336    Ok(product)
3337}
3338
3339fn unit_stride_chunk_overlap_plan(
3340    chunk_offsets: &[u64],
3341    chunk_shape: &[u64],
3342    dataset_shape: &[u64],
3343    resolved: &ResolvedSelection,
3344) -> Result<(Vec<usize>, Vec<usize>, Vec<usize>)> {
3345    let ndim = dataset_shape.len();
3346    let mut overlap_counts = Vec::with_capacity(ndim);
3347    let mut chunk_local_start = Vec::with_capacity(ndim);
3348    let mut result_start = Vec::with_capacity(ndim);
3349
3350    for d in 0..ndim {
3351        let chunk_start = chunk_offsets[d];
3352        let chunk_end = (chunk_start + chunk_shape[d]).min(dataset_shape[d]);
3353        let dim = &resolved.dims[d];
3354        let overlap_start = chunk_start.max(dim.start);
3355        let overlap_end = chunk_end.min(dim.end);
3356        if overlap_start >= overlap_end {
3357            return Ok((Vec::new(), Vec::new(), Vec::new()));
3358        }
3359
3360        overlap_counts.push(checked_usize(
3361            overlap_end - overlap_start,
3362            "chunk overlap size",
3363        )?);
3364        chunk_local_start.push(checked_usize(
3365            overlap_start - chunk_start,
3366            "chunk overlap start",
3367        )?);
3368        result_start.push(checked_usize(
3369            overlap_start - dim.start,
3370            "slice result overlap start",
3371        )?);
3372    }
3373
3374    Ok((overlap_counts, chunk_local_start, result_start))
3375}
3376
3377#[inline(always)]
3378fn copy_unit_stride_chunk_overlap(
3379    chunk_data: &[u8],
3380    result_buf: &mut [u8],
3381    layout: UnitStrideCopyLayout<'_>,
3382) -> Result<()> {
3383    unsafe {
3384        copy_unit_stride_chunk_overlap_ptr(
3385            chunk_data,
3386            FlatBufferPtr {
3387                ptr: result_buf.as_mut_ptr(),
3388                len: result_buf.len(),
3389            },
3390            layout,
3391        )
3392    }
3393}
3394
3395/// Copy a unit-step rectangular overlap from a chunk into the result buffer.
3396///
3397/// This is the hot path for contiguous hyperslab reads over chunked datasets:
3398/// rather than copying one element at a time, it copies contiguous runs along
3399/// the innermost dimension with a single memcpy per output row.
3400///
3401/// # Safety
3402///
3403/// The caller must guarantee that `[result_ptr .. result_ptr + result_len)` is
3404/// valid for writes. Concurrent callers must write to disjoint byte ranges.
3405#[inline(always)]
3406unsafe fn copy_unit_stride_chunk_overlap_ptr(
3407    chunk_data: &[u8],
3408    result: FlatBufferPtr,
3409    layout: UnitStrideCopyLayout<'_>,
3410) -> Result<()> {
3411    let ndim = layout.dataset_shape.len();
3412    if layout.chunk_offsets.len() != ndim
3413        || layout.chunk_shape.len() != ndim
3414        || layout.resolved.dims.len() != ndim
3415        || layout.chunk_strides.len() != ndim
3416        || layout.result_strides.len() != ndim
3417    {
3418        return Err(Error::InvalidData(format!(
3419            "unit-stride copy layout rank does not match dataset rank {ndim}"
3420        )));
3421    }
3422
3423    if ndim == 0 {
3424        if chunk_data.len() < layout.elem_size || result.len < layout.elem_size {
3425            return Err(Error::InvalidData(format!(
3426                "scalar slice copy requires {} bytes, got source {} and destination {}",
3427                layout.elem_size,
3428                chunk_data.len(),
3429                result.len
3430            )));
3431        }
3432        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), result.ptr, layout.elem_size);
3433        return Ok(());
3434    }
3435
3436    let (overlap_counts, chunk_local_start, result_start) = unit_stride_chunk_overlap_plan(
3437        layout.chunk_offsets,
3438        layout.chunk_shape,
3439        layout.dataset_shape,
3440        layout.resolved,
3441    )?;
3442    if overlap_counts.is_empty() {
3443        return Ok(());
3444    }
3445
3446    let row_elems = *overlap_counts.last().unwrap_or(&1);
3447    let row_bytes = checked_mul_usize(row_elems, layout.elem_size, "unit-stride slice row bytes")?;
3448
3449    let mut chunk_origin = 0usize;
3450    let mut result_origin = 0usize;
3451    for d in 0..ndim {
3452        let chunk_term = checked_mul_usize(
3453            chunk_local_start[d],
3454            layout.chunk_strides[d],
3455            "chunk overlap origin",
3456        )?;
3457        let result_term = checked_mul_usize(
3458            result_start[d],
3459            layout.result_strides[d],
3460            "slice result origin",
3461        )?;
3462        chunk_origin = checked_add_usize(chunk_origin, chunk_term, "chunk overlap origin")?;
3463        result_origin = checked_add_usize(result_origin, result_term, "slice result origin")?;
3464    }
3465
3466    if ndim == 1 {
3467        let src_start = checked_mul_usize(chunk_origin, layout.elem_size, "slice src offset")?;
3468        let dst_start = checked_mul_usize(result_origin, layout.elem_size, "slice dst offset")?;
3469        let src_end = checked_add_usize(src_start, row_bytes, "slice src end")?;
3470        let dst_end = checked_add_usize(dst_start, row_bytes, "slice dst end")?;
3471        if src_end > chunk_data.len() || dst_end > result.len {
3472            return Err(Error::InvalidData(format!(
3473                "unit-stride slice copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3474                src_start,
3475                src_end,
3476                chunk_data.len(),
3477                dst_start,
3478                dst_end,
3479                result.len
3480            )));
3481        }
3482        std::ptr::copy_nonoverlapping(
3483            chunk_data.as_ptr().add(src_start),
3484            result.ptr.add(dst_start),
3485            row_bytes,
3486        );
3487        return Ok(());
3488    }
3489
3490    let outer_counts = &overlap_counts[..ndim - 1];
3491    let total_rows = checked_product_usize(outer_counts, "unit-stride slice row count")?;
3492    let mut outer_idx = vec![0usize; ndim - 1];
3493
3494    for _ in 0..total_rows {
3495        let mut chunk_row = chunk_origin;
3496        let mut result_row = result_origin;
3497        for (d, outer) in outer_idx.iter().copied().enumerate() {
3498            let chunk_term = checked_mul_usize(outer, layout.chunk_strides[d], "slice chunk row")?;
3499            let result_term =
3500                checked_mul_usize(outer, layout.result_strides[d], "slice result row")?;
3501            chunk_row = checked_add_usize(chunk_row, chunk_term, "slice chunk row")?;
3502            result_row = checked_add_usize(result_row, result_term, "slice result row")?;
3503        }
3504
3505        let src_start = checked_mul_usize(chunk_row, layout.elem_size, "slice src offset")?;
3506        let dst_start = checked_mul_usize(result_row, layout.elem_size, "slice dst offset")?;
3507        let src_end = checked_add_usize(src_start, row_bytes, "slice src end")?;
3508        let dst_end = checked_add_usize(dst_start, row_bytes, "slice dst end")?;
3509        if src_end > chunk_data.len() || dst_end > result.len {
3510            return Err(Error::InvalidData(format!(
3511                "unit-stride slice copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3512                src_start,
3513                src_end,
3514                chunk_data.len(),
3515                dst_start,
3516                dst_end,
3517                result.len
3518            )));
3519        }
3520        std::ptr::copy_nonoverlapping(
3521            chunk_data.as_ptr().add(src_start),
3522            result.ptr.add(dst_start),
3523            row_bytes,
3524        );
3525
3526        let mut carry = true;
3527        for d in (0..outer_idx.len()).rev() {
3528            if carry {
3529                outer_idx[d] += 1;
3530                if outer_idx[d] < outer_counts[d] {
3531                    carry = false;
3532                } else {
3533                    outer_idx[d] = 0;
3534                }
3535            }
3536        }
3537    }
3538
3539    Ok(())
3540}
3541
3542#[allow(clippy::too_many_arguments)]
3543/// Copy selected elements from a chunk into the result buffer.
3544///
3545/// `dim_indices[d]` is a list of `(chunk_local_idx, result_dim_idx)` pairs for dimension `d`.
3546#[inline(always)]
3547fn copy_selected_elements(
3548    chunk_data: &[u8],
3549    result_buf: &mut [u8],
3550    dim_indices: &[Vec<(usize, usize)>],
3551    chunk_strides: &[usize],
3552    result_strides: &[usize],
3553    elem_size: usize,
3554    ndim: usize,
3555) -> Result<()> {
3556    if dim_indices.len() != ndim || chunk_strides.len() != ndim || result_strides.len() != ndim {
3557        return Err(Error::InvalidData(format!(
3558            "selected-element copy layout rank does not match rank {ndim}"
3559        )));
3560    }
3561
3562    // Check for empty selection
3563    if dim_indices.iter().any(|v| v.is_empty()) {
3564        return Ok(());
3565    }
3566
3567    // Recursive cartesian-product iteration, but unrolled iteratively.
3568    let counts: Vec<usize> = dim_indices.iter().map(|v| v.len()).collect();
3569    let total = checked_product_usize(&counts, "selected-element copy count")?;
3570    let mut counters = vec![0usize; ndim];
3571
3572    for _ in 0..total {
3573        let mut chunk_flat = 0;
3574        let mut result_flat = 0;
3575        for d in 0..ndim {
3576            let (cl, ri) = dim_indices[d][counters[d]];
3577            let chunk_term = checked_mul_usize(cl, chunk_strides[d], "selected chunk offset")?;
3578            let result_term = checked_mul_usize(ri, result_strides[d], "selected result offset")?;
3579            chunk_flat = checked_add_usize(chunk_flat, chunk_term, "selected chunk offset")?;
3580            result_flat = checked_add_usize(result_flat, result_term, "selected result offset")?;
3581        }
3582
3583        let src_start = checked_mul_usize(chunk_flat, elem_size, "selected source byte offset")?;
3584        let dst_start =
3585            checked_mul_usize(result_flat, elem_size, "selected destination byte offset")?;
3586        let src_end = checked_add_usize(src_start, elem_size, "selected source byte end")?;
3587        let dst_end = checked_add_usize(dst_start, elem_size, "selected destination byte end")?;
3588
3589        if src_end > chunk_data.len() || dst_end > result_buf.len() {
3590            return Err(Error::InvalidData(format!(
3591                "selected-element copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3592                src_start,
3593                src_end,
3594                chunk_data.len(),
3595                dst_start,
3596                dst_end,
3597                result_buf.len()
3598            )));
3599        }
3600        result_buf[dst_start..dst_end].copy_from_slice(&chunk_data[src_start..src_end]);
3601
3602        // Increment counters (row-major)
3603        let mut carry = true;
3604        for d in (0..ndim).rev() {
3605            if carry {
3606                counters[d] += 1;
3607                if counters[d] < dim_indices[d].len() {
3608                    carry = false;
3609                } else {
3610                    counters[d] = 0;
3611                }
3612            }
3613        }
3614    }
3615
3616    Ok(())
3617}
3618
/// Copy selected elements from a chunk into a raw output pointer.
///
/// Pointer-based counterpart of `copy_selected_elements`: every combination of
/// one `(chunk_local_idx, result_dim_idx)` pair per dimension is visited in
/// row-major order and one element of `elem_size` bytes is copied for each.
/// Destination ranges are checked against `result_len` before each copy.
///
/// # Errors
///
/// Returns `Error::InvalidData` when the inputs disagree with `ndim` or when a
/// source or destination range would fall outside its buffer. Errors from the
/// checked arithmetic helpers are propagated unchanged.
///
/// # Safety
///
/// `[result_ptr .. result_ptr + result_len)` must be valid for writes and must
/// not overlap `chunk_data`. The caller must also guarantee that no two
/// concurrent calls write to the same byte range within that region.
#[cfg(feature = "rayon")]
#[allow(clippy::too_many_arguments)]
#[inline(always)]
unsafe fn copy_selected_elements_ptr(
    chunk_data: &[u8],
    result_ptr: *mut u8,
    result_len: usize,
    dim_indices: &[Vec<(usize, usize)>],
    chunk_strides: &[usize],
    result_strides: &[usize],
    elem_size: usize,
    ndim: usize,
) -> Result<()> {
    let ranks_agree =
        dim_indices.len() == ndim && chunk_strides.len() == ndim && result_strides.len() == ndim;
    if !ranks_agree {
        return Err(Error::InvalidData(format!(
            "selected-element copy layout rank does not match rank {ndim}"
        )));
    }

    // An empty dimension means the cartesian product is empty.
    if dim_indices.iter().any(Vec::is_empty) {
        return Ok(());
    }

    let per_dim_counts: Vec<usize> = dim_indices.iter().map(Vec::len).collect();
    let combinations = checked_product_usize(&per_dim_counts, "selected-element copy count")?;
    let mut cursor = vec![0usize; ndim];
    let src_len = chunk_data.len();

    for _ in 0..combinations {
        // Flat element indices of the current combination in chunk and result.
        let mut src_elem = 0usize;
        let mut dst_elem = 0usize;
        for (d, pairs) in dim_indices.iter().enumerate() {
            let (chunk_local, result_local) = pairs[cursor[d]];
            let src_term =
                checked_mul_usize(chunk_local, chunk_strides[d], "selected chunk offset")?;
            let dst_term =
                checked_mul_usize(result_local, result_strides[d], "selected result offset")?;
            src_elem = checked_add_usize(src_elem, src_term, "selected chunk offset")?;
            dst_elem = checked_add_usize(dst_elem, dst_term, "selected result offset")?;
        }

        let src_start = checked_mul_usize(src_elem, elem_size, "selected source byte offset")?;
        let dst_start =
            checked_mul_usize(dst_elem, elem_size, "selected destination byte offset")?;
        let src_end = checked_add_usize(src_start, elem_size, "selected source byte end")?;
        let dst_end = checked_add_usize(dst_start, elem_size, "selected destination byte end")?;

        if src_end > src_len || dst_end > result_len {
            return Err(Error::InvalidData(format!(
                "selected-element copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
                src_start, src_end, src_len, dst_start, dst_end, result_len
            )));
        }
        // SAFETY: both ranges were checked against their buffer lengths above.
        std::ptr::copy_nonoverlapping(
            chunk_data.as_ptr().add(src_start),
            result_ptr.add(dst_start),
            elem_size,
        );

        // Advance the row-major counter: bump the last digit, carrying leftwards.
        for (position, &limit) in cursor.iter_mut().zip(&per_dim_counts).rev() {
            *position += 1;
            if *position < limit {
                break;
            }
            *position = 0;
        }
    }

    Ok(())
}
3704
3705/// Slice an ndarray according to a SliceInfo selection.
3706fn slice_array<T: H5Type + Clone>(
3707    array: &ArrayD<T>,
3708    selection: &SliceInfo,
3709    shape: &[u64],
3710) -> Result<ArrayD<T>> {
3711    // Build result shape
3712    let mut result_shape = Vec::new();
3713
3714    for (i, sel) in selection.selections.iter().enumerate() {
3715        let dim_size = shape[i];
3716        match sel {
3717            SliceInfoElem::Index(idx) => {
3718                if *idx >= dim_size {
3719                    return Err(Error::SliceOutOfBounds {
3720                        dim: i,
3721                        index: *idx,
3722                        size: dim_size,
3723                    });
3724                }
3725                // Don't add to result_shape — this dimension is collapsed
3726            }
3727            SliceInfoElem::Slice { start, end, step } => {
3728                let dim_size = checked_usize(dim_size, "slice dimension size")?;
3729                let actual_end = if *end == u64::MAX {
3730                    dim_size
3731                } else {
3732                    checked_usize(*end, "slice end")?.min(dim_size)
3733                };
3734                let actual_start = checked_usize(*start, "slice start")?;
3735                let actual_step = checked_usize(*step, "slice step")?;
3736                if actual_step == 0 {
3737                    return Err(Error::InvalidData("slice step cannot be 0".into()));
3738                }
3739                if actual_start > dim_size {
3740                    return Err(Error::SliceOutOfBounds {
3741                        dim: i,
3742                        index: *start,
3743                        size: shape[i],
3744                    });
3745                }
3746                let n = (actual_end - actual_start).div_ceil(actual_step);
3747                result_shape.push(n);
3748            }
3749        }
3750    }
3751
3752    // Extract elements manually (ndarray's slicing API is complex with dynamic dims)
3753    let ndim = shape.len();
3754    let total = checked_product_usize(&result_shape, "slice result element count")?;
3755    let mut elements = Vec::with_capacity(total);
3756
3757    // Generate all indices in the result
3758    let mut result_idx = vec![0usize; result_shape.len()];
3759
3760    for _ in 0..total {
3761        // Map result index to source index
3762        let mut src_idx = Vec::with_capacity(ndim);
3763        let mut ri = 0;
3764        for sel in selection.selections.iter() {
3765            match sel {
3766                SliceInfoElem::Index(idx) => {
3767                    src_idx.push(checked_usize(*idx, "slice source index")?);
3768                }
3769                SliceInfoElem::Slice { start, step, .. } => {
3770                    let start = checked_usize(*start, "slice start")?;
3771                    let step = checked_usize(*step, "slice step")?;
3772                    let offset =
3773                        checked_mul_usize(result_idx[ri], step, "slice source index offset")?;
3774                    src_idx.push(checked_add_usize(start, offset, "slice source index")?);
3775                    ri += 1;
3776                }
3777            }
3778        }
3779
3780        elements.push(array[IxDyn(&src_idx)].clone());
3781
3782        // Increment result index
3783        if !result_shape.is_empty() {
3784            let mut carry = true;
3785            for d in (0..result_shape.len()).rev() {
3786                if carry {
3787                    result_idx[d] += 1;
3788                    if result_idx[d] < result_shape[d] {
3789                        carry = false;
3790                    } else {
3791                        result_idx[d] = 0;
3792                    }
3793                }
3794            }
3795        }
3796    }
3797
3798    ArrayD::from_shape_vec(IxDyn(&result_shape), elements)
3799        .map_err(|e| Error::InvalidData(format!("slice shape error: {e}")))
3800}
3801
3802#[cfg(test)]
3803mod tests {
3804    use super::*;
3805    use crate::storage::BytesStorage;
3806    use crate::superblock::Superblock;
3807    use std::collections::HashMap;
3808
3809    fn test_context(bytes: Vec<u8>) -> Arc<FileContext> {
3810        let storage: DynStorage = Arc::new(BytesStorage::new(bytes));
3811        Arc::new(FileContext {
3812            storage,
3813            superblock: Superblock {
3814                version: 2,
3815                offset_size: 8,
3816                length_size: 8,
3817                group_leaf_node_k: 0,
3818                group_internal_node_k: 0,
3819                indexed_storage_k: 0,
3820                consistency_flags: 0,
3821                base_address: 0,
3822                free_space_address: u64::MAX,
3823                eof_address: 0,
3824                driver_info_address: u64::MAX,
3825                root_symbol_table_entry: None,
3826                root_object_header_address: Some(0),
3827                extension_address: None,
3828            },
3829            chunk_cache: Arc::new(ChunkCache::new(1024, 8)),
3830            header_cache: Arc::new(Mutex::new(HashMap::new())),
3831            dataset_path_cache: Arc::new(Mutex::new(HashMap::new())),
3832            filter_registry: Arc::new(FilterRegistry::default()),
3833            external_file_resolver: None,
3834            external_link_resolver: None,
3835            external_file_cache: Mutex::new(HashMap::new()),
3836            sohm_table: OnceLock::new(),
3837            full_file_cache: OnceLock::new(),
3838        })
3839    }
3840
3841    fn fixed_u16_dataset(layout: DataLayout, storage_bytes: Vec<u8>) -> Dataset {
3842        let context = test_context(storage_bytes);
3843        Dataset {
3844            context: context.clone(),
3845            name: "short".to_string(),
3846            data_address: 0,
3847            dataspace: DataspaceMessage {
3848                rank: 1,
3849                dims: vec![3],
3850                max_dims: None,
3851                dataspace_type: DataspaceType::Simple,
3852            },
3853            datatype: Datatype::FixedPoint {
3854                size: 2,
3855                signed: false,
3856                byte_order: ByteOrder::LittleEndian,
3857            },
3858            layout,
3859            fill_value: None,
3860            filters: None,
3861            external_files: None,
3862            attributes: Vec::new(),
3863            chunk_cache: context.chunk_cache.clone(),
3864            chunk_entry_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap()))),
3865            full_chunk_entries: Arc::new(OnceLock::new()),
3866            full_dataset_bytes: Arc::new(OnceLock::new()),
3867            external_slots: Arc::new(OnceLock::new()),
3868            filter_registry: context.filter_registry.clone(),
3869        }
3870    }
3871
3872    #[test]
3873    fn test_slice_info_all() {
3874        let s = SliceInfo::all(3);
3875        assert_eq!(s.selections.len(), 3);
3876    }
3877
3878    #[test]
3879    fn test_raw_element_size_uses_file_vlen_reference_width() {
3880        let dtype = Datatype::VarLen {
3881            base: Box::new(Datatype::FixedPoint {
3882                size: 1,
3883                signed: false,
3884                byte_order: crate::error::ByteOrder::LittleEndian,
3885            }),
3886            kind: VarLenKind::Sequence,
3887            encoding: crate::messages::datatype::StringEncoding::Ascii,
3888            padding: crate::messages::datatype::StringPadding::NullTerminate,
3889        };
3890
3891        assert_eq!(raw_element_size_for_datatype(&dtype, 12), 12);
3892        assert_eq!(
3893            raw_element_size_for_datatype(
3894                &Datatype::Array {
3895                    base: Box::new(dtype),
3896                    dims: vec![2, 3],
3897                },
3898                12,
3899            ),
3900            72
3901        );
3902    }
3903
3904    #[test]
3905    fn test_compact_raw_data_requires_exact_logical_length() {
3906        let dataset = fixed_u16_dataset(
3907            DataLayout::Compact {
3908                data: vec![1, 0, 2, 0, 3],
3909            },
3910            Vec::new(),
3911        );
3912
3913        let err = dataset.read_array::<u16>().unwrap_err();
3914        assert!(
3915            matches!(err, Error::Context { .. })
3916                && err
3917                    .to_string()
3918                    .contains("compact raw data has 5 bytes, expected 6 bytes"),
3919            "expected compact raw length error, got: {err}"
3920        );
3921    }
3922
3923    #[test]
3924    fn test_contiguous_raw_data_requires_exact_logical_length() {
3925        let dataset = fixed_u16_dataset(
3926            DataLayout::Contiguous {
3927                address: 0,
3928                size: 5,
3929            },
3930            vec![1, 0, 2, 0, 3],
3931        );
3932
3933        let err = dataset.read_raw_bytes().unwrap_err();
3934        assert!(
3935            matches!(err, Error::Context { .. })
3936                && err
3937                    .to_string()
3938                    .contains("contiguous raw data has 5 bytes, expected 6 bytes"),
3939            "expected contiguous raw length error, got: {err}"
3940        );
3941    }
3942
3943    #[test]
3944    fn test_copy_chunk_1d() {
3945        let chunk_data = vec![1u8, 2, 3, 4]; // 4 elements of 1 byte each
3946        let mut flat = vec![0u8; 8];
3947        let chunk_offsets = vec![2u64]; // starts at index 2
3948        let chunk_shape = vec![4u64];
3949        let dataset_shape = vec![8u64];
3950
3951        copy_chunk_to_flat(
3952            &chunk_data,
3953            &mut flat,
3954            &chunk_offsets,
3955            &chunk_shape,
3956            &dataset_shape,
3957            1,
3958        )
3959        .unwrap();
3960        assert_eq!(flat, vec![0, 0, 1, 2, 3, 4, 0, 0]);
3961    }
3962
3963    #[test]
3964    fn test_copy_chunk_2d_rowwise() {
3965        let chunk_data = vec![1u8, 2, 3, 4, 5, 6];
3966        let mut flat = vec![0u8; 16];
3967        let chunk_offsets = vec![1u64, 1u64];
3968        let chunk_shape = vec![2u64, 3u64];
3969        let dataset_shape = vec![4u64, 4u64];
3970
3971        copy_chunk_to_flat(
3972            &chunk_data,
3973            &mut flat,
3974            &chunk_offsets,
3975            &chunk_shape,
3976            &dataset_shape,
3977            1,
3978        )
3979        .unwrap();
3980
3981        assert_eq!(flat, vec![0, 0, 0, 0, 0, 1, 2, 3, 0, 4, 5, 6, 0, 0, 0, 0,]);
3982    }
3983
/// Checks the unit-stride overlap copy for a partial 2-D selection.
///
/// The chunk holds the bytes 1..=16 with strides `[4, 1]`; the selection spans
/// `1..3` in the first dimension and `1..4` in the second, so the six result
/// bytes are expected to be `[6, 7, 8, 10, 11, 12]`.
#[test]
fn test_copy_unit_stride_chunk_overlap_2d_partial() {
    // Selection extents, one entry per dimension.
    let first_dim = ResolvedSelectionDim {
        start: 1,
        end: 3,
        step: 1,
        count: 2,
    };
    let second_dim = ResolvedSelectionDim {
        start: 1,
        end: 4,
        step: 1,
        count: 3,
    };
    let selection = ResolvedSelection {
        dims: vec![first_dim, second_dim],
        result_shape: vec![2, 3],
        result_elements: 6,
    };

    // Geometry: a single chunk at the origin; chunk and dataset are both 4x4.
    let origin = vec![0u64, 0u64];
    let chunk_dims = vec![4u64, 4u64];
    let dataset_dims = vec![4u64, 4u64];
    let src_strides = vec![4usize, 1usize];
    let dst_strides = vec![3usize, 1usize];

    let source: Vec<u8> = (1..=16).collect();
    let mut dest = vec![0u8; 6];

    let layout = UnitStrideCopyLayout {
        chunk_offsets: &origin,
        chunk_shape: &chunk_dims,
        dataset_shape: &dataset_dims,
        resolved: &selection,
        chunk_strides: &src_strides,
        result_strides: &dst_strides,
        elem_size: 1,
    };
    copy_unit_stride_chunk_overlap(&source, &mut dest, layout).unwrap();

    assert_eq!(dest, vec![6, 7, 8, 10, 11, 12]);
}
4029
4030    fn chunk_entry(offsets: &[u64], address: u64) -> chunk_index::ChunkEntry {
4031        chunk_index::ChunkEntry {
4032            address,
4033            size: 0,
4034            filter_mask: 0,
4035            offsets: offsets.to_vec(),
4036        }
4037    }
4038
/// Coverage validation must succeed but report an incomplete grid when a chunk
/// is absent.
///
/// Entries are supplied at offsets `[0, 0]`, `[0, 2]` and `[2, 0]` only; the
/// call is expected to return `Ok(false)`.
#[test]
fn test_chunk_grid_coverage_detects_missing_chunk() {
    // (chunk offsets, address) pairs for the chunks that are present.
    let present: [([u64; 2], u64); 3] = [([0, 0], 0x1000), ([0, 2], 0x2000), ([2, 0], 0x3000)];
    let mut entries: Vec<chunk_index::ChunkEntry> = present
        .iter()
        .map(|(offsets, address)| chunk_entry(offsets, *address))
        .collect();

    let outcome = validate_chunk_grid_coverage(&mut entries, &[4, 4], &[2, 2], &[0, 0], &[1, 1]);
    let complete = outcome.unwrap();

    assert!(!complete);
}
4052
/// Coverage validation must fail with `Error::InvalidData` when two entries
/// share the same chunk offsets (here `[0, 0]` appears twice).
#[test]
fn test_chunk_grid_coverage_rejects_duplicate_offsets() {
    // (chunk offsets, address) pairs; the first two collide on offsets.
    let listed: [([u64; 2], u64); 4] = [
        ([0, 0], 0x1000),
        ([0, 0], 0x2000),
        ([0, 2], 0x3000),
        ([2, 0], 0x4000),
    ];
    let mut entries: Vec<chunk_index::ChunkEntry> = listed
        .iter()
        .map(|(offsets, address)| chunk_entry(offsets, *address))
        .collect();

    let outcome = validate_chunk_grid_coverage(&mut entries, &[4, 4], &[2, 2], &[0, 0], &[1, 1]);
    let failure = outcome.unwrap_err();

    assert!(matches!(failure, Error::InvalidData(_)));
}
4067
/// The decoded-length check accepts a length of 24 and rejects 23 for the same
/// entry, `[2, 3]` shape argument and third argument of 4 (presumably the
/// element size in bytes, making 24 the exact expected length).
#[test]
fn test_decoded_chunk_len_requires_exact_size() {
    let entry = chunk_entry(&[0, 0], 0x1000);

    // Matching length: must be accepted.
    let exact = validate_decoded_chunk_len(&entry, &[2, 3], 4, 24);
    exact.unwrap();

    // One byte short: must be rejected as invalid data.
    let short = validate_decoded_chunk_len(&entry, &[2, 3], 4, 23);
    let failure = short.unwrap_err();

    assert!(matches!(failure, Error::InvalidData(_)));
}
4077
/// A chunk buffer that is too short for the requested copy must be rejected.
///
/// Only five bytes are supplied for a `[2, 3]` chunk (final argument 1,
/// presumably the element size), and the copy is expected to fail with
/// `Error::InvalidData`.
#[test]
fn test_copy_chunk_errors_on_short_row() {
    let dataset_dims = vec![4u64, 4u64];
    let chunk_dims = vec![2u64, 3u64];
    let chunk_origin = vec![1u64, 1u64];
    let mut dest = vec![0u8; 16];
    // One byte fewer than the six a full 2x3 chunk of single bytes would hold.
    let truncated = vec![1u8, 2, 3, 4, 5];

    let outcome = copy_chunk_to_flat(
        &truncated,
        &mut dest,
        &chunk_origin,
        &chunk_dims,
        &dataset_dims,
        1,
    );
    let failure = outcome.unwrap_err();

    assert!(matches!(failure, Error::InvalidData(_)));
}
4098
/// The unit-stride overlap copy must fail when the chunk buffer is too short.
///
/// The selection spans `1..3` in the first dimension and `1..4` in the second
/// of a `[4, 4]` chunk, but only the bytes 1..=7 are supplied; the call is
/// expected to return `Error::InvalidData`.
#[test]
fn test_copy_unit_stride_chunk_overlap_errors_on_short_row() {
    // Selection extents, one entry per dimension.
    let first_dim = ResolvedSelectionDim {
        start: 1,
        end: 3,
        step: 1,
        count: 2,
    };
    let second_dim = ResolvedSelectionDim {
        start: 1,
        end: 4,
        step: 1,
        count: 3,
    };
    let selection = ResolvedSelection {
        dims: vec![first_dim, second_dim],
        result_shape: vec![2, 3],
        result_elements: 6,
    };

    // Geometry: a single chunk at the origin; chunk and dataset are both 4x4.
    let origin = vec![0u64, 0u64];
    let chunk_dims = vec![4u64, 4u64];
    let dataset_dims = vec![4u64, 4u64];
    let src_strides = vec![4usize, 1usize];
    let dst_strides = vec![3usize, 1usize];

    // Deliberately truncated source buffer.
    let truncated: Vec<u8> = (1..=7).collect();
    let mut dest = vec![0u8; 6];

    let layout = UnitStrideCopyLayout {
        chunk_offsets: &origin,
        chunk_shape: &chunk_dims,
        dataset_shape: &dataset_dims,
        resolved: &selection,
        chunk_strides: &src_strides,
        result_strides: &dst_strides,
        elem_size: 1,
    };
    let outcome = copy_unit_stride_chunk_overlap(&truncated, &mut dest, layout);
    let failure = outcome.unwrap_err();

    assert!(matches!(failure, Error::InvalidData(_)));
}
4144}