// hdf5_reader/dataset.rs

1use std::mem::MaybeUninit;
2use std::num::NonZeroUsize;
3use std::sync::{Arc, OnceLock};
4
5use lru::LruCache;
6use ndarray::{ArrayD, IxDyn};
7use parking_lot::Mutex;
8#[cfg(feature = "rayon")]
9use rayon::prelude::*;
10use smallvec::SmallVec;
11
12use crate::attribute_api::{
13    collect_attribute_messages_storage, decode_string, read_one_vlen_string_storage,
14    resolve_vlen_bytes_storage, Attribute,
15};
16use crate::cache::{ChunkCache, ChunkCacheStats, ChunkKey};
17use crate::chunk_index;
18use crate::datatype_api::{dtype_element_size, H5Type};
19use crate::error::{ByteOrder, Error, Result};
20use crate::filters::{self, FilterRegistry};
21use crate::io::Cursor;
22use crate::local_heap::LocalHeap;
23use crate::messages::attribute::AttributeMessage;
24use crate::messages::dataspace::{DataspaceMessage, DataspaceType};
25use crate::messages::datatype::{Datatype, StringSize, VarLenKind};
26use crate::messages::external_files::ExternalFilesMessage;
27use crate::messages::fill_value::{FillTime, FillValueMessage};
28use crate::messages::filter_pipeline::FilterPipelineMessage;
29use crate::messages::layout::{ChunkIndexing, DataLayout};
30use crate::messages::HdfMessage;
31use crate::object_header::ObjectHeader;
32use crate::storage::DynStorage;
33use crate::FileContext;
34
35const HOT_FULL_DATASET_CACHE_MAX_BYTES: usize = 32 * 1024 * 1024;
36
37#[derive(Clone, Copy)]
38struct FlatBufferPtr {
39    ptr: *mut u8,
40    len: usize,
41}
42
43#[derive(Clone, Copy)]
44struct ChunkCopyLayout<'a> {
45    chunk_offsets: &'a [u64],
46    chunk_shape: &'a [u64],
47    dataset_shape: &'a [u64],
48    dataset_strides: &'a [usize],
49    chunk_strides: &'a [usize],
50    elem_size: usize,
51}
52
53#[derive(Clone, Copy)]
54struct UnitStrideCopyLayout<'a> {
55    chunk_offsets: &'a [u64],
56    chunk_shape: &'a [u64],
57    dataset_shape: &'a [u64],
58    resolved: &'a ResolvedSelection,
59    chunk_strides: &'a [usize],
60    result_strides: &'a [usize],
61    elem_size: usize,
62}
63
64#[derive(Clone, Copy)]
65struct ContiguousSliceDirectLayout<'a> {
66    dataset_strides: &'a [usize],
67    result_strides: &'a [usize],
68    elem_size: usize,
69    result_total_bytes: usize,
70}
71
72#[derive(Clone)]
73struct ResolvedExternalRawSlot {
74    logical_offset: u64,
75    storage: DynStorage,
76    file_offset: u64,
77    size: u64,
78}
79
80pub(crate) struct DatasetParseContext {
81    pub(crate) context: Arc<FileContext>,
82}
83
84#[derive(Clone, Copy)]
85struct ChunkEntrySelection<'a> {
86    shape: &'a [u64],
87    ndim: usize,
88    elem_size: usize,
89    chunk_bounds: Option<(&'a [u64], &'a [u64])>,
90}
91
92unsafe impl Send for FlatBufferPtr {}
93
94unsafe impl Sync for FlatBufferPtr {}
95
96impl FlatBufferPtr {
97    #[cfg(feature = "rayon")]
98    #[inline(always)]
99    unsafe fn copy_chunk(self, chunk_data: &[u8], layout: ChunkCopyLayout<'_>) -> Result<()> {
100        copy_chunk_to_flat_with_strides_ptr(chunk_data, self, layout)
101    }
102
103    #[cfg(feature = "rayon")]
104    #[inline(always)]
105    unsafe fn copy_selected(
106        self,
107        chunk_data: &[u8],
108        dim_indices: &[Vec<(usize, usize)>],
109        chunk_strides: &[usize],
110        result_strides: &[usize],
111        elem_size: usize,
112        ndim: usize,
113    ) -> Result<()> {
114        copy_selected_elements_ptr(
115            chunk_data,
116            self.ptr,
117            self.len,
118            dim_indices,
119            chunk_strides,
120            result_strides,
121            elem_size,
122            ndim,
123        )
124    }
125
126    #[cfg(feature = "rayon")]
127    #[inline(always)]
128    unsafe fn copy_unit_stride_chunk_overlap(
129        self,
130        chunk_data: &[u8],
131        layout: UnitStrideCopyLayout<'_>,
132    ) -> Result<()> {
133        copy_unit_stride_chunk_overlap_ptr(chunk_data, self, layout)
134    }
135}
136
137/// Hyperslab selection for reading slices of datasets.
138#[derive(Debug, Clone)]
139pub struct SliceInfo {
140    pub selections: Vec<SliceInfoElem>,
141}
142
143/// A single dimension's selection.
144#[derive(Debug, Clone)]
145pub enum SliceInfoElem {
146    /// Select a single index (reduces dimensionality).
147    Index(u64),
148    /// Select a range with optional step.
149    Slice { start: u64, end: u64, step: u64 },
150}
151
152#[derive(Clone, Debug)]
153struct ResolvedSelectionDim {
154    start: u64,
155    end: u64,
156    step: u64,
157    count: usize,
158}
159
160#[derive(Clone, Debug, PartialEq, Eq, Hash)]
161struct ChunkEntryCacheKey {
162    index_address: u64,
163    first_chunk: SmallVec<[u64; 4]>,
164    last_chunk: SmallVec<[u64; 4]>,
165}
166
167impl ResolvedSelectionDim {
168    fn chunk_index_range(&self, chunk_extent: u64) -> Option<(u64, u64)> {
169        if self.count == 0 {
170            return None;
171        }
172
173        Some((self.start / chunk_extent, (self.end - 1) / chunk_extent))
174    }
175}
176
177#[derive(Clone, Debug)]
178struct ResolvedSelection {
179    dims: Vec<ResolvedSelectionDim>,
180    result_shape: Vec<usize>,
181    result_elements: usize,
182}
183
184impl ResolvedSelection {
185    fn result_dims_with_collapsed(&self) -> Vec<usize> {
186        self.dims.iter().map(|dim| dim.count).collect()
187    }
188
189    fn is_unit_stride(&self) -> bool {
190        self.dims.iter().all(|dim| dim.step == 1)
191    }
192}
193
194impl SliceInfo {
195    /// Create a selection that reads everything.
196    pub fn all(ndim: usize) -> Self {
197        SliceInfo {
198            selections: vec![
199                SliceInfoElem::Slice {
200                    start: 0,
201                    end: u64::MAX,
202                    step: 1,
203                };
204                ndim
205            ],
206        }
207    }
208}
209
210fn checked_usize(value: u64, context: &str) -> Result<usize> {
211    usize::try_from(value).map_err(|_| {
212        Error::InvalidData(format!(
213            "{context} value {value} exceeds platform usize capacity"
214        ))
215    })
216}
217
218fn checked_mul_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
219    lhs.checked_mul(rhs)
220        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
221}
222
223fn checked_add_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
224    lhs.checked_add(rhs)
225        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
226}
227
228fn checked_mul_u64(lhs: u64, rhs: u64, context: &str) -> Result<u64> {
229    lhs.checked_mul(rhs)
230        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds u64 capacity")))
231}
232
233fn checked_add_u64(lhs: u64, rhs: u64, context: &str) -> Result<u64> {
234    lhs.checked_add(rhs)
235        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds u64 capacity")))
236}
237
238fn checked_shape_elements_usize(shape: &[u64], context: &str) -> Result<usize> {
239    let mut total = 1usize;
240    for &dim in shape {
241        total = checked_mul_usize(total, checked_usize(dim, context)?, context)?;
242    }
243    Ok(total)
244}
245
246fn full_dataset_chunk_bounds(
247    shape: &[u64],
248    chunk_shape: &[u64],
249) -> Result<Option<(Vec<u64>, Vec<u64>)>> {
250    validate_chunk_shape(shape, chunk_shape)?;
251    if shape.contains(&0) {
252        return Ok(None);
253    }
254
255    let first_chunk = vec![0u64; shape.len()];
256    let last_chunk = shape
257        .iter()
258        .zip(chunk_shape.iter())
259        .map(|(&dim, &chunk)| dim.div_ceil(chunk) - 1)
260        .collect();
261    Ok(Some((first_chunk, last_chunk)))
262}
263
264fn validate_chunk_shape(shape: &[u64], chunk_shape: &[u64]) -> Result<()> {
265    if chunk_shape.len() != shape.len() {
266        return Err(Error::InvalidData(format!(
267            "chunk rank {} does not match dataset rank {}",
268            chunk_shape.len(),
269            shape.len()
270        )));
271    }
272    if let Some((dim, _)) = chunk_shape
273        .iter()
274        .enumerate()
275        .find(|(_, chunk)| **chunk == 0)
276    {
277        return Err(Error::InvalidData(format!(
278            "chunk dimension {dim} has zero extent"
279        )));
280    }
281    Ok(())
282}
283
284fn validate_decoded_chunk_len(
285    entry: &chunk_index::ChunkEntry,
286    chunk_shape: &[u64],
287    elem_size: usize,
288    actual_len: usize,
289) -> Result<()> {
290    let expected_len = decoded_chunk_expected_len(chunk_shape, elem_size)?;
291    if actual_len != expected_len {
292        return Err(Error::InvalidData(format!(
293            "chunk at offsets {:?} decoded to {} bytes, expected {} bytes",
294            entry.offsets, actual_len, expected_len
295        )));
296    }
297    Ok(())
298}
299
300fn decoded_chunk_expected_len(chunk_shape: &[u64], elem_size: usize) -> Result<usize> {
301    let chunk_elements = checked_shape_elements_usize(chunk_shape, "decoded chunk element count")?;
302    checked_mul_usize(chunk_elements, elem_size, "decoded chunk byte length")
303}
304
305fn validate_chunk_grid_coverage(
306    entries: &mut [chunk_index::ChunkEntry],
307    shape: &[u64],
308    chunk_shape: &[u64],
309    first_chunk: &[u64],
310    last_chunk: &[u64],
311) -> Result<bool> {
312    validate_chunk_shape(shape, chunk_shape)?;
313    if first_chunk.len() != shape.len() || last_chunk.len() != shape.len() {
314        return Err(Error::InvalidData(format!(
315            "chunk grid bounds rank does not match dataset rank {}",
316            shape.len()
317        )));
318    }
319
320    if shape.contains(&0) {
321        if entries.is_empty() {
322            return Ok(true);
323        }
324        return Err(Error::InvalidData(
325            "chunk index contains entries for an empty dataset".into(),
326        ));
327    }
328
329    for dim in 0..shape.len() {
330        if first_chunk[dim] > last_chunk[dim] {
331            return Err(Error::InvalidData(format!(
332                "invalid chunk grid bounds for dimension {dim}: {} > {}",
333                first_chunk[dim], last_chunk[dim]
334            )));
335        }
336    }
337
338    entries.sort_by(|a, b| a.offsets.cmp(&b.offsets));
339
340    for i in 0..entries.len() {
341        validate_chunk_entry_offsets(&entries[i], shape, chunk_shape, first_chunk, last_chunk)?;
342        if i > 0 && entries[i].offsets == entries[i - 1].offsets {
343            return Err(Error::InvalidData(format!(
344                "duplicate chunk output offsets {:?} (addresses {:#x} and {:#x})",
345                entries[i].offsets,
346                entries[i - 1].address,
347                entries[i].address
348            )));
349        }
350    }
351
352    let mut entry_idx = 0usize;
353    let mut expected = first_chunk.to_vec();
354    loop {
355        let expected_offsets: Vec<u64> = expected
356            .iter()
357            .enumerate()
358            .map(|(dim, chunk_index)| chunk_index * chunk_shape[dim])
359            .collect();
360
361        if entry_idx >= entries.len() || entries[entry_idx].offsets != expected_offsets {
362            return Ok(false);
363        }
364        entry_idx += 1;
365
366        if !advance_chunk_index(&mut expected, first_chunk, last_chunk) {
367            break;
368        }
369    }
370
371    Ok(entry_idx == entries.len())
372}
373
374fn validate_chunk_entry_offsets(
375    entry: &chunk_index::ChunkEntry,
376    shape: &[u64],
377    chunk_shape: &[u64],
378    first_chunk: &[u64],
379    last_chunk: &[u64],
380) -> Result<()> {
381    if entry.offsets.len() != shape.len() {
382        return Err(Error::InvalidData(format!(
383            "chunk at address {:#x} has rank {}, expected {}",
384            entry.address,
385            entry.offsets.len(),
386            shape.len()
387        )));
388    }
389
390    for dim in 0..shape.len() {
391        let offset = entry.offsets[dim];
392        if offset >= shape[dim] {
393            return Err(Error::InvalidData(format!(
394                "chunk at address {:#x} has out-of-bounds offset {} for dimension {} of size {}",
395                entry.address, offset, dim, shape[dim]
396            )));
397        }
398        if offset % chunk_shape[dim] != 0 {
399            return Err(Error::InvalidData(format!(
400                "chunk at address {:#x} has non-grid offset {} for dimension {} with chunk extent {}",
401                entry.address, offset, dim, chunk_shape[dim]
402            )));
403        }
404
405        let chunk_index = offset / chunk_shape[dim];
406        if chunk_index < first_chunk[dim] || chunk_index > last_chunk[dim] {
407            return Err(Error::InvalidData(format!(
408                "chunk at address {:#x} has offset {:?} outside requested chunk grid",
409                entry.address, entry.offsets
410            )));
411        }
412    }
413
414    Ok(())
415}
416
417fn advance_chunk_index(index: &mut [u64], first_chunk: &[u64], last_chunk: &[u64]) -> bool {
418    if index.is_empty() {
419        return false;
420    }
421
422    for dim in (0..index.len()).rev() {
423        if index[dim] < last_chunk[dim] {
424            index[dim] += 1;
425            if dim + 1 < index.len() {
426                index[(dim + 1)..].copy_from_slice(&first_chunk[(dim + 1)..]);
427            }
428            return true;
429        }
430    }
431
432    false
433}
434
435fn row_major_strides(shape: &[u64], context: &str) -> Result<Vec<usize>> {
436    let ndim = shape.len();
437    if ndim == 0 {
438        return Ok(Vec::new());
439    }
440
441    let mut strides = vec![1usize; ndim];
442    for i in (0..ndim - 1).rev() {
443        let next_extent = checked_usize(shape[i + 1], context)?;
444        strides[i] = checked_mul_usize(strides[i + 1], next_extent, context)?;
445    }
446    Ok(strides)
447}
448
449fn assume_init_u8_vec(mut buffer: Vec<MaybeUninit<u8>>) -> Vec<u8> {
450    let ptr = buffer.as_mut_ptr() as *mut u8;
451    let len = buffer.len();
452    let capacity = buffer.capacity();
453    std::mem::forget(buffer);
454    unsafe { Vec::from_raw_parts(ptr, len, capacity) }
455}
456
457fn assume_init_vec<T>(mut buffer: Vec<MaybeUninit<T>>) -> Vec<T> {
458    let ptr = buffer.as_mut_ptr() as *mut T;
459    let len = buffer.len();
460    let capacity = buffer.capacity();
461    std::mem::forget(buffer);
462    unsafe { Vec::from_raw_parts(ptr, len, capacity) }
463}
464
465fn normalize_selection(selection: &SliceInfo, shape: &[u64]) -> Result<ResolvedSelection> {
466    if selection.selections.len() != shape.len() {
467        return Err(Error::InvalidData(format!(
468            "slice has {} dimensions but dataset has {}",
469            selection.selections.len(),
470            shape.len()
471        )));
472    }
473
474    let mut dims = Vec::with_capacity(shape.len());
475    let mut result_shape = Vec::new();
476    let mut result_elements = 1usize;
477
478    for (i, sel) in selection.selections.iter().enumerate() {
479        let dim_size = shape[i];
480        match sel {
481            SliceInfoElem::Index(idx) => {
482                if *idx >= dim_size {
483                    return Err(Error::SliceOutOfBounds {
484                        dim: i,
485                        index: *idx,
486                        size: dim_size,
487                    });
488                }
489                dims.push(ResolvedSelectionDim {
490                    start: *idx,
491                    end: *idx + 1,
492                    step: 1,
493                    count: 1,
494                });
495            }
496            SliceInfoElem::Slice { start, end, step } => {
497                if *step == 0 {
498                    return Err(Error::InvalidData("slice step cannot be 0".into()));
499                }
500                if *start > dim_size {
501                    return Err(Error::SliceOutOfBounds {
502                        dim: i,
503                        index: *start,
504                        size: dim_size,
505                    });
506                }
507
508                let actual_end = if *end == u64::MAX {
509                    dim_size
510                } else {
511                    (*end).min(dim_size)
512                };
513                let count_u64 = if *start >= actual_end {
514                    0
515                } else {
516                    (actual_end - *start).div_ceil(*step)
517                };
518                let count = checked_usize(count_u64, "slice element count")?;
519
520                dims.push(ResolvedSelectionDim {
521                    start: *start,
522                    end: actual_end,
523                    step: *step,
524                    count,
525                });
526                result_shape.push(count);
527                result_elements =
528                    checked_mul_usize(result_elements, count, "slice result element count")?;
529            }
530        }
531    }
532
533    Ok(ResolvedSelection {
534        dims,
535        result_shape,
536        result_elements,
537    })
538}
539
540fn selection_dim_is_full_unit(dim: &ResolvedSelectionDim, dim_size: u64) -> bool {
541    dim.step == 1
542        && dim.start == 0
543        && dim.end == dim_size
544        && u64::try_from(dim.count).ok() == Some(dim_size)
545}
546
547fn selection_covers_full_dataset(resolved: &ResolvedSelection, shape: &[u64]) -> bool {
548    resolved.result_shape.len() == shape.len()
549        && resolved
550            .dims
551            .iter()
552            .zip(shape.iter())
553            .all(|(dim, &dim_size)| selection_dim_is_full_unit(dim, dim_size))
554}
555
556fn contiguous_slice_tail_start(shape: &[u64], resolved: &ResolvedSelection) -> usize {
557    let ndim = shape.len();
558    if ndim == 0 {
559        return 0;
560    }
561
562    let mut tail_start = if resolved.dims[ndim - 1].step == 1 {
563        ndim - 1
564    } else {
565        ndim
566    };
567
568    while tail_start > 0 {
569        let prev = tail_start - 1;
570        let later_dims_are_full =
571            (tail_start..ndim).all(|d| selection_dim_is_full_unit(&resolved.dims[d], shape[d]));
572        if resolved.dims[prev].step == 1 && later_dims_are_full {
573            tail_start = prev;
574        } else {
575            break;
576        }
577    }
578
579    tail_start
580}
581
582fn contiguous_slice_block_elements(
583    resolved: &ResolvedSelection,
584    tail_start: usize,
585) -> Result<usize> {
586    let mut elements = 1usize;
587    for dim in &resolved.dims[tail_start..] {
588        elements = checked_mul_usize(elements, dim.count, "contiguous slice block elements")?;
589    }
590    Ok(elements)
591}
592
593fn result_strides_for_dims(result_dims: &[usize]) -> Result<Vec<usize>> {
594    let ndim = result_dims.len();
595    let mut result_strides = vec![1usize; ndim];
596    for d in (0..ndim.saturating_sub(1)).rev() {
597        result_strides[d] =
598            checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
599    }
600    Ok(result_strides)
601}
602
603/// A dataset within an HDF5 file.
604#[derive(Clone)]
605pub struct Dataset {
606    pub(crate) context: Arc<FileContext>,
607    pub(crate) name: String,
608    pub(crate) data_address: u64,
609    pub(crate) dataspace: DataspaceMessage,
610    pub(crate) datatype: Datatype,
611    pub(crate) layout: DataLayout,
612    pub(crate) fill_value: Option<FillValueMessage>,
613    pub(crate) filters: Option<FilterPipelineMessage>,
614    pub(crate) external_files: Option<ExternalFilesMessage>,
615    pub(crate) attributes: Vec<AttributeMessage>,
616    pub(crate) chunk_cache: Arc<ChunkCache>,
617    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
618    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
619    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
620    external_slots: Arc<OnceLock<Arc<Vec<ResolvedExternalRawSlot>>>>,
621    pub(crate) filter_registry: Arc<FilterRegistry>,
622}
623
624/// One decoded chunk from a chunked dataset.
625pub struct DatasetChunk {
626    offsets: Vec<u64>,
627    shape: Vec<u64>,
628    filter_mask: u32,
629    bytes: Arc<Vec<u8>>,
630}
631
632impl DatasetChunk {
633    /// Dataset coordinates of the first element in this chunk.
634    pub fn offsets(&self) -> &[u64] {
635        &self.offsets
636    }
637
638    /// Logical chunk shape from the dataset layout.
639    pub fn shape(&self) -> &[u64] {
640        &self.shape
641    }
642
643    /// HDF5 filter mask recorded for this chunk.
644    pub fn filter_mask(&self) -> u32 {
645        self.filter_mask
646    }
647
648    /// Decoded logical bytes for this chunk.
649    pub fn bytes(&self) -> &[u8] {
650        self.bytes.as_ref()
651    }
652}
653
654/// Iterator over decoded chunks in a chunked dataset.
655pub struct DatasetChunkIterator {
656    dataset: Dataset,
657    entries: Vec<chunk_index::ChunkEntry>,
658    index_address: u64,
659    chunk_shape: Vec<u64>,
660    elem_size: usize,
661    next: usize,
662}
663
664impl Iterator for DatasetChunkIterator {
665    type Item = Result<DatasetChunk>;
666
667    fn next(&mut self) -> Option<Self::Item> {
668        let entry = self.entries.get(self.next)?;
669        self.next += 1;
670
671        Some(
672            self.dataset
673                .load_exact_chunk_data(entry, self.index_address, &self.chunk_shape, self.elem_size)
674                .map(|bytes| DatasetChunk {
675                    offsets: entry.offsets.clone(),
676                    shape: self.chunk_shape.clone(),
677                    filter_mask: entry.filter_mask,
678                    bytes,
679                }),
680        )
681    }
682}
683
684pub(crate) struct DatasetTemplate {
685    name: String,
686    data_address: u64,
687    dataspace: DataspaceMessage,
688    datatype: Datatype,
689    layout: DataLayout,
690    fill_value: Option<FillValueMessage>,
691    filters: Option<FilterPipelineMessage>,
692    external_files: Option<ExternalFilesMessage>,
693    attributes: Vec<AttributeMessage>,
694    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
695    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
696    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
697    external_slots: Arc<OnceLock<Arc<Vec<ResolvedExternalRawSlot>>>>,
698}
699
700impl Dataset {
701    pub(crate) fn from_template(context: Arc<FileContext>, template: Arc<DatasetTemplate>) -> Self {
702        Dataset {
703            chunk_cache: context.chunk_cache.clone(),
704            filter_registry: context.filter_registry.clone(),
705            context,
706            name: template.name.clone(),
707            data_address: template.data_address,
708            dataspace: template.dataspace.clone(),
709            datatype: template.datatype.clone(),
710            layout: template.layout.clone(),
711            fill_value: template.fill_value.clone(),
712            filters: template.filters.clone(),
713            external_files: template.external_files.clone(),
714            attributes: template.attributes.clone(),
715            chunk_entry_cache: template.chunk_entry_cache.clone(),
716            full_chunk_entries: template.full_chunk_entries.clone(),
717            full_dataset_bytes: template.full_dataset_bytes.clone(),
718            external_slots: template.external_slots.clone(),
719        }
720    }
721
722    pub(crate) fn template(&self) -> Arc<DatasetTemplate> {
723        Arc::new(DatasetTemplate {
724            name: self.name.clone(),
725            data_address: self.data_address,
726            dataspace: self.dataspace.clone(),
727            datatype: self.datatype.clone(),
728            layout: self.layout.clone(),
729            fill_value: self.fill_value.clone(),
730            filters: self.filters.clone(),
731            external_files: self.external_files.clone(),
732            attributes: self.attributes.clone(),
733            chunk_entry_cache: self.chunk_entry_cache.clone(),
734            full_chunk_entries: self.full_chunk_entries.clone(),
735            full_dataset_bytes: self.full_dataset_bytes.clone(),
736            external_slots: self.external_slots.clone(),
737        })
738    }
739
740    pub(crate) fn from_parsed_header(
741        context: DatasetParseContext,
742        address: u64,
743        name: String,
744        header: &ObjectHeader,
745    ) -> Result<Self> {
746        let mut dataspace: Option<DataspaceMessage> = None;
747        let mut datatype: Option<Datatype> = None;
748        let mut layout: Option<DataLayout> = None;
749        let mut fill_value: Option<FillValueMessage> = None;
750        let mut filter_pipeline: Option<FilterPipelineMessage> = None;
751        let mut external_files: Option<ExternalFilesMessage> = None;
752        let attributes = collect_attribute_messages_storage(
753            header,
754            context.context.storage.as_ref(),
755            context.context.superblock.offset_size,
756            context.context.superblock.length_size,
757            Some(context.context.filter_registry.as_ref()),
758        )?;
759
760        for msg in &header.messages {
761            match msg {
762                HdfMessage::Dataspace(ds) => dataspace = Some(ds.clone()),
763                HdfMessage::Datatype(dt) => datatype = Some(dt.datatype.clone()),
764                HdfMessage::DataLayout(dl) => layout = Some(dl.layout.clone()),
765                HdfMessage::FillValue(fv) => fill_value = Some(fv.clone()),
766                HdfMessage::FilterPipeline(fp) => filter_pipeline = Some(fp.clone()),
767                HdfMessage::ExternalFiles(ef) => external_files = Some(ef.clone()),
768                _ => {}
769            }
770        }
771
772        let dataspace =
773            dataspace.ok_or_else(|| Error::InvalidData("dataset missing dataspace".into()))?;
774        let dt = datatype.ok_or_else(|| Error::InvalidData("dataset missing datatype".into()))?;
775        let layout =
776            layout.ok_or_else(|| Error::InvalidData("dataset missing data layout".into()))?;
777        let layout = normalize_layout(layout, &dataspace);
778        let attr_fill_value = attributes
779            .iter()
780            .find(|attr| {
781                attr.name == "_FillValue" && matches!(attr.dataspace.num_elements(), Ok(1))
782            })
783            .map(|attr| FillValueMessage {
784                defined: !attr.raw_data.is_empty(),
785                fill_time: FillTime::IfSet,
786                value: Some(attr.raw_data.clone()),
787            });
788        let fill_value = match fill_value {
789            Some(existing) if existing.value.is_some() => Some(existing),
790            _ => attr_fill_value,
791        };
792
793        Ok(Dataset {
794            context: context.context.clone(),
795            name,
796            data_address: address,
797            dataspace,
798            datatype: dt,
799            layout,
800            fill_value,
801            filters: filter_pipeline,
802            external_files,
803            attributes,
804            chunk_cache: context.context.chunk_cache.clone(),
805            chunk_entry_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap()))),
806            full_chunk_entries: Arc::new(OnceLock::new()),
807            full_dataset_bytes: Arc::new(OnceLock::new()),
808            external_slots: Arc::new(OnceLock::new()),
809            filter_registry: context.context.filter_registry.clone(),
810        })
811    }
812
813    /// Dataset name.
814    pub fn name(&self) -> &str {
815        &self.name
816    }
817
818    /// The object header address used to parse this dataset.
819    /// Useful as an opaque identifier or for NC4 data_offset.
820    pub fn address(&self) -> u64 {
821        self.data_address
822    }
823
824    /// Shape of the dataset (dimensions).
825    pub fn shape(&self) -> &[u64] {
826        &self.dataspace.dims
827    }
828
829    /// Datatype of the dataset.
830    pub fn dtype(&self) -> &Datatype {
831        &self.datatype
832    }
833
834    /// Number of dimensions.
835    pub fn ndim(&self) -> usize {
836        self.dataspace.dims.len()
837    }
838
839    fn offset_size(&self) -> u8 {
840        self.context.superblock.offset_size
841    }
842
843    fn length_size(&self) -> u8 {
844        self.context.superblock.length_size
845    }
846
847    /// Maximum dimension sizes, if defined. `u64::MAX` indicates unlimited.
848    pub fn max_dims(&self) -> Option<&[u64]> {
849        self.dataspace.max_dims.as_deref()
850    }
851
852    /// Chunk dimensions, if the dataset is chunked.
853    pub fn chunks(&self) -> Option<Vec<u32>> {
854        match &self.layout {
855            DataLayout::Chunked { dims, .. } => Some(dims.clone()),
856            _ => None,
857        }
858    }
859
860    /// Iterate over decoded chunks for a chunked dataset.
861    pub fn iter_chunks(&self) -> Result<DatasetChunkIterator> {
862        let DataLayout::Chunked {
863            address,
864            dims,
865            chunk_indexing,
866            ..
867        } = &self.layout
868        else {
869            return Err(Error::InvalidData(format!(
870                "dataset '{}' is not chunked",
871                self.name
872            )));
873        };
874
875        if Cursor::is_undefined_offset(*address, self.offset_size()) {
876            return Ok(DatasetChunkIterator {
877                dataset: self.clone(),
878                entries: Vec::new(),
879                index_address: *address,
880                chunk_shape: dims.iter().map(|&d| d as u64).collect(),
881                elem_size: self.raw_element_size()?,
882                next: 0,
883            });
884        }
885
886        let ndim = self.ndim();
887        let shape = &self.dataspace.dims;
888        let elem_size = self.raw_element_size()?;
889        let chunk_shape: Vec<u64> = dims.iter().map(|&d| d as u64).collect();
890        validate_chunk_shape(shape, &chunk_shape)?;
891        let entries = self.collect_chunk_entries(
892            *address,
893            dims,
894            chunk_indexing.as_ref(),
895            ChunkEntrySelection {
896                shape,
897                ndim,
898                elem_size,
899                chunk_bounds: None,
900            },
901        )?;
902
903        Ok(DatasetChunkIterator {
904            dataset: self.clone(),
905            entries,
906            index_address: *address,
907            chunk_shape,
908            elem_size,
909            next: 0,
910        })
911    }
912
913    /// Return current chunk-cache statistics for this dataset's file.
914    pub fn chunk_cache_stats(&self) -> ChunkCacheStats {
915        self.chunk_cache.stats()
916    }
917
918    /// Fill value, if defined.
919    pub fn fill_value(&self) -> Option<&FillValueMessage> {
920        self.fill_value.as_ref()
921    }
922
923    /// Dataset attributes.
924    pub fn attributes(&self) -> Vec<Attribute> {
925        self.attributes
926            .iter()
927            .map(|a| attribute_from_message_storage(a, self.context.as_ref()))
928            .collect()
929    }
930
931    /// Find an attribute by name.
932    pub fn attribute(&self, name: &str) -> Result<Attribute> {
933        self.attributes
934            .iter()
935            .find(|a| a.name == name)
936            .map(|a| attribute_from_message_storage(a, self.context.as_ref()))
937            .ok_or_else(|| Error::AttributeNotFound(name.to_string()))
938    }
939
940    /// Read a scalar string dataset or single string element.
941    ///
942    /// Use [`Dataset::read_strings`] when the dataset contains multiple strings.
943    pub fn read_string(&self) -> Result<String> {
944        let mut strings = self.read_strings()?;
945        match strings.len() {
946            1 => Ok(strings.swap_remove(0)),
947            0 => Err(Error::InvalidData(format!(
948                "dataset '{}' contains no string elements",
949                self.name
950            ))),
951            count => Err(Error::InvalidData(format!(
952                "dataset '{}' contains {count} string elements; use read_strings()",
953                self.name
954            ))),
955        }
956    }
957
958    /// Read all string elements from a string-typed dataset.
959    pub fn read_strings(&self) -> Result<Vec<String>> {
960        match &self.datatype {
961            Datatype::String {
962                size: StringSize::Fixed(len),
963                encoding,
964                padding,
965            } => {
966                let raw = self.read_raw_bytes()?;
967                let elem_size = *len as usize;
968                let count = checked_usize(self.num_elements()?, "dataset string element count")?;
969                let expected_bytes =
970                    checked_mul_usize(count, elem_size, "dataset string byte size")?;
971                if raw.len() < expected_bytes {
972                    return Err(Error::InvalidData(format!(
973                        "dataset '{}' string data too short: need {} bytes, have {}",
974                        self.name,
975                        expected_bytes,
976                        raw.len()
977                    )));
978                }
979
980                let mut strings = Vec::with_capacity(count);
981                for i in 0..count {
982                    let start = i * elem_size;
983                    let end = start + elem_size;
984                    strings.push(decode_string(&raw[start..end], *padding, *encoding)?);
985                }
986                Ok(strings)
987            }
988            Datatype::String {
989                size: StringSize::Variable,
990                encoding,
991                padding,
992            } => {
993                let raw = self.read_raw_bytes()?;
994                let count = checked_usize(self.num_elements()?, "dataset string element count")?;
995                let ref_size = 4 + self.offset_size() as usize + 4;
996                let expected_bytes =
997                    checked_mul_usize(count, ref_size, "dataset string reference byte size")?;
998                if raw.len() < expected_bytes {
999                    return Err(Error::InvalidData(format!(
1000                        "dataset '{}' vlen string data too short: need {} bytes, have {}",
1001                        self.name,
1002                        expected_bytes,
1003                        raw.len()
1004                    )));
1005                }
1006
1007                let mut strings = Vec::with_capacity(count);
1008                for i in 0..count {
1009                    let offset = i * ref_size;
1010                    strings.push(read_one_vlen_string_storage(
1011                        &raw,
1012                        offset,
1013                        self.context.storage.as_ref(),
1014                        self.offset_size(),
1015                        self.length_size(),
1016                        *padding,
1017                        *encoding,
1018                    )?);
1019                }
1020                Ok(strings)
1021            }
1022            Datatype::VarLen {
1023                base,
1024                kind: VarLenKind::String,
1025                encoding,
1026                padding,
1027            } => {
1028                if !matches!(base.as_ref(), Datatype::FixedPoint { size: 1, .. }) {
1029                    return Err(Error::TypeMismatch {
1030                        expected: "String dataset".into(),
1031                        actual: format!("{:?}", self.datatype),
1032                    });
1033                }
1034
1035                let raw = self.read_raw_bytes()?;
1036                let count = checked_usize(self.num_elements()?, "dataset string element count")?;
1037                let ref_size = 4 + self.offset_size() as usize + 4;
1038                let expected_bytes =
1039                    checked_mul_usize(count, ref_size, "dataset string reference byte size")?;
1040                if raw.len() < expected_bytes {
1041                    return Err(Error::InvalidData(format!(
1042                        "dataset '{}' vlen byte string data too short: need {} bytes, have {}",
1043                        self.name,
1044                        expected_bytes,
1045                        raw.len()
1046                    )));
1047                }
1048
1049                let mut strings = Vec::with_capacity(count);
1050                for i in 0..count {
1051                    let offset = i * ref_size;
1052                    let ref_bytes = &raw[offset..offset + ref_size];
1053                    let value = resolve_vlen_bytes_storage(
1054                        ref_bytes,
1055                        self.context.storage.as_ref(),
1056                        self.offset_size(),
1057                        self.length_size(),
1058                    )
1059                    .unwrap_or_default();
1060                    strings.push(decode_string(&value, *padding, *encoding)?);
1061                }
1062                Ok(strings)
1063            }
1064            _ => Err(Error::TypeMismatch {
1065                expected: "String dataset".into(),
1066                actual: format!("{:?}", self.datatype),
1067            }),
1068        }
1069    }
1070
1071    /// Total number of elements in the dataset.
1072    pub fn num_elements(&self) -> Result<u64> {
1073        self.dataspace.num_elements()
1074    }
1075
1076    /// Read the entire dataset into an n-dimensional array.
1077    pub fn read_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
1078        let result = match &self.layout {
1079            DataLayout::Compact { data } => self.read_compact::<T>(data),
1080            DataLayout::Contiguous { address, size } => self.read_contiguous::<T>(*address, *size),
1081            DataLayout::Chunked {
1082                address,
1083                dims,
1084                element_size,
1085                chunk_indexing,
1086            } => self.read_chunked::<T>(*address, dims, *element_size, chunk_indexing.as_ref()),
1087        };
1088        result.map_err(|e| e.with_context(&self.name))
1089    }
1090
1091    /// Read the entire dataset into a caller-provided typed buffer.
1092    pub fn read_into<T: H5Type>(&self, dst: &mut [T]) -> Result<()> {
1093        let result = (|| {
1094            let element_count = checked_usize(self.num_elements()?, "dataset element count")?;
1095            if dst.len() != element_count {
1096                return Err(Error::InvalidData(format!(
1097                    "destination has {} elements, dataset requires {}",
1098                    dst.len(),
1099                    element_count
1100                )));
1101            }
1102
1103            let elem_size = self.raw_element_size()?;
1104            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1105                let dst_bytes = unsafe {
1106                    std::slice::from_raw_parts_mut(
1107                        dst.as_mut_ptr() as *mut u8,
1108                        checked_mul_usize(dst.len(), elem_size, "destination byte length")?,
1109                    )
1110                };
1111                return self.read_raw_bytes_into_inner(dst_bytes);
1112            }
1113
1114            let array = self.read_array::<T>()?;
1115            let values = array.as_slice_memory_order().ok_or_else(|| {
1116                Error::InvalidData("decoded array is not contiguous in memory order".into())
1117            })?;
1118            dst.clone_from_slice(values);
1119            Ok(())
1120        })();
1121
1122        result.map_err(|e| e.with_context(&self.name))
1123    }
1124
1125    /// Read the entire dataset using internal chunk-level parallelism when possible.
1126    ///
1127    /// Non-chunked datasets fall back to `read_array`.
1128    #[cfg(feature = "rayon")]
1129    pub fn read_array_parallel<T: H5Type>(&self) -> Result<ArrayD<T>> {
1130        match &self.layout {
1131            DataLayout::Chunked {
1132                address,
1133                dims,
1134                element_size,
1135                chunk_indexing,
1136            } => self.read_chunked_parallel::<T>(
1137                *address,
1138                dims,
1139                *element_size,
1140                chunk_indexing.as_ref(),
1141            ),
1142            _ => self.read_array::<T>(),
1143        }
1144    }
1145
1146    /// Read the entire dataset using the provided Rayon thread pool.
1147    ///
1148    /// Non-chunked datasets fall back to `read_array`.
1149    #[cfg(feature = "rayon")]
1150    pub fn read_array_in_pool<T: H5Type>(&self, pool: &rayon::ThreadPool) -> Result<ArrayD<T>> {
1151        match &self.layout {
1152            DataLayout::Chunked {
1153                address,
1154                dims,
1155                element_size,
1156                chunk_indexing,
1157            } => pool.install(|| {
1158                self.read_chunked_parallel::<T>(
1159                    *address,
1160                    dims,
1161                    *element_size,
1162                    chunk_indexing.as_ref(),
1163                )
1164            }),
1165            _ => self.read_array::<T>(),
1166        }
1167    }
1168
1169    /// Read a hyperslab of the dataset using chunk-level parallelism when possible.
1170    ///
1171    /// Chunked datasets decompress overlapping chunks in parallel via Rayon.
1172    /// Non-chunked layouts fall back to `read_slice`.
1173    #[cfg(feature = "rayon")]
1174    pub fn read_slice_parallel<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
1175        let resolved = normalize_selection(selection, &self.dataspace.dims)?;
1176
1177        match &self.layout {
1178            DataLayout::Chunked {
1179                address,
1180                dims,
1181                element_size,
1182                chunk_indexing,
1183            } => self.read_chunked_slice_parallel::<T>(
1184                *address,
1185                dims,
1186                *element_size,
1187                chunk_indexing.as_ref(),
1188                selection,
1189                &resolved,
1190            ),
1191            _ => self.read_slice::<T>(selection),
1192        }
1193    }
1194
1195    /// Read a hyperslab of the dataset.
1196    pub fn read_slice<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
1197        let resolved = normalize_selection(selection, &self.dataspace.dims)?;
1198
1199        match &self.layout {
1200            DataLayout::Contiguous { address, size } => {
1201                self.read_contiguous_slice::<T>(*address, *size, &resolved)
1202            }
1203            DataLayout::Compact { data } => self.read_compact_slice::<T>(data, selection),
1204            DataLayout::Chunked {
1205                address,
1206                dims,
1207                element_size,
1208                chunk_indexing,
1209            } => self.read_chunked_slice::<T>(
1210                *address,
1211                dims,
1212                *element_size,
1213                chunk_indexing.as_ref(),
1214                selection,
1215                &resolved,
1216            ),
1217        }
1218    }
1219
1220    fn read_compact<T: H5Type>(&self, data: &[u8]) -> Result<ArrayD<T>> {
1221        self.validate_allocated_raw_data_len("compact", data.len())?;
1222        self.decode_raw_data::<T>(data)
1223    }
1224
1225    /// Read the dataset as logical element bytes.
1226    ///
1227    /// The returned bytes are after layout handling, chunk decompression, filter
1228    /// decoding, external raw-data resolution, and fill-value materialization.
1229    /// Numeric values remain in the dataset datatype's byte order.
1230    pub fn read_raw_bytes(&self) -> Result<Vec<u8>> {
1231        let result: Result<Vec<u8>> = (|| {
1232            let total_bytes = self.raw_byte_len()?;
1233            let mut output = vec![0u8; total_bytes];
1234            self.read_raw_bytes_into_inner(&mut output)?;
1235            Ok(output)
1236        })();
1237
1238        result.map_err(|e| e.with_context(&self.name))
1239    }
1240
1241    /// Number of logical raw bytes in the entire dataset.
1242    pub fn raw_byte_len(&self) -> Result<usize> {
1243        let elem_size = self.raw_element_size()?;
1244        let total_elements = checked_usize(self.num_elements()?, "dataset element count")?;
1245        checked_mul_usize(total_elements, elem_size, "dataset size in bytes")
1246    }
1247
1248    /// Read logical raw bytes into a caller-provided buffer.
1249    ///
1250    /// The destination length must match [`Dataset::raw_byte_len`]. Numeric
1251    /// values remain in the dataset datatype's byte order.
1252    pub fn read_raw_bytes_into(&self, dst: &mut [u8]) -> Result<()> {
1253        let result: Result<()> = (|| {
1254            let total_bytes = self.raw_byte_len()?;
1255            if dst.len() != total_bytes {
1256                return Err(Error::InvalidData(format!(
1257                    "destination has {} bytes, dataset requires {}",
1258                    dst.len(),
1259                    total_bytes
1260                )));
1261            }
1262            self.read_raw_bytes_into_inner(dst)
1263        })();
1264
1265        result.map_err(|e| e.with_context(&self.name))
1266    }
1267
1268    /// Read logical raw bytes converted to native-endian numeric fields.
1269    pub fn read_native_bytes(&self) -> Result<Vec<u8>> {
1270        let result: Result<Vec<u8>> = (|| {
1271            let total_bytes = self.raw_byte_len()?;
1272            let mut output = vec![0u8; total_bytes];
1273            self.read_raw_bytes_into_inner(&mut output)?;
1274            self.convert_to_native_endian(&mut output)?;
1275            Ok(output)
1276        })();
1277
1278        result.map_err(|e| e.with_context(&self.name))
1279    }
1280
1281    /// Read logical raw bytes into a buffer and convert numeric fields to native endian.
1282    pub fn read_native_bytes_into(&self, dst: &mut [u8]) -> Result<()> {
1283        let result: Result<()> = (|| {
1284            let total_bytes = self.raw_byte_len()?;
1285            if dst.len() != total_bytes {
1286                return Err(Error::InvalidData(format!(
1287                    "destination has {} bytes, dataset requires {}",
1288                    dst.len(),
1289                    total_bytes
1290                )));
1291            }
1292            self.read_raw_bytes_into_inner(dst)?;
1293            self.convert_to_native_endian(dst)
1294        })();
1295
1296        result.map_err(|e| e.with_context(&self.name))
1297    }
1298
1299    fn read_raw_bytes_into_inner(&self, dst: &mut [u8]) -> Result<()> {
1300        match &self.layout {
1301            DataLayout::Compact { data } => {
1302                self.validate_allocated_raw_data_len("compact", data.len())?;
1303                dst.copy_from_slice(data);
1304                Ok(())
1305            }
1306            DataLayout::Contiguous { address, size } => {
1307                self.read_contiguous_bytes_into(*address, *size, dst)
1308            }
1309            DataLayout::Chunked {
1310                address,
1311                dims,
1312                element_size: _,
1313                chunk_indexing,
1314            } => self.read_chunked_bytes_into(*address, dims, chunk_indexing.as_ref(), dst),
1315        }
1316    }
1317
1318    /// Size in bytes of one HDF5 variable-length reference in this file.
1319    pub fn vlen_reference_size(&self) -> usize {
1320        4 + self.offset_size() as usize + 4
1321    }
1322
1323    /// Size in bytes of one logical raw element in this file.
1324    ///
1325    /// Variable-length strings and sequences are stored as global-heap
1326    /// references whose width depends on the file offset size.
1327    pub fn raw_element_size(&self) -> Result<usize> {
1328        raw_element_size_for_datatype(&self.datatype, self.vlen_reference_size())
1329    }
1330
1331    /// Resolve one HDF5 variable-length reference into logical element bytes.
1332    ///
1333    /// `base_element_size` is the byte width of one value in the vlen sequence.
1334    /// The sequence length stored in the reference is interpreted as an element
1335    /// count and multiplied by this size.
1336    pub fn resolve_vlen_reference_bytes(
1337        &self,
1338        reference: &[u8],
1339        base_element_size: usize,
1340    ) -> Result<Vec<u8>> {
1341        if reference.len() < self.vlen_reference_size() {
1342            return Err(Error::InvalidData(format!(
1343                "dataset '{}' vlen reference too short: need {} bytes, have {}",
1344                self.name,
1345                self.vlen_reference_size(),
1346                reference.len()
1347            )));
1348        }
1349
1350        let mut cursor = Cursor::new(reference);
1351        let seq_len = cursor.read_u32_le()? as usize;
1352        let heap_addr = cursor.read_offset(self.offset_size())?;
1353        let obj_index = cursor.read_u32_le()? as u16;
1354
1355        if Cursor::is_undefined_offset(heap_addr, self.offset_size()) || obj_index == 0 {
1356            return Ok(Vec::new());
1357        }
1358
1359        let expected_bytes =
1360            checked_mul_usize(seq_len, base_element_size, "vlen sequence byte size")?;
1361        let collection = crate::global_heap::GlobalHeapCollection::parse_at_storage(
1362            self.context.storage.as_ref(),
1363            heap_addr,
1364            self.offset_size(),
1365            self.length_size(),
1366        )?;
1367        let object = collection.get_object(obj_index).ok_or_else(|| {
1368            Error::InvalidData(format!(
1369                "dataset '{}' references missing vlen heap object {}",
1370                self.name, obj_index
1371            ))
1372        })?;
1373        if object.data.len() < expected_bytes {
1374            return Err(Error::InvalidData(format!(
1375                "dataset '{}' vlen heap object too short: need {} bytes, have {}",
1376                self.name,
1377                expected_bytes,
1378                object.data.len()
1379            )));
1380        }
1381
1382        Ok(object.data[..expected_bytes].to_vec())
1383    }
1384
1385    fn read_contiguous<T: H5Type>(&self, address: u64, size: u64) -> Result<ArrayD<T>> {
1386        if self.external_files.is_some() {
1387            let elem_size = self.raw_element_size()?;
1388            let total_elements = checked_usize(self.num_elements()?, "dataset element count")?;
1389            let total_bytes =
1390                checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;
1391            let raw = self.read_external_range(0, total_bytes)?;
1392            return self.decode_raw_data::<T>(&raw);
1393        }
1394
1395        if Cursor::is_undefined_offset(address, self.offset_size()) || size == 0 {
1396            // Dataset with no data written — return fill values
1397            return self.make_fill_array::<T>();
1398        }
1399
1400        let sz = checked_usize(size, "contiguous dataset size")?;
1401        self.validate_allocated_raw_data_len("contiguous", sz)?;
1402        let raw = self.context.read_range(address, sz)?;
1403        self.decode_raw_data::<T>(raw.as_ref())
1404    }
1405
1406    fn read_contiguous_bytes_into(&self, address: u64, size: u64, dst: &mut [u8]) -> Result<()> {
1407        if self.external_files.is_some() {
1408            return self.read_external_range_into(0, dst);
1409        }
1410
1411        if Cursor::is_undefined_offset(address, self.offset_size()) || size == 0 {
1412            self.fill_output_buffer(dst);
1413            return Ok(());
1414        }
1415
1416        let sz = checked_usize(size, "contiguous dataset size")?;
1417        self.validate_allocated_raw_data_len("contiguous", sz)?;
1418        if dst.is_empty() {
1419            return Ok(());
1420        }
1421        let raw = self.context.read_range(address, sz)?;
1422        dst.copy_from_slice(raw.as_ref());
1423        Ok(())
1424    }
1425
1426    fn read_contiguous_logical_range(
1427        &self,
1428        address: u64,
1429        logical_offset: usize,
1430        len: usize,
1431    ) -> Result<Vec<u8>> {
1432        if self.external_files.is_some() {
1433            return self.read_external_range(logical_offset, len);
1434        }
1435
1436        let file_offset = checked_add_u64(
1437            address,
1438            u64::try_from(logical_offset).map_err(|_| {
1439                Error::InvalidData("contiguous logical offset exceeds u64 capacity".to_string())
1440            })?,
1441            "contiguous read file offset",
1442        )?;
1443        Ok(self.context.read_range(file_offset, len)?.to_vec())
1444    }
1445
1446    fn read_external_range(&self, logical_offset: usize, len: usize) -> Result<Vec<u8>> {
1447        let mut output = vec![0u8; len];
1448        self.read_external_range_into(logical_offset, &mut output)?;
1449        Ok(output)
1450    }
1451
1452    fn read_external_range_into(&self, logical_offset: usize, dst: &mut [u8]) -> Result<()> {
1453        self.fill_output_buffer(dst);
1454        if dst.is_empty() {
1455            return Ok(());
1456        }
1457
1458        let request_start = u64::try_from(logical_offset).map_err(|_| {
1459            Error::InvalidData("external dataset offset exceeds u64 capacity".to_string())
1460        })?;
1461        let request_len = u64::try_from(dst.len()).map_err(|_| {
1462            Error::InvalidData("external dataset length exceeds u64 capacity".to_string())
1463        })?;
1464        let request_end = request_start
1465            .checked_add(request_len)
1466            .ok_or_else(|| Error::InvalidData("external dataset range overflows".into()))?;
1467
1468        for slot in self.external_raw_slots()?.iter() {
1469            let slot_end = slot.logical_offset.saturating_add(slot.size);
1470            let overlap_start = request_start.max(slot.logical_offset);
1471            let overlap_end = request_end.min(slot_end);
1472            if overlap_start >= overlap_end {
1473                continue;
1474            }
1475
1476            let read_offset = slot
1477                .file_offset
1478                .checked_add(overlap_start - slot.logical_offset)
1479                .ok_or_else(|| Error::InvalidData("external file read offset overflows".into()))?;
1480            let read_len = checked_usize(overlap_end - overlap_start, "external read length")?;
1481            let dst_start = checked_usize(overlap_start - request_start, "external read dst")?;
1482            let dst_end = checked_add_usize(dst_start, read_len, "external read dst end")?;
1483            let bytes = slot.storage.read_range(read_offset, read_len)?;
1484            dst[dst_start..dst_end].copy_from_slice(bytes.as_ref());
1485        }
1486
1487        Ok(())
1488    }
1489
1490    fn external_raw_slots(&self) -> Result<Arc<Vec<ResolvedExternalRawSlot>>> {
1491        if let Some(slots) = self.external_slots.get() {
1492            return Ok(slots.clone());
1493        }
1494
1495        let slots = Arc::new(self.load_external_raw_slots()?);
1496        let _ = self.external_slots.set(slots.clone());
1497        Ok(self
1498            .external_slots
1499            .get()
1500            .expect("external slot cache must exist after initialization")
1501            .clone())
1502    }
1503
1504    fn load_external_raw_slots(&self) -> Result<Vec<ResolvedExternalRawSlot>> {
1505        let Some(external_files) = self.external_files.as_ref() else {
1506            return Ok(Vec::new());
1507        };
1508
1509        let heap = LocalHeap::parse_at_storage(
1510            self.context.storage.as_ref(),
1511            external_files.heap_address,
1512            self.offset_size(),
1513            self.length_size(),
1514        )?;
1515
1516        let mut logical_offset = 0u64;
1517        let mut slots = Vec::with_capacity(external_files.slots.len());
1518        for slot in &external_files.slots {
1519            let filename =
1520                heap.get_string_storage(slot.name_offset, self.context.storage.as_ref())?;
1521            let storage = self
1522                .context
1523                .resolve_external_file(&filename)?
1524                .ok_or_else(|| {
1525                    Error::Other(format!(
1526                        "external raw data file '{filename}' could not be resolved"
1527                    ))
1528                })?;
1529            let size = if Cursor::is_undefined_offset(slot.size, self.length_size()) {
1530                u64::MAX.saturating_sub(logical_offset)
1531            } else {
1532                slot.size
1533            };
1534
1535            slots.push(ResolvedExternalRawSlot {
1536                logical_offset,
1537                storage,
1538                file_offset: slot.offset,
1539                size,
1540            });
1541
1542            if Cursor::is_undefined_offset(slot.size, self.length_size()) {
1543                break;
1544            }
1545            logical_offset = logical_offset.checked_add(slot.size).ok_or_else(|| {
1546                Error::InvalidData("external raw data logical offset overflows".into())
1547            })?;
1548        }
1549
1550        Ok(slots)
1551    }
1552
1553    fn read_chunked<T: H5Type>(
1554        &self,
1555        index_address: u64,
1556        chunk_dims: &[u32],
1557        _element_size: u32,
1558        chunk_indexing: Option<&ChunkIndexing>,
1559    ) -> Result<ArrayD<T>> {
1560        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
1561            return self.make_fill_array::<T>();
1562        }
1563
1564        let ndim = self.ndim();
1565        let shape = &self.dataspace.dims;
1566        let elem_size = self.raw_element_size()?;
1567        let total_elements = checked_usize(self.num_elements()?, "dataset element count")?;
1568        let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;
1569
1570        if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1571            if let Some(cached_bytes) = self.full_dataset_bytes.get() {
1572                return self.decode_raw_data::<T>(cached_bytes);
1573            }
1574        }
1575
1576        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1577        validate_chunk_shape(shape, &chunk_shape)?;
1578        let dataset_strides = row_major_strides(shape, "dataset stride")?;
1579        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
1580
1581        let mut entries = self.collect_chunk_entries(
1582            index_address,
1583            chunk_dims,
1584            chunk_indexing,
1585            ChunkEntrySelection {
1586                shape,
1587                ndim,
1588                elem_size,
1589                chunk_bounds: None,
1590            },
1591        )?;
1592
1593        let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
1594            Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
1595                &mut entries,
1596                shape,
1597                &chunk_shape,
1598                &first_chunk,
1599                &last_chunk,
1600            )?,
1601            None if entries.is_empty() => true,
1602            None => {
1603                return Err(Error::InvalidData(
1604                    "chunk index contains entries for an empty dataset".into(),
1605                ))
1606            }
1607        };
1608        if full_chunk_coverage {
1609            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1610                let mut result_values: Vec<MaybeUninit<T>> =
1611                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
1612                        .take(total_elements)
1613                        .collect();
1614                let result_ptr = result_values.as_mut_ptr() as *mut u8;
1615                let result_len = checked_mul_usize(
1616                    result_values.len(),
1617                    std::mem::size_of::<T>(),
1618                    "typed dataset size in bytes",
1619                )?;
1620
1621                for entry in &entries {
1622                    let chunk_data =
1623                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1624                    unsafe {
1625                        copy_chunk_to_flat_with_strides_ptr(
1626                            &chunk_data,
1627                            FlatBufferPtr {
1628                                ptr: result_ptr,
1629                                len: result_len,
1630                            },
1631                            ChunkCopyLayout {
1632                                chunk_offsets: &entry.offsets,
1633                                chunk_shape: &chunk_shape,
1634                                dataset_shape: shape,
1635                                dataset_strides: &dataset_strides,
1636                                chunk_strides: &chunk_strides,
1637                                elem_size,
1638                            },
1639                        )?;
1640                    }
1641                }
1642
1643                if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1644                    let mut cached_bytes = vec![0u8; total_bytes];
1645                    unsafe {
1646                        std::ptr::copy_nonoverlapping(
1647                            result_ptr,
1648                            cached_bytes.as_mut_ptr(),
1649                            total_bytes,
1650                        );
1651                    }
1652                    let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
1653                }
1654
1655                let mut result_shape = Vec::with_capacity(shape.len());
1656                for &dim in shape {
1657                    result_shape.push(checked_usize(dim, "dataset dimension")?);
1658                }
1659                let result_values = assume_init_vec(result_values);
1660                return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
1661                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1662            }
1663
1664            let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
1665            let flat_ptr = flat_data.as_mut_ptr() as *mut u8;
1666            let flat_len = flat_data.len();
1667
1668            for entry in &entries {
1669                let chunk_data =
1670                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1671                unsafe {
1672                    copy_chunk_to_flat_with_strides_ptr(
1673                        &chunk_data,
1674                        FlatBufferPtr {
1675                            ptr: flat_ptr,
1676                            len: flat_len,
1677                        },
1678                        ChunkCopyLayout {
1679                            chunk_offsets: &entry.offsets,
1680                            chunk_shape: &chunk_shape,
1681                            dataset_shape: shape,
1682                            dataset_strides: &dataset_strides,
1683                            chunk_strides: &chunk_strides,
1684                            elem_size,
1685                        },
1686                    )?;
1687                }
1688            }
1689
1690            let flat_data = assume_init_u8_vec(flat_data);
1691            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1692                let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
1693            }
1694            return self.decode_raw_data::<T>(&flat_data);
1695        }
1696
1697        let mut flat_data = self.make_output_buffer(total_bytes);
1698        for entry in &entries {
1699            let chunk_data =
1700                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1701            copy_chunk_to_flat_with_strides(
1702                &chunk_data,
1703                &mut flat_data,
1704                ChunkCopyLayout {
1705                    chunk_offsets: &entry.offsets,
1706                    chunk_shape: &chunk_shape,
1707                    dataset_shape: shape,
1708                    dataset_strides: &dataset_strides,
1709                    chunk_strides: &chunk_strides,
1710                    elem_size,
1711                },
1712            )?;
1713        }
1714
1715        self.decode_raw_data::<T>(&flat_data)
1716    }
1717
1718    fn read_chunked_bytes_into(
1719        &self,
1720        index_address: u64,
1721        chunk_dims: &[u32],
1722        chunk_indexing: Option<&ChunkIndexing>,
1723        dst: &mut [u8],
1724    ) -> Result<()> {
1725        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
1726            self.fill_output_buffer(dst);
1727            return Ok(());
1728        }
1729
1730        let ndim = self.ndim();
1731        let shape = &self.dataspace.dims;
1732        let elem_size = self.raw_element_size()?;
1733
1734        if dst.len() <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1735            if let Some(cached_bytes) = self.full_dataset_bytes.get() {
1736                if cached_bytes.len() == dst.len() {
1737                    dst.copy_from_slice(cached_bytes.as_slice());
1738                    return Ok(());
1739                }
1740            }
1741        }
1742
1743        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1744        validate_chunk_shape(shape, &chunk_shape)?;
1745        let dataset_strides = row_major_strides(shape, "dataset stride")?;
1746        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
1747
1748        let mut entries = self.collect_chunk_entries(
1749            index_address,
1750            chunk_dims,
1751            chunk_indexing,
1752            ChunkEntrySelection {
1753                shape,
1754                ndim,
1755                elem_size,
1756                chunk_bounds: None,
1757            },
1758        )?;
1759
1760        let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
1761            Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
1762                &mut entries,
1763                shape,
1764                &chunk_shape,
1765                &first_chunk,
1766                &last_chunk,
1767            )?,
1768            None if entries.is_empty() => true,
1769            None => {
1770                return Err(Error::InvalidData(
1771                    "chunk index contains entries for an empty dataset".into(),
1772                ))
1773            }
1774        };
1775
1776        self.fill_output_buffer(dst);
1777        for entry in &entries {
1778            let chunk_data =
1779                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1780            copy_chunk_to_flat_with_strides(
1781                &chunk_data,
1782                dst,
1783                ChunkCopyLayout {
1784                    chunk_offsets: &entry.offsets,
1785                    chunk_shape: &chunk_shape,
1786                    dataset_shape: shape,
1787                    dataset_strides: &dataset_strides,
1788                    chunk_strides: &chunk_strides,
1789                    elem_size,
1790                },
1791            )?;
1792        }
1793
1794        if full_chunk_coverage && dst.len() <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1795            let _ = self.full_dataset_bytes.set(Arc::new(dst.to_vec()));
1796        }
1797
1798        Ok(())
1799    }
1800
1801    #[cfg(feature = "rayon")]
1802    fn read_chunked_parallel<T: H5Type>(
1803        &self,
1804        index_address: u64,
1805        chunk_dims: &[u32],
1806        _element_size: u32,
1807        chunk_indexing: Option<&ChunkIndexing>,
1808    ) -> Result<ArrayD<T>> {
1809        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
1810            return self.make_fill_array::<T>();
1811        }
1812
1813        let ndim = self.ndim();
1814        let shape = &self.dataspace.dims;
1815        let elem_size = self.raw_element_size()?;
1816        let total_elements = checked_usize(self.num_elements()?, "dataset element count")?;
1817        let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;
1818
1819        if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1820            if let Some(cached_bytes) = self.full_dataset_bytes.get() {
1821                return self.decode_raw_data::<T>(cached_bytes);
1822            }
1823        }
1824
1825        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1826        validate_chunk_shape(shape, &chunk_shape)?;
1827        let dataset_strides = row_major_strides(shape, "dataset stride")?;
1828        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
1829
1830        let mut entries = self.collect_chunk_entries(
1831            index_address,
1832            chunk_dims,
1833            chunk_indexing,
1834            ChunkEntrySelection {
1835                shape,
1836                ndim,
1837                elem_size,
1838                chunk_bounds: None,
1839            },
1840        )?;
1841
1842        let full_chunk_coverage = match full_dataset_chunk_bounds(shape, &chunk_shape)? {
1843            Some((first_chunk, last_chunk)) => validate_chunk_grid_coverage(
1844                &mut entries,
1845                shape,
1846                &chunk_shape,
1847                &first_chunk,
1848                &last_chunk,
1849            )?,
1850            None if entries.is_empty() => true,
1851            None => {
1852                return Err(Error::InvalidData(
1853                    "chunk index contains entries for an empty dataset".into(),
1854                ))
1855            }
1856        };
1857        if full_chunk_coverage {
1858            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1859                let mut result_values: Vec<MaybeUninit<T>> =
1860                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
1861                        .take(total_elements)
1862                        .collect();
1863                let flat = FlatBufferPtr {
1864                    ptr: result_values.as_mut_ptr() as *mut u8,
1865                    len: checked_mul_usize(
1866                        result_values.len(),
1867                        std::mem::size_of::<T>(),
1868                        "typed dataset size in bytes",
1869                    )?,
1870                };
1871
1872                entries
1873                    .par_iter()
1874                    .map(|entry| {
1875                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)
1876                            .and_then(|data| unsafe {
1877                                flat.copy_chunk(
1878                                    &data,
1879                                    ChunkCopyLayout {
1880                                        chunk_offsets: &entry.offsets,
1881                                        chunk_shape: &chunk_shape,
1882                                        dataset_shape: shape,
1883                                        dataset_strides: &dataset_strides,
1884                                        chunk_strides: &chunk_strides,
1885                                        elem_size,
1886                                    },
1887                                )
1888                            })
1889                    })
1890                    .collect::<std::result::Result<Vec<_>, Error>>()?;
1891
1892                let mut result_shape = Vec::with_capacity(shape.len());
1893                for &dim in shape {
1894                    result_shape.push(checked_usize(dim, "dataset dimension")?);
1895                }
1896                if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1897                    let mut cached_bytes = vec![0u8; total_bytes];
1898                    unsafe {
1899                        std::ptr::copy_nonoverlapping(
1900                            flat.ptr,
1901                            cached_bytes.as_mut_ptr(),
1902                            total_bytes,
1903                        );
1904                    }
1905                    let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
1906                }
1907                let result_values = assume_init_vec(result_values);
1908                return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
1909                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1910            }
1911
1912            let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
1913            let flat = FlatBufferPtr {
1914                ptr: flat_data.as_mut_ptr() as *mut u8,
1915                len: flat_data.len(),
1916            };
1917
1918            entries
1919                .par_iter()
1920                .map(|entry| {
1921                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)
1922                        .and_then(|data| unsafe {
1923                            flat.copy_chunk(
1924                                &data,
1925                                ChunkCopyLayout {
1926                                    chunk_offsets: &entry.offsets,
1927                                    chunk_shape: &chunk_shape,
1928                                    dataset_shape: shape,
1929                                    dataset_strides: &dataset_strides,
1930                                    chunk_strides: &chunk_strides,
1931                                    elem_size,
1932                                },
1933                            )
1934                        })
1935                })
1936                .collect::<std::result::Result<Vec<_>, Error>>()?;
1937
1938            let flat_data = assume_init_u8_vec(flat_data);
1939            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
1940                let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
1941            }
1942            return self.decode_raw_data::<T>(&flat_data);
1943        }
1944
1945        let mut flat_data = self.make_output_buffer(total_bytes);
1946        let flat = FlatBufferPtr {
1947            ptr: flat_data.as_mut_ptr(),
1948            len: flat_data.len(),
1949        };
1950
1951        entries
1952            .par_iter()
1953            .map(|entry| {
1954                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)
1955                    .and_then(|data| unsafe {
1956                        flat.copy_chunk(
1957                            &data,
1958                            ChunkCopyLayout {
1959                                chunk_offsets: &entry.offsets,
1960                                chunk_shape: &chunk_shape,
1961                                dataset_shape: shape,
1962                                dataset_strides: &dataset_strides,
1963                                chunk_strides: &chunk_strides,
1964                                elem_size,
1965                            },
1966                        )
1967                    })
1968            })
1969            .collect::<std::result::Result<Vec<_>, Error>>()?;
1970
1971        self.decode_raw_data::<T>(&flat_data)
1972    }
1973
1974    /// Collect all chunk entries by dispatching on the chunk indexing type.
1975    ///
1976    /// Shared by `read_chunked` and `read_chunked_slice`.
1977    fn collect_chunk_entries(
1978        &self,
1979        index_address: u64,
1980        chunk_dims: &[u32],
1981        chunk_indexing: Option<&ChunkIndexing>,
1982        selection: ChunkEntrySelection<'_>,
1983    ) -> Result<Vec<chunk_index::ChunkEntry>> {
1984        if selection.chunk_bounds.is_none() {
1985            if let Some(cached) = self.full_chunk_entries.get() {
1986                return Ok((**cached).clone());
1987            }
1988        }
1989
1990        let cache_key =
1991            selection
1992                .chunk_bounds
1993                .map(|(first_chunk, last_chunk)| ChunkEntryCacheKey {
1994                    index_address,
1995                    first_chunk: SmallVec::from_slice(first_chunk),
1996                    last_chunk: SmallVec::from_slice(last_chunk),
1997                });
1998
1999        if let Some(ref key) = cache_key {
2000            let mut cache = self.chunk_entry_cache.lock();
2001            if let Some(cached) = cache.get(key) {
2002                return Ok((**cached).clone());
2003            }
2004        }
2005
2006        let entries = match chunk_indexing {
2007            None => {
2008                // V1-V3: B-tree v1 chunk indexing
2009                self.collect_btree_v1_entries(
2010                    index_address,
2011                    selection.ndim,
2012                    chunk_dims,
2013                    selection.chunk_bounds,
2014                )
2015            }
2016            Some(ChunkIndexing::SingleChunk {
2017                filtered_size,
2018                filters,
2019            }) => Ok(vec![chunk_index::single_chunk_entry(
2020                index_address,
2021                *filtered_size,
2022                *filters,
2023                selection.ndim,
2024            )]),
2025            Some(ChunkIndexing::BTreeV2) => chunk_index::collect_v2_chunk_entries_storage(
2026                self.context.storage.as_ref(),
2027                index_address,
2028                self.offset_size(),
2029                self.length_size(),
2030                selection.ndim as u32,
2031                chunk_dims,
2032                selection.chunk_bounds,
2033            ),
2034            Some(ChunkIndexing::Implicit) => chunk_index::collect_implicit_chunk_entries(
2035                index_address,
2036                selection.shape,
2037                chunk_dims,
2038                selection.elem_size,
2039                selection.chunk_bounds,
2040            ),
2041            Some(ChunkIndexing::FixedArray { .. }) => {
2042                crate::fixed_array::collect_fixed_array_chunk_entries_storage(
2043                    self.context.storage.as_ref(),
2044                    index_address,
2045                    self.offset_size(),
2046                    self.length_size(),
2047                    selection.shape,
2048                    chunk_dims,
2049                    selection.chunk_bounds,
2050                )
2051            }
2052            Some(ChunkIndexing::ExtensibleArray { .. }) => {
2053                crate::extensible_array::collect_extensible_array_chunk_entries_storage(
2054                    self.context.storage.as_ref(),
2055                    index_address,
2056                    self.offset_size(),
2057                    self.length_size(),
2058                    selection.shape,
2059                    chunk_dims,
2060                    selection.chunk_bounds,
2061                )
2062            }
2063        }?;
2064
2065        if let Some(key) = cache_key {
2066            let mut cache = self.chunk_entry_cache.lock();
2067            cache.put(key, Arc::new(entries.clone()));
2068        } else {
2069            let _ = self.full_chunk_entries.set(Arc::new(entries.clone()));
2070        }
2071
2072        Ok(entries)
2073    }
2074
2075    /// Collect chunk entries from a B-tree v1 index.
2076    fn collect_btree_v1_entries(
2077        &self,
2078        btree_address: u64,
2079        ndim: usize,
2080        chunk_dims: &[u32],
2081        chunk_bounds: Option<(&[u64], &[u64])>,
2082    ) -> Result<Vec<chunk_index::ChunkEntry>> {
2083        let leaves = crate::btree_v1::collect_btree_v1_leaves_storage(
2084            self.context.storage.as_ref(),
2085            btree_address,
2086            self.offset_size(),
2087            self.length_size(),
2088            Some(ndim as u32),
2089            chunk_dims,
2090            chunk_bounds,
2091        )?;
2092
2093        let mut entries = Vec::with_capacity(leaves.len());
2094        for (key, chunk_addr) in &leaves {
2095            match key {
2096                crate::btree_v1::BTreeV1Key::RawData {
2097                    chunk_size,
2098                    filter_mask,
2099                    offsets,
2100                } => {
2101                    entries.push(chunk_index::ChunkEntry {
2102                        address: *chunk_addr,
2103                        size: *chunk_size as u64,
2104                        filter_mask: *filter_mask,
2105                        offsets: offsets[..ndim].to_vec(),
2106                    });
2107                }
2108                _ => {
2109                    return Err(Error::InvalidData(
2110                        "expected raw data key in chunk B-tree".into(),
2111                    ))
2112                }
2113            }
2114        }
2115        Ok(entries)
2116    }
2117
2118    fn load_chunk_data(
2119        &self,
2120        entry: &chunk_index::ChunkEntry,
2121        dataset_addr: u64,
2122        chunk_shape: &[u64],
2123        elem_size: usize,
2124    ) -> Result<Arc<Vec<u8>>> {
2125        let cache_key = ChunkKey {
2126            dataset_addr,
2127            chunk_offsets: smallvec::SmallVec::from_slice(&entry.offsets),
2128        };
2129        let expected_len = decoded_chunk_expected_len(chunk_shape, elem_size)?;
2130        let filter_output_limit =
2131            checked_add_usize(expected_len, 1, "decoded chunk filter output limit")?;
2132
2133        self.chunk_cache.get_or_insert_with(cache_key, || {
2134            let size = if entry.size > 0 {
2135                checked_usize(entry.size, "encoded chunk size")?
2136            } else {
2137                expected_len
2138            };
2139            let raw = self.context.read_range(entry.address, size)?;
2140
2141            if let Some(ref pipeline) = self.filters {
2142                filters::apply_pipeline_with_limit(
2143                    raw.as_ref(),
2144                    &pipeline.filters,
2145                    entry.filter_mask,
2146                    elem_size,
2147                    Some(&self.filter_registry),
2148                    Some(filter_output_limit),
2149                )
2150            } else {
2151                Ok(raw.to_vec())
2152            }
2153        })
2154    }
2155
2156    fn load_exact_chunk_data(
2157        &self,
2158        entry: &chunk_index::ChunkEntry,
2159        dataset_addr: u64,
2160        chunk_shape: &[u64],
2161        elem_size: usize,
2162    ) -> Result<Arc<Vec<u8>>> {
2163        let data = self.load_chunk_data(entry, dataset_addr, chunk_shape, elem_size)?;
2164        validate_decoded_chunk_len(entry, chunk_shape, elem_size, data.len())?;
2165        Ok(data)
2166    }
2167
2168    /// Chunked slice: only read chunks that overlap the selection.
2169    ///
2170    /// Resolves each `SliceInfoElem` to concrete ranges, computes the chunk
2171    /// grid range per dimension, and only decompresses overlapping chunks.
2172    fn read_chunked_slice<T: H5Type>(
2173        &self,
2174        index_address: u64,
2175        chunk_dims: &[u32],
2176        _element_size: u32,
2177        chunk_indexing: Option<&ChunkIndexing>,
2178        _selection: &SliceInfo,
2179        resolved: &ResolvedSelection,
2180    ) -> Result<ArrayD<T>> {
2181        if resolved.result_elements == 0 {
2182            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
2183        }
2184
2185        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
2186            return self
2187                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
2188        }
2189
2190        let ndim = self.ndim();
2191        let shape = &self.dataspace.dims;
2192        let elem_size = dtype_element_size(&self.datatype)?;
2193        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
2194        validate_chunk_shape(shape, &chunk_shape)?;
2195        let mut first_chunk = vec![0u64; ndim];
2196        let mut last_chunk = vec![0u64; ndim];
2197        for d in 0..ndim {
2198            let (first, last) = resolved.dims[d]
2199                .chunk_index_range(chunk_shape[d])
2200                .expect("zero-sized result handled above");
2201            first_chunk[d] = first;
2202            last_chunk[d] = last;
2203        }
2204
2205        // Collect all chunk entries.
2206        let mut overlapping = self.collect_chunk_entries(
2207            index_address,
2208            chunk_dims,
2209            chunk_indexing,
2210            ChunkEntrySelection {
2211                shape,
2212                ndim,
2213                elem_size,
2214                chunk_bounds: Some((&first_chunk, &last_chunk)),
2215            },
2216        )?;
2217        let fully_covered_grid = validate_chunk_grid_coverage(
2218            &mut overlapping,
2219            shape,
2220            &chunk_shape,
2221            &first_chunk,
2222            &last_chunk,
2223        )?;
2224
2225        let result_total_bytes = checked_mul_usize(
2226            resolved.result_elements,
2227            elem_size,
2228            "slice result size in bytes",
2229        )?;
2230        // Compute result strides (including collapsed dims — they have count=1).
2231        let result_dims = resolved.result_dims_with_collapsed();
2232        let mut result_strides = vec![1usize; ndim];
2233        for d in (0..ndim.saturating_sub(1)).rev() {
2234            result_strides[d] =
2235                checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
2236        }
2237        let mut chunk_strides = vec![1usize; ndim];
2238        for d in (0..ndim.saturating_sub(1)).rev() {
2239            chunk_strides[d] = checked_mul_usize(
2240                chunk_strides[d + 1],
2241                chunk_shape[d + 1] as usize,
2242                "chunk stride",
2243            )?;
2244        }
2245        let use_unit_stride_fast_path = resolved.is_unit_stride();
2246        let fully_covered_unit_stride = use_unit_stride_fast_path && fully_covered_grid;
2247
2248        if fully_covered_unit_stride {
2249            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
2250                let mut result_values: Vec<MaybeUninit<T>> =
2251                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
2252                        .take(resolved.result_elements)
2253                        .collect();
2254                let result_ptr = result_values.as_mut_ptr() as *mut u8;
2255                let result_len = checked_mul_usize(
2256                    result_values.len(),
2257                    std::mem::size_of::<T>(),
2258                    "typed slice result size in bytes",
2259                )?;
2260
2261                for entry in &overlapping {
2262                    let chunk_data =
2263                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2264
2265                    unsafe {
2266                        copy_unit_stride_chunk_overlap_ptr(
2267                            &chunk_data,
2268                            FlatBufferPtr {
2269                                ptr: result_ptr,
2270                                len: result_len,
2271                            },
2272                            UnitStrideCopyLayout {
2273                                chunk_offsets: &entry.offsets,
2274                                chunk_shape: &chunk_shape,
2275                                dataset_shape: shape,
2276                                resolved,
2277                                chunk_strides: &chunk_strides,
2278                                result_strides: &result_strides,
2279                                elem_size,
2280                            },
2281                        )?;
2282                    }
2283                }
2284
2285                let result_values = assume_init_vec(result_values);
2286                return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
2287                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
2288            }
2289
2290            let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
2291            let result_ptr = result_buf.as_mut_ptr() as *mut u8;
2292            let result_len = result_buf.len();
2293
2294            for entry in &overlapping {
2295                let chunk_data =
2296                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2297
2298                unsafe {
2299                    copy_unit_stride_chunk_overlap_ptr(
2300                        &chunk_data,
2301                        FlatBufferPtr {
2302                            ptr: result_ptr,
2303                            len: result_len,
2304                        },
2305                        UnitStrideCopyLayout {
2306                            chunk_offsets: &entry.offsets,
2307                            chunk_shape: &chunk_shape,
2308                            dataset_shape: shape,
2309                            resolved,
2310                            chunk_strides: &chunk_strides,
2311                            result_strides: &result_strides,
2312                            elem_size,
2313                        },
2314                    )?;
2315                }
2316            }
2317
2318            let result_buf = assume_init_u8_vec(result_buf);
2319            return self.decode_buffer_with_shape::<T>(
2320                &result_buf,
2321                resolved.result_elements,
2322                &resolved.result_shape,
2323            );
2324        }
2325
2326        let mut result_buf = self.make_output_buffer(result_total_bytes);
2327
2328        // For each overlapping chunk: decompress and copy matching elements.
2329        for entry in &overlapping {
2330            let chunk_data =
2331                self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2332
2333            if use_unit_stride_fast_path {
2334                copy_unit_stride_chunk_overlap(
2335                    &chunk_data,
2336                    &mut result_buf,
2337                    UnitStrideCopyLayout {
2338                        chunk_offsets: &entry.offsets,
2339                        chunk_shape: &chunk_shape,
2340                        dataset_shape: shape,
2341                        resolved,
2342                        chunk_strides: &chunk_strides,
2343                        result_strides: &result_strides,
2344                        elem_size,
2345                    },
2346                )?;
2347                continue;
2348            }
2349
2350            // For each dimension, compute which elements within this chunk fall
2351            // within the selection.
2352            let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
2353            for d in 0..ndim {
2354                let chunk_start = entry.offsets[d];
2355                let chunk_end = (chunk_start + chunk_shape[d]).min(shape[d]);
2356                let dim = &resolved.dims[d];
2357                let sel_start = dim.start;
2358                let sel_end = dim.end;
2359                let sel_step = dim.step;
2360                let mut indices = Vec::new();
2361
2362                // Find first selected index >= chunk_start
2363                let first_sel = if sel_start >= chunk_start {
2364                    sel_start
2365                } else {
2366                    let steps_to_skip = (chunk_start - sel_start).div_ceil(sel_step);
2367                    sel_start + steps_to_skip * sel_step
2368                };
2369
2370                let mut sel_idx = first_sel;
2371                while sel_idx < sel_end && sel_idx < chunk_end {
2372                    let chunk_local = checked_usize(sel_idx - chunk_start, "chunk-local index")?;
2373                    // Compute result-space index for this dimension.
2374                    let result_dim_idx =
2375                        checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
2376                    indices.push((chunk_local, result_dim_idx));
2377                    sel_idx += sel_step;
2378                }
2379
2380                dim_indices.push(indices);
2381            }
2382
2383            // Iterate over the cartesian product of matching indices.
2384            copy_selected_elements(
2385                &chunk_data,
2386                &mut result_buf,
2387                &dim_indices,
2388                &chunk_strides,
2389                &result_strides,
2390                elem_size,
2391                ndim,
2392            )?;
2393        }
2394
2395        self.decode_buffer_with_shape::<T>(
2396            &result_buf,
2397            resolved.result_elements,
2398            &resolved.result_shape,
2399        )
2400    }
2401
2402    /// Parallel variant of `read_chunked_slice`: decompresses overlapping chunks
2403    /// in parallel using Rayon, then copies selected elements into the result buffer.
2404    ///
2405    /// Each chunk writes to a disjoint region of the result buffer (chunks don't
2406    /// overlap in output space), so this is safe to parallelize.
2407    #[cfg(feature = "rayon")]
2408    fn read_chunked_slice_parallel<T: H5Type>(
2409        &self,
2410        index_address: u64,
2411        chunk_dims: &[u32],
2412        _element_size: u32,
2413        chunk_indexing: Option<&ChunkIndexing>,
2414        _selection: &SliceInfo,
2415        resolved: &ResolvedSelection,
2416    ) -> Result<ArrayD<T>> {
2417        if resolved.result_elements == 0 {
2418            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
2419        }
2420
2421        if Cursor::is_undefined_offset(index_address, self.offset_size()) {
2422            return self
2423                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
2424        }
2425
2426        let ndim = self.ndim();
2427        let shape = &self.dataspace.dims;
2428        let elem_size = dtype_element_size(&self.datatype)?;
2429        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
2430        validate_chunk_shape(shape, &chunk_shape)?;
2431        let mut first_chunk = vec![0u64; ndim];
2432        let mut last_chunk = vec![0u64; ndim];
2433        for d in 0..ndim {
2434            let (first, last) = resolved.dims[d]
2435                .chunk_index_range(chunk_shape[d])
2436                .expect("zero-sized result handled above");
2437            first_chunk[d] = first;
2438            last_chunk[d] = last;
2439        }
2440
2441        // Collect all chunk entries.
2442        let mut overlapping = self.collect_chunk_entries(
2443            index_address,
2444            chunk_dims,
2445            chunk_indexing,
2446            ChunkEntrySelection {
2447                shape,
2448                ndim,
2449                elem_size,
2450                chunk_bounds: Some((&first_chunk, &last_chunk)),
2451            },
2452        )?;
2453        let fully_covered_grid = validate_chunk_grid_coverage(
2454            &mut overlapping,
2455            shape,
2456            &chunk_shape,
2457            &first_chunk,
2458            &last_chunk,
2459        )?;
2460
2461        // Allocate result buffer (raw bytes) initialized from fill value.
2462        let result_total_bytes = checked_mul_usize(
2463            resolved.result_elements,
2464            elem_size,
2465            "slice result size in bytes",
2466        )?;
2467        // Compute result strides (including collapsed dims — they have count=1).
2468        let result_dims = resolved.result_dims_with_collapsed();
2469        let mut result_strides = vec![1usize; ndim];
2470        for d in (0..ndim.saturating_sub(1)).rev() {
2471            result_strides[d] =
2472                checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
2473        }
2474        let mut chunk_strides = vec![1usize; ndim];
2475        for d in (0..ndim.saturating_sub(1)).rev() {
2476            chunk_strides[d] = checked_mul_usize(
2477                chunk_strides[d + 1],
2478                chunk_shape[d + 1] as usize,
2479                "chunk stride",
2480            )?;
2481        }
2482        let use_unit_stride_fast_path = resolved.is_unit_stride();
2483        let fully_covered_unit_stride = use_unit_stride_fast_path && fully_covered_grid;
2484
2485        if fully_covered_unit_stride {
2486            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
2487                let mut result_values: Vec<MaybeUninit<T>> =
2488                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
2489                        .take(resolved.result_elements)
2490                        .collect();
2491                let flat = FlatBufferPtr {
2492                    ptr: result_values.as_mut_ptr() as *mut u8,
2493                    len: checked_mul_usize(
2494                        result_values.len(),
2495                        std::mem::size_of::<T>(),
2496                        "typed slice result size in bytes",
2497                    )?,
2498                };
2499
2500                overlapping
2501                    .par_iter()
2502                    .map(|entry| {
2503                        let chunk_data = self.load_exact_chunk_data(
2504                            entry,
2505                            index_address,
2506                            &chunk_shape,
2507                            elem_size,
2508                        )?;
2509
2510                        unsafe {
2511                            flat.copy_unit_stride_chunk_overlap(
2512                                &chunk_data,
2513                                UnitStrideCopyLayout {
2514                                    chunk_offsets: &entry.offsets,
2515                                    chunk_shape: &chunk_shape,
2516                                    dataset_shape: shape,
2517                                    resolved,
2518                                    chunk_strides: &chunk_strides,
2519                                    result_strides: &result_strides,
2520                                    elem_size,
2521                                },
2522                            )?;
2523                        }
2524
2525                        Ok(())
2526                    })
2527                    .collect::<std::result::Result<Vec<_>, Error>>()?;
2528
2529                let result_values = assume_init_vec(result_values);
2530                return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
2531                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
2532            }
2533
2534            let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
2535            let flat = FlatBufferPtr {
2536                ptr: result_buf.as_mut_ptr() as *mut u8,
2537                len: result_buf.len(),
2538            };
2539
2540            overlapping
2541                .par_iter()
2542                .map(|entry| {
2543                    let chunk_data =
2544                        self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2545
2546                    unsafe {
2547                        flat.copy_unit_stride_chunk_overlap(
2548                            &chunk_data,
2549                            UnitStrideCopyLayout {
2550                                chunk_offsets: &entry.offsets,
2551                                chunk_shape: &chunk_shape,
2552                                dataset_shape: shape,
2553                                resolved,
2554                                chunk_strides: &chunk_strides,
2555                                result_strides: &result_strides,
2556                                elem_size,
2557                            },
2558                        )?;
2559                    }
2560
2561                    Ok(())
2562                })
2563                .collect::<std::result::Result<Vec<_>, Error>>()?;
2564
2565            let result_buf = assume_init_u8_vec(result_buf);
2566            return self.decode_buffer_with_shape::<T>(
2567                &result_buf,
2568                resolved.result_elements,
2569                &resolved.result_shape,
2570            );
2571        }
2572
2573        let mut result_buf = self.make_output_buffer(result_total_bytes);
2574
2575        let flat = FlatBufferPtr {
2576            ptr: result_buf.as_mut_ptr(),
2577            len: result_buf.len(),
2578        };
2579
2580        overlapping
2581            .par_iter()
2582            .map(|entry| {
2583                let chunk_data =
2584                    self.load_exact_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
2585
2586                if use_unit_stride_fast_path {
2587                    unsafe {
2588                        flat.copy_unit_stride_chunk_overlap(
2589                            &chunk_data,
2590                            UnitStrideCopyLayout {
2591                                chunk_offsets: &entry.offsets,
2592                                chunk_shape: &chunk_shape,
2593                                dataset_shape: shape,
2594                                resolved,
2595                                chunk_strides: &chunk_strides,
2596                                result_strides: &result_strides,
2597                                elem_size,
2598                            },
2599                        )?;
2600                    }
2601                    return Ok(());
2602                }
2603
2604                // For each dimension, compute which elements within this chunk fall
2605                // within the selection.
2606                let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
2607                for d in 0..ndim {
2608                    let chunk_start = entry.offsets[d];
2609                    let chunk_end = (chunk_start + chunk_shape[d]).min(shape[d]);
2610                    let dim = &resolved.dims[d];
2611                    let sel_start = dim.start;
2612                    let sel_end = dim.end;
2613                    let sel_step = dim.step;
2614                    let mut indices = Vec::new();
2615
2616                    let first_sel = if sel_start >= chunk_start {
2617                        sel_start
2618                    } else {
2619                        let steps_to_skip = (chunk_start - sel_start).div_ceil(sel_step);
2620                        sel_start + steps_to_skip * sel_step
2621                    };
2622
2623                    let mut sel_idx = first_sel;
2624                    while sel_idx < sel_end && sel_idx < chunk_end {
2625                        let chunk_local =
2626                            checked_usize(sel_idx - chunk_start, "chunk-local index")?;
2627                        let result_dim_idx =
2628                            checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
2629                        indices.push((chunk_local, result_dim_idx));
2630                        sel_idx += sel_step;
2631                    }
2632
2633                    dim_indices.push(indices);
2634                }
2635
2636                // SAFETY: each chunk writes to disjoint output positions because
2637                // chunks occupy non-overlapping regions of the dataset grid and
2638                // the selection maps each dataset coordinate to a unique result index.
2639                unsafe {
2640                    flat.copy_selected(
2641                        &chunk_data,
2642                        &dim_indices,
2643                        &chunk_strides,
2644                        &result_strides,
2645                        elem_size,
2646                        ndim,
2647                    )?;
2648                }
2649
2650                Ok(())
2651            })
2652            .collect::<std::result::Result<Vec<_>, Error>>()?;
2653
2654        self.decode_buffer_with_shape::<T>(
2655            &result_buf,
2656            resolved.result_elements,
2657            &resolved.result_shape,
2658        )
2659    }
2660
2661    fn read_contiguous_slice<T: H5Type>(
2662        &self,
2663        address: u64,
2664        size: u64,
2665        resolved: &ResolvedSelection,
2666    ) -> Result<ArrayD<T>> {
2667        if resolved.result_elements == 0 {
2668            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
2669        }
2670
2671        if self.external_files.is_none()
2672            && (Cursor::is_undefined_offset(address, self.offset_size()) || size == 0)
2673        {
2674            return self
2675                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
2676        }
2677        if self.external_files.is_none() {
2678            self.validate_allocated_raw_data_len(
2679                "contiguous",
2680                checked_usize(size, "contiguous dataset size")?,
2681            )?;
2682        }
2683
2684        let shape = &self.dataspace.dims;
2685        if selection_covers_full_dataset(resolved, shape) {
2686            return self.read_contiguous::<T>(address, size);
2687        }
2688
2689        let elem_size = self.raw_element_size()?;
2690        let result_total_bytes = checked_mul_usize(
2691            resolved.result_elements,
2692            elem_size,
2693            "contiguous slice result size in bytes",
2694        )?;
2695        let dataset_strides = row_major_strides(shape, "contiguous dataset stride")?;
2696        let result_dims = resolved.result_dims_with_collapsed();
2697        let result_strides = result_strides_for_dims(&result_dims)?;
2698        let result_buf = self.read_contiguous_slice_bytes_direct(
2699            address,
2700            size,
2701            resolved,
2702            ContiguousSliceDirectLayout {
2703                dataset_strides: &dataset_strides,
2704                result_strides: &result_strides,
2705                elem_size,
2706                result_total_bytes,
2707            },
2708        )?;
2709
2710        self.decode_buffer_with_shape::<T>(
2711            &result_buf,
2712            resolved.result_elements,
2713            &resolved.result_shape,
2714        )
2715    }
2716
2717    fn read_contiguous_slice_bytes_direct(
2718        &self,
2719        address: u64,
2720        size: u64,
2721        resolved: &ResolvedSelection,
2722        layout: ContiguousSliceDirectLayout<'_>,
2723    ) -> Result<Vec<u8>> {
2724        let shape = &self.dataspace.dims;
2725        let ndim = shape.len();
2726        if resolved.dims.len() != ndim
2727            || layout.dataset_strides.len() != ndim
2728            || layout.result_strides.len() != ndim
2729        {
2730            return Err(Error::InvalidData(format!(
2731                "contiguous slice layout rank does not match dataset rank {ndim}"
2732            )));
2733        }
2734
2735        let storage_len = if self.external_files.is_some() {
2736            checked_mul_usize(
2737                checked_usize(self.num_elements()?, "dataset element count")?,
2738                layout.elem_size,
2739                "external dataset size",
2740            )?
2741        } else {
2742            checked_usize(size, "contiguous dataset size")?
2743        };
2744        let tail_start = contiguous_slice_tail_start(shape, resolved);
2745        let block_elements = contiguous_slice_block_elements(resolved, tail_start)?;
2746        let block_bytes = checked_mul_usize(
2747            block_elements,
2748            layout.elem_size,
2749            "contiguous slice block size in bytes",
2750        )?;
2751        let mut result_buf = self.make_output_buffer(layout.result_total_bytes);
2752
2753        let prefix_blocks =
2754            resolved.dims[..tail_start]
2755                .iter()
2756                .try_fold(1usize, |acc, dim| -> Result<usize> {
2757                    checked_mul_usize(acc, dim.count, "contiguous slice block count")
2758                })?;
2759        let mut counters = vec![0usize; tail_start];
2760
2761        for _ in 0..prefix_blocks {
2762            let mut source_elem = 0usize;
2763            let mut result_elem = 0usize;
2764
2765            for (d, &counter) in counters.iter().enumerate().take(tail_start) {
2766                let ordinal = u64::try_from(counter).map_err(|_| {
2767                    Error::InvalidData("contiguous slice ordinal exceeds u64".to_string())
2768                })?;
2769                let coord = checked_add_u64(
2770                    resolved.dims[d].start,
2771                    checked_mul_u64(
2772                        ordinal,
2773                        resolved.dims[d].step,
2774                        "contiguous slice coordinate",
2775                    )?,
2776                    "contiguous slice coordinate",
2777                )?;
2778                let coord = checked_usize(coord, "contiguous slice source index")?;
2779                let source_term =
2780                    checked_mul_usize(coord, layout.dataset_strides[d], "contiguous slice source")?;
2781                let result_term = checked_mul_usize(
2782                    counter,
2783                    layout.result_strides[d],
2784                    "contiguous slice result",
2785                )?;
2786                source_elem =
2787                    checked_add_usize(source_elem, source_term, "contiguous slice source")?;
2788                result_elem =
2789                    checked_add_usize(result_elem, result_term, "contiguous slice result")?;
2790            }
2791
2792            for (d, &dataset_stride) in layout
2793                .dataset_strides
2794                .iter()
2795                .enumerate()
2796                .take(ndim)
2797                .skip(tail_start)
2798            {
2799                let coord = checked_usize(resolved.dims[d].start, "contiguous slice source index")?;
2800                let source_term =
2801                    checked_mul_usize(coord, dataset_stride, "contiguous slice source")?;
2802                source_elem =
2803                    checked_add_usize(source_elem, source_term, "contiguous slice source")?;
2804            }
2805
2806            let source_start = checked_mul_usize(
2807                source_elem,
2808                layout.elem_size,
2809                "contiguous slice source byte offset",
2810            )?;
2811            let source_end = checked_add_usize(
2812                source_start,
2813                block_bytes,
2814                "contiguous slice source byte end",
2815            )?;
2816            if source_end > storage_len {
2817                return Err(Error::InvalidData(format!(
2818                    "contiguous slice range {}..{} exceeds dataset storage size {}",
2819                    source_start, source_end, storage_len
2820                )));
2821            }
2822
2823            let dst_start = checked_mul_usize(
2824                result_elem,
2825                layout.elem_size,
2826                "contiguous slice destination byte offset",
2827            )?;
2828            let dst_end = checked_add_usize(
2829                dst_start,
2830                block_bytes,
2831                "contiguous slice destination byte end",
2832            )?;
2833            if dst_end > result_buf.len() {
2834                return Err(Error::InvalidData(format!(
2835                    "contiguous slice destination range {}..{} exceeds result size {}",
2836                    dst_start,
2837                    dst_end,
2838                    result_buf.len()
2839                )));
2840            }
2841
2842            let block = self.read_contiguous_logical_range(address, source_start, block_bytes)?;
2843            if block.len() != block_bytes {
2844                return Err(Error::InvalidData(format!(
2845                    "contiguous slice read returned {} bytes, expected {}",
2846                    block.len(),
2847                    block_bytes
2848                )));
2849            }
2850            result_buf[dst_start..dst_end].copy_from_slice(&block);
2851
2852            let mut carry = true;
2853            for d in (0..tail_start).rev() {
2854                if carry {
2855                    counters[d] += 1;
2856                    if counters[d] < resolved.dims[d].count {
2857                        carry = false;
2858                    } else {
2859                        counters[d] = 0;
2860                    }
2861                }
2862            }
2863        }
2864
2865        Ok(result_buf)
2866    }
2867
2868    fn read_compact_slice<T: H5Type>(
2869        &self,
2870        data: &[u8],
2871        selection: &SliceInfo,
2872    ) -> Result<ArrayD<T>> {
2873        let full = self.read_compact::<T>(data)?;
2874        slice_array(&full, selection, &self.dataspace.dims)
2875    }
2876
2877    fn decode_buffer_with_shape<T: H5Type>(
2878        &self,
2879        raw: &[u8],
2880        n: usize,
2881        shape: &[usize],
2882    ) -> Result<ArrayD<T>> {
2883        let elem_size = self.raw_element_size()?;
2884        let expected_bytes = checked_mul_usize(n, elem_size, "decoded buffer byte length")?;
2885        if raw.len() != expected_bytes {
2886            return Err(Error::InvalidData(format!(
2887                "decoded buffer has {} bytes, expected {} bytes",
2888                raw.len(),
2889                expected_bytes
2890            )));
2891        }
2892
2893        if let Some(elements) = T::decode_vec(raw, &self.datatype, n) {
2894            let elements = elements?;
2895            return ArrayD::from_shape_vec(IxDyn(shape), elements)
2896                .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
2897        }
2898
2899        let mut elements = Vec::with_capacity(n);
2900        for i in 0..n {
2901            let start = checked_mul_usize(i, elem_size, "decoded element byte offset")?;
2902            let end = checked_mul_usize(i + 1, elem_size, "decoded element end offset")?;
2903            elements.push(T::from_bytes(&raw[start..end], &self.datatype)?);
2904        }
2905
2906        ArrayD::from_shape_vec(IxDyn(shape), elements)
2907            .map_err(|e| Error::InvalidData(format!("array shape error: {e}")))
2908    }
2909
2910    fn decode_raw_data<T: H5Type>(&self, raw: &[u8]) -> Result<ArrayD<T>> {
2911        let n = checked_usize(self.num_elements()?, "dataset element count")?;
2912        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
2913        for &dim in &self.dataspace.dims {
2914            shape.push(checked_usize(dim, "dataset dimension")?);
2915        }
2916        self.decode_buffer_with_shape::<T>(raw, n, &shape)
2917    }
2918
2919    fn make_fill_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
2920        let n = checked_usize(self.num_elements()?, "dataset element count")?;
2921        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
2922        for &dim in &self.dataspace.dims {
2923            shape.push(checked_usize(dim, "dataset dimension")?);
2924        }
2925        self.make_fill_array_from_shape::<T>(n, &shape)
2926    }
2927
2928    fn make_fill_array_from_shape<T: H5Type>(
2929        &self,
2930        element_count: usize,
2931        shape: &[usize],
2932    ) -> Result<ArrayD<T>> {
2933        let elem_size = dtype_element_size(&self.datatype)?;
2934        let total_bytes = checked_mul_usize(element_count, elem_size, "fill result size in bytes")?;
2935        let fill = self.make_output_buffer(total_bytes);
2936        self.decode_buffer_with_shape::<T>(&fill, element_count, shape)
2937    }
2938
2939    fn make_output_buffer(&self, total_bytes: usize) -> Vec<u8> {
2940        let mut buf = vec![0u8; total_bytes];
2941        self.fill_output_buffer(&mut buf);
2942        buf
2943    }
2944
2945    fn fill_output_buffer(&self, buf: &mut [u8]) {
2946        buf.fill(0);
2947        if let Some(ref fv) = self.fill_value {
2948            if let Some(ref fill_bytes) = fv.value {
2949                if !fill_bytes.is_empty() {
2950                    for chunk in buf.chunks_exact_mut(fill_bytes.len()) {
2951                        chunk.copy_from_slice(fill_bytes);
2952                    }
2953                }
2954            }
2955        }
2956    }
2957
2958    fn validate_allocated_raw_data_len(&self, storage_kind: &str, actual_len: usize) -> Result<()> {
2959        let expected_len = self.raw_byte_len()?;
2960        if actual_len != expected_len {
2961            return Err(Error::InvalidData(format!(
2962                "{storage_kind} raw data has {actual_len} bytes, expected {expected_len} bytes"
2963            )));
2964        }
2965        Ok(())
2966    }
2967
2968    fn convert_to_native_endian(&self, bytes: &mut [u8]) -> Result<()> {
2969        let count = checked_usize(self.num_elements()?, "dataset element count")?;
2970        convert_datatype_to_native_endian(&self.datatype, self.vlen_reference_size(), bytes, count)
2971    }
2972}
2973
2974fn native_byte_order() -> ByteOrder {
2975    if cfg!(target_endian = "little") {
2976        ByteOrder::LittleEndian
2977    } else {
2978        ByteOrder::BigEndian
2979    }
2980}
2981
2982fn convert_datatype_to_native_endian(
2983    dtype: &Datatype,
2984    vlen_reference_size: usize,
2985    bytes: &mut [u8],
2986    count: usize,
2987) -> Result<()> {
2988    match dtype {
2989        Datatype::FixedPoint {
2990            size, byte_order, ..
2991        }
2992        | Datatype::FloatingPoint { size, byte_order }
2993        | Datatype::Bitfield { size, byte_order } => {
2994            swap_elements_to_native(bytes, count, *size as usize, *byte_order)
2995        }
2996        Datatype::Enum { base, .. } => {
2997            convert_datatype_to_native_endian(base, vlen_reference_size, bytes, count)
2998        }
2999        Datatype::Array { base, dims } => {
3000            let array_count = dims.iter().try_fold(1usize, |acc, &dim| {
3001                checked_mul_usize(
3002                    acc,
3003                    checked_usize(dim, "array datatype dimension")?,
3004                    "array datatype element count",
3005                )
3006            })?;
3007            let total_count =
3008                checked_mul_usize(count, array_count, "array datatype total element count")?;
3009            convert_datatype_to_native_endian(base, vlen_reference_size, bytes, total_count)
3010        }
3011        Datatype::Compound { size, fields } => {
3012            let record_size = *size as usize;
3013            let required = checked_mul_usize(count, record_size, "compound byte length")?;
3014            if bytes.len() < required {
3015                return Err(Error::InvalidData(format!(
3016                    "compound native-endian conversion needs {required} bytes, got {}",
3017                    bytes.len()
3018                )));
3019            }
3020
3021            for record in 0..count {
3022                let record_start =
3023                    checked_mul_usize(record, record_size, "compound record byte offset")?;
3024                for field in fields {
3025                    let field_offset = field.byte_offset as usize;
3026                    let field_size =
3027                        raw_element_size_for_datatype(&field.datatype, vlen_reference_size)?;
3028                    let field_start = checked_add_usize(
3029                        record_start,
3030                        field_offset,
3031                        "compound field byte offset",
3032                    )?;
3033                    let field_end =
3034                        checked_add_usize(field_start, field_size, "compound field byte end")?;
3035                    if field_end > bytes.len() || field_offset + field_size > record_size {
3036                        return Err(Error::InvalidData(format!(
3037                            "compound field '{}' range exceeds record size",
3038                            field.name
3039                        )));
3040                    }
3041                    convert_datatype_to_native_endian(
3042                        &field.datatype,
3043                        vlen_reference_size,
3044                        &mut bytes[field_start..field_end],
3045                        1,
3046                    )?;
3047                }
3048            }
3049            Ok(())
3050        }
3051        Datatype::String { .. }
3052        | Datatype::VarLen { .. }
3053        | Datatype::Opaque { .. }
3054        | Datatype::Reference { .. } => Ok(()),
3055    }
3056}
3057
3058fn swap_elements_to_native(
3059    bytes: &mut [u8],
3060    count: usize,
3061    elem_size: usize,
3062    byte_order: ByteOrder,
3063) -> Result<()> {
3064    let required = checked_mul_usize(count, elem_size, "native-endian byte length")?;
3065    if bytes.len() < required {
3066        return Err(Error::InvalidData(format!(
3067            "native-endian conversion needs {required} bytes, got {}",
3068            bytes.len()
3069        )));
3070    }
3071
3072    if elem_size <= 1 || byte_order == native_byte_order() {
3073        return Ok(());
3074    }
3075
3076    for chunk in bytes[..required].chunks_exact_mut(elem_size) {
3077        chunk.reverse();
3078    }
3079    Ok(())
3080}
3081
3082fn attribute_from_message_storage(message: &AttributeMessage, context: &FileContext) -> Attribute {
3083    let raw_data = match &message.datatype {
3084        Datatype::VarLen {
3085            base,
3086            kind: VarLenKind::String,
3087            ..
3088        } if matches!(base.as_ref(), Datatype::FixedPoint { size: 1, .. })
3089            && matches!(message.dataspace.num_elements(), Ok(1)) =>
3090        {
3091            resolve_vlen_bytes_storage(
3092                &message.raw_data,
3093                context.storage.as_ref(),
3094                context.superblock.offset_size,
3095                context.superblock.length_size,
3096            )
3097            .unwrap_or_else(|| message.raw_data.clone())
3098        }
3099        _ => message.raw_data.clone(),
3100    };
3101
3102    Attribute {
3103        name: message.name.clone(),
3104        datatype: message.datatype.clone(),
3105        shape: match message.dataspace.dataspace_type {
3106            DataspaceType::Scalar => vec![],
3107            DataspaceType::Null => vec![0],
3108            DataspaceType::Simple => message.dataspace.dims.clone(),
3109        },
3110        raw_data,
3111    }
3112}
3113
3114fn normalize_layout(layout: DataLayout, dataspace: &DataspaceMessage) -> DataLayout {
3115    match layout {
3116        DataLayout::Chunked {
3117            address,
3118            mut dims,
3119            mut element_size,
3120            chunk_indexing,
3121        } if dims.len() == dataspace.dims.len() + 1 => {
3122            if let Some(legacy_element_size) = dims.pop() {
3123                if element_size == 0 {
3124                    element_size = legacy_element_size;
3125                }
3126            }
3127            DataLayout::Chunked {
3128                address,
3129                dims,
3130                element_size,
3131                chunk_indexing,
3132            }
3133        }
3134        other => other,
3135    }
3136}
3137
3138fn raw_element_size_for_datatype(dtype: &Datatype, vlen_reference_size: usize) -> Result<usize> {
3139    match dtype {
3140        Datatype::String {
3141            size: StringSize::Variable,
3142            ..
3143        }
3144        | Datatype::VarLen { .. } => Ok(vlen_reference_size),
3145        Datatype::Array { base, dims } => {
3146            let base_size = raw_element_size_for_datatype(base, vlen_reference_size)?;
3147            let count = dims.iter().try_fold(1usize, |acc, &dim| {
3148                let dim = checked_usize(dim, "array datatype dimension")?;
3149                checked_mul_usize(acc, dim, "array datatype element count")
3150            })?;
3151            checked_mul_usize(base_size, count, "array datatype byte size")
3152        }
3153        Datatype::Enum { base, .. } => raw_element_size_for_datatype(base, vlen_reference_size),
3154        Datatype::FixedPoint { size, .. }
3155        | Datatype::FloatingPoint { size, .. }
3156        | Datatype::Bitfield { size, .. }
3157        | Datatype::Reference { size, .. } => Ok(*size as usize),
3158        Datatype::String {
3159            size: StringSize::Fixed(len),
3160            ..
3161        } => Ok(*len as usize),
3162        Datatype::Compound { size, .. } | Datatype::Opaque { size, .. } => Ok(*size as usize),
3163    }
3164}
3165
/// Copy a chunk's data into the flat output buffer at the correct position.
///
/// Test-only convenience wrapper: derives row-major strides for both the
/// dataset and the chunk, then delegates to `copy_chunk_to_flat_with_strides`.
///
/// # Errors
///
/// Returns an error when stride computation fails (instead of panicking) or
/// when the underlying copy rejects the layout.
#[cfg(test)]
fn copy_chunk_to_flat(
    chunk_data: &[u8],
    flat: &mut [u8],
    chunk_offsets: &[u64],
    chunk_shape: &[u64],
    dataset_shape: &[u64],
    elem_size: usize,
) -> Result<()> {
    // Propagate stride failures through the existing `Result` return rather
    // than aborting the test process with a panic.
    let dataset_strides = row_major_strides(dataset_shape, "dataset stride")?;
    let chunk_strides = row_major_strides(chunk_shape, "chunk stride")?;
    copy_chunk_to_flat_with_strides(
        chunk_data,
        flat,
        ChunkCopyLayout {
            chunk_offsets,
            chunk_shape,
            dataset_shape,
            dataset_strides: &dataset_strides,
            chunk_strides: &chunk_strides,
            elem_size,
        },
    )
}
3193
3194fn copy_chunk_to_flat_with_strides(
3195    chunk_data: &[u8],
3196    flat: &mut [u8],
3197    layout: ChunkCopyLayout<'_>,
3198) -> Result<()> {
3199    unsafe {
3200        copy_chunk_to_flat_with_strides_ptr(
3201            chunk_data,
3202            FlatBufferPtr {
3203                ptr: flat.as_mut_ptr(),
3204                len: flat.len(),
3205            },
3206            layout,
3207        )
3208    }
3209}
3210
3211#[inline(always)]
3212unsafe fn copy_chunk_to_flat_with_strides_ptr(
3213    chunk_data: &[u8],
3214    flat: FlatBufferPtr,
3215    layout: ChunkCopyLayout<'_>,
3216) -> Result<()> {
3217    let ndim = layout.dataset_shape.len();
3218    if layout.chunk_offsets.len() != ndim
3219        || layout.chunk_shape.len() != ndim
3220        || layout.dataset_strides.len() != ndim
3221        || layout.chunk_strides.len() != ndim
3222    {
3223        return Err(Error::InvalidData(format!(
3224            "chunk copy layout rank does not match dataset rank {ndim}"
3225        )));
3226    }
3227
3228    if ndim == 0 {
3229        if chunk_data.len() < layout.elem_size || flat.len < layout.elem_size {
3230            return Err(Error::InvalidData(format!(
3231                "scalar chunk copy requires {} bytes, got source {} and destination {}",
3232                layout.elem_size,
3233                chunk_data.len(),
3234                flat.len
3235            )));
3236        }
3237        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr, layout.elem_size);
3238        return Ok(());
3239    }
3240
3241    // Total elements in this chunk (clamped to dataset boundaries)
3242    let mut actual_chunk_shape = Vec::with_capacity(ndim);
3243    for i in 0..ndim {
3244        if layout.chunk_offsets[i] >= layout.dataset_shape[i] {
3245            return Err(Error::InvalidData(format!(
3246                "chunk offset {} is outside dimension {} of size {}",
3247                layout.chunk_offsets[i], i, layout.dataset_shape[i]
3248            )));
3249        }
3250        let remaining = layout.dataset_shape[i] - layout.chunk_offsets[i];
3251        actual_chunk_shape.push(checked_usize(
3252            remaining.min(layout.chunk_shape[i]),
3253            "actual chunk extent",
3254        )?);
3255    }
3256
3257    let row_elems = *actual_chunk_shape.last().unwrap_or(&1);
3258    let row_bytes = checked_mul_usize(row_elems, layout.elem_size, "chunk row bytes")?;
3259    let mut dataset_origin = 0usize;
3260    for (d, offset) in layout.chunk_offsets.iter().enumerate() {
3261        let offset = checked_usize(*offset, "chunk offset")?;
3262        let term = checked_mul_usize(offset, layout.dataset_strides[d], "chunk origin")?;
3263        dataset_origin = checked_add_usize(dataset_origin, term, "chunk origin")?;
3264    }
3265
3266    if ndim == 1 {
3267        let dst_start = checked_mul_usize(dataset_origin, layout.elem_size, "chunk dst offset")?;
3268        let dst_end = checked_add_usize(dst_start, row_bytes, "chunk dst end")?;
3269        if row_bytes > chunk_data.len() || dst_end > flat.len {
3270            return Err(Error::InvalidData(format!(
3271                "chunk copy out of bounds: source row needs {} bytes from {} bytes, destination range {}..{} exceeds {} bytes",
3272                row_bytes,
3273                chunk_data.len(),
3274                dst_start,
3275                dst_end,
3276                flat.len
3277            )));
3278        }
3279        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr.add(dst_start), row_bytes);
3280        return Ok(());
3281    }
3282
3283    let outer_dims = &actual_chunk_shape[..ndim - 1];
3284    let total_rows = checked_product_usize(outer_dims, "chunk row count")?;
3285    let mut outer_idx = vec![0usize; ndim - 1];
3286
3287    for _ in 0..total_rows {
3288        let mut chunk_row = 0usize;
3289        let mut dataset_row = dataset_origin;
3290        for (d, outer) in outer_idx.iter().copied().enumerate() {
3291            let chunk_term = checked_mul_usize(outer, layout.chunk_strides[d], "chunk row")?;
3292            let dataset_term = checked_mul_usize(outer, layout.dataset_strides[d], "dataset row")?;
3293            chunk_row = checked_add_usize(chunk_row, chunk_term, "chunk row")?;
3294            dataset_row = checked_add_usize(dataset_row, dataset_term, "dataset row")?;
3295        }
3296
3297        let src_start = checked_mul_usize(chunk_row, layout.elem_size, "chunk src offset")?;
3298        let dst_start = checked_mul_usize(dataset_row, layout.elem_size, "chunk dst offset")?;
3299        let src_end = checked_add_usize(src_start, row_bytes, "chunk src end")?;
3300        let dst_end = checked_add_usize(dst_start, row_bytes, "chunk dst end")?;
3301        if src_end > chunk_data.len() || dst_end > flat.len {
3302            return Err(Error::InvalidData(format!(
3303                "chunk copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3304                src_start,
3305                src_end,
3306                chunk_data.len(),
3307                dst_start,
3308                dst_end,
3309                flat.len
3310            )));
3311        }
3312        std::ptr::copy_nonoverlapping(
3313            chunk_data.as_ptr().add(src_start),
3314            flat.ptr.add(dst_start),
3315            row_bytes,
3316        );
3317
3318        let mut carry = true;
3319        for d in (0..outer_idx.len()).rev() {
3320            if carry {
3321                outer_idx[d] += 1;
3322                if outer_idx[d] < outer_dims[d] {
3323                    carry = false;
3324                } else {
3325                    outer_idx[d] = 0;
3326                }
3327            }
3328        }
3329    }
3330
3331    Ok(())
3332}
3333
3334fn checked_product_usize(values: &[usize], context: &str) -> Result<usize> {
3335    let mut product = 1usize;
3336    for &value in values {
3337        product = checked_mul_usize(product, value, context)?;
3338    }
3339    Ok(product)
3340}
3341
3342fn unit_stride_chunk_overlap_plan(
3343    chunk_offsets: &[u64],
3344    chunk_shape: &[u64],
3345    dataset_shape: &[u64],
3346    resolved: &ResolvedSelection,
3347) -> Result<(Vec<usize>, Vec<usize>, Vec<usize>)> {
3348    let ndim = dataset_shape.len();
3349    let mut overlap_counts = Vec::with_capacity(ndim);
3350    let mut chunk_local_start = Vec::with_capacity(ndim);
3351    let mut result_start = Vec::with_capacity(ndim);
3352
3353    for d in 0..ndim {
3354        let chunk_start = chunk_offsets[d];
3355        let chunk_end = (chunk_start + chunk_shape[d]).min(dataset_shape[d]);
3356        let dim = &resolved.dims[d];
3357        let overlap_start = chunk_start.max(dim.start);
3358        let overlap_end = chunk_end.min(dim.end);
3359        if overlap_start >= overlap_end {
3360            return Ok((Vec::new(), Vec::new(), Vec::new()));
3361        }
3362
3363        overlap_counts.push(checked_usize(
3364            overlap_end - overlap_start,
3365            "chunk overlap size",
3366        )?);
3367        chunk_local_start.push(checked_usize(
3368            overlap_start - chunk_start,
3369            "chunk overlap start",
3370        )?);
3371        result_start.push(checked_usize(
3372            overlap_start - dim.start,
3373            "slice result overlap start",
3374        )?);
3375    }
3376
3377    Ok((overlap_counts, chunk_local_start, result_start))
3378}
3379
3380#[inline(always)]
3381fn copy_unit_stride_chunk_overlap(
3382    chunk_data: &[u8],
3383    result_buf: &mut [u8],
3384    layout: UnitStrideCopyLayout<'_>,
3385) -> Result<()> {
3386    unsafe {
3387        copy_unit_stride_chunk_overlap_ptr(
3388            chunk_data,
3389            FlatBufferPtr {
3390                ptr: result_buf.as_mut_ptr(),
3391                len: result_buf.len(),
3392            },
3393            layout,
3394        )
3395    }
3396}
3397
3398/// Copy a unit-step rectangular overlap from a chunk into the result buffer.
3399///
3400/// This is the hot path for contiguous hyperslab reads over chunked datasets:
3401/// rather than copying one element at a time, it copies contiguous runs along
3402/// the innermost dimension with a single memcpy per output row.
3403///
3404/// # Safety
3405///
3406/// The caller must guarantee that `[result_ptr .. result_ptr + result_len)` is
3407/// valid for writes. Concurrent callers must write to disjoint byte ranges.
3408#[inline(always)]
3409unsafe fn copy_unit_stride_chunk_overlap_ptr(
3410    chunk_data: &[u8],
3411    result: FlatBufferPtr,
3412    layout: UnitStrideCopyLayout<'_>,
3413) -> Result<()> {
3414    let ndim = layout.dataset_shape.len();
3415    if layout.chunk_offsets.len() != ndim
3416        || layout.chunk_shape.len() != ndim
3417        || layout.resolved.dims.len() != ndim
3418        || layout.chunk_strides.len() != ndim
3419        || layout.result_strides.len() != ndim
3420    {
3421        return Err(Error::InvalidData(format!(
3422            "unit-stride copy layout rank does not match dataset rank {ndim}"
3423        )));
3424    }
3425
3426    if ndim == 0 {
3427        if chunk_data.len() < layout.elem_size || result.len < layout.elem_size {
3428            return Err(Error::InvalidData(format!(
3429                "scalar slice copy requires {} bytes, got source {} and destination {}",
3430                layout.elem_size,
3431                chunk_data.len(),
3432                result.len
3433            )));
3434        }
3435        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), result.ptr, layout.elem_size);
3436        return Ok(());
3437    }
3438
3439    let (overlap_counts, chunk_local_start, result_start) = unit_stride_chunk_overlap_plan(
3440        layout.chunk_offsets,
3441        layout.chunk_shape,
3442        layout.dataset_shape,
3443        layout.resolved,
3444    )?;
3445    if overlap_counts.is_empty() {
3446        return Ok(());
3447    }
3448
3449    let row_elems = *overlap_counts.last().unwrap_or(&1);
3450    let row_bytes = checked_mul_usize(row_elems, layout.elem_size, "unit-stride slice row bytes")?;
3451
3452    let mut chunk_origin = 0usize;
3453    let mut result_origin = 0usize;
3454    for d in 0..ndim {
3455        let chunk_term = checked_mul_usize(
3456            chunk_local_start[d],
3457            layout.chunk_strides[d],
3458            "chunk overlap origin",
3459        )?;
3460        let result_term = checked_mul_usize(
3461            result_start[d],
3462            layout.result_strides[d],
3463            "slice result origin",
3464        )?;
3465        chunk_origin = checked_add_usize(chunk_origin, chunk_term, "chunk overlap origin")?;
3466        result_origin = checked_add_usize(result_origin, result_term, "slice result origin")?;
3467    }
3468
3469    if ndim == 1 {
3470        let src_start = checked_mul_usize(chunk_origin, layout.elem_size, "slice src offset")?;
3471        let dst_start = checked_mul_usize(result_origin, layout.elem_size, "slice dst offset")?;
3472        let src_end = checked_add_usize(src_start, row_bytes, "slice src end")?;
3473        let dst_end = checked_add_usize(dst_start, row_bytes, "slice dst end")?;
3474        if src_end > chunk_data.len() || dst_end > result.len {
3475            return Err(Error::InvalidData(format!(
3476                "unit-stride slice copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3477                src_start,
3478                src_end,
3479                chunk_data.len(),
3480                dst_start,
3481                dst_end,
3482                result.len
3483            )));
3484        }
3485        std::ptr::copy_nonoverlapping(
3486            chunk_data.as_ptr().add(src_start),
3487            result.ptr.add(dst_start),
3488            row_bytes,
3489        );
3490        return Ok(());
3491    }
3492
3493    let outer_counts = &overlap_counts[..ndim - 1];
3494    let total_rows = checked_product_usize(outer_counts, "unit-stride slice row count")?;
3495    let mut outer_idx = vec![0usize; ndim - 1];
3496
3497    for _ in 0..total_rows {
3498        let mut chunk_row = chunk_origin;
3499        let mut result_row = result_origin;
3500        for (d, outer) in outer_idx.iter().copied().enumerate() {
3501            let chunk_term = checked_mul_usize(outer, layout.chunk_strides[d], "slice chunk row")?;
3502            let result_term =
3503                checked_mul_usize(outer, layout.result_strides[d], "slice result row")?;
3504            chunk_row = checked_add_usize(chunk_row, chunk_term, "slice chunk row")?;
3505            result_row = checked_add_usize(result_row, result_term, "slice result row")?;
3506        }
3507
3508        let src_start = checked_mul_usize(chunk_row, layout.elem_size, "slice src offset")?;
3509        let dst_start = checked_mul_usize(result_row, layout.elem_size, "slice dst offset")?;
3510        let src_end = checked_add_usize(src_start, row_bytes, "slice src end")?;
3511        let dst_end = checked_add_usize(dst_start, row_bytes, "slice dst end")?;
3512        if src_end > chunk_data.len() || dst_end > result.len {
3513            return Err(Error::InvalidData(format!(
3514                "unit-stride slice copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3515                src_start,
3516                src_end,
3517                chunk_data.len(),
3518                dst_start,
3519                dst_end,
3520                result.len
3521            )));
3522        }
3523        std::ptr::copy_nonoverlapping(
3524            chunk_data.as_ptr().add(src_start),
3525            result.ptr.add(dst_start),
3526            row_bytes,
3527        );
3528
3529        let mut carry = true;
3530        for d in (0..outer_idx.len()).rev() {
3531            if carry {
3532                outer_idx[d] += 1;
3533                if outer_idx[d] < outer_counts[d] {
3534                    carry = false;
3535                } else {
3536                    outer_idx[d] = 0;
3537                }
3538            }
3539        }
3540    }
3541
3542    Ok(())
3543}
3544
3545#[allow(clippy::too_many_arguments)]
3546/// Copy selected elements from a chunk into the result buffer.
3547///
3548/// `dim_indices[d]` is a list of `(chunk_local_idx, result_dim_idx)` pairs for dimension `d`.
3549#[inline(always)]
3550fn copy_selected_elements(
3551    chunk_data: &[u8],
3552    result_buf: &mut [u8],
3553    dim_indices: &[Vec<(usize, usize)>],
3554    chunk_strides: &[usize],
3555    result_strides: &[usize],
3556    elem_size: usize,
3557    ndim: usize,
3558) -> Result<()> {
3559    if dim_indices.len() != ndim || chunk_strides.len() != ndim || result_strides.len() != ndim {
3560        return Err(Error::InvalidData(format!(
3561            "selected-element copy layout rank does not match rank {ndim}"
3562        )));
3563    }
3564
3565    // Check for empty selection
3566    if dim_indices.iter().any(|v| v.is_empty()) {
3567        return Ok(());
3568    }
3569
3570    // Recursive cartesian-product iteration, but unrolled iteratively.
3571    let counts: Vec<usize> = dim_indices.iter().map(|v| v.len()).collect();
3572    let total = checked_product_usize(&counts, "selected-element copy count")?;
3573    let mut counters = vec![0usize; ndim];
3574
3575    for _ in 0..total {
3576        let mut chunk_flat = 0;
3577        let mut result_flat = 0;
3578        for d in 0..ndim {
3579            let (cl, ri) = dim_indices[d][counters[d]];
3580            let chunk_term = checked_mul_usize(cl, chunk_strides[d], "selected chunk offset")?;
3581            let result_term = checked_mul_usize(ri, result_strides[d], "selected result offset")?;
3582            chunk_flat = checked_add_usize(chunk_flat, chunk_term, "selected chunk offset")?;
3583            result_flat = checked_add_usize(result_flat, result_term, "selected result offset")?;
3584        }
3585
3586        let src_start = checked_mul_usize(chunk_flat, elem_size, "selected source byte offset")?;
3587        let dst_start =
3588            checked_mul_usize(result_flat, elem_size, "selected destination byte offset")?;
3589        let src_end = checked_add_usize(src_start, elem_size, "selected source byte end")?;
3590        let dst_end = checked_add_usize(dst_start, elem_size, "selected destination byte end")?;
3591
3592        if src_end > chunk_data.len() || dst_end > result_buf.len() {
3593            return Err(Error::InvalidData(format!(
3594                "selected-element copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
3595                src_start,
3596                src_end,
3597                chunk_data.len(),
3598                dst_start,
3599                dst_end,
3600                result_buf.len()
3601            )));
3602        }
3603        result_buf[dst_start..dst_end].copy_from_slice(&chunk_data[src_start..src_end]);
3604
3605        // Increment counters (row-major)
3606        let mut carry = true;
3607        for d in (0..ndim).rev() {
3608            if carry {
3609                counters[d] += 1;
3610                if counters[d] < dim_indices[d].len() {
3611                    carry = false;
3612                } else {
3613                    counters[d] = 0;
3614                }
3615            }
3616        }
3617    }
3618
3619    Ok(())
3620}
3621
/// Copy selected elements from a chunk into a raw output pointer.
///
/// This is the pointer-based variant of `copy_selected_elements`, suitable for
/// parallel use where multiple threads write to disjoint regions of the same buffer.
/// Every destination range is checked against `result_len` before it is written.
///
/// # Errors
///
/// Returns `Error::InvalidData` when the per-dimension slices do not all have
/// length `ndim`, or when an element would fall outside either buffer. Errors
/// from the checked-arithmetic helpers are propagated.
///
/// # Safety
///
/// The caller must guarantee that no two concurrent calls write to the same
/// byte range within `[result_ptr .. result_ptr + result_len)`, and that this
/// range is valid for writes.
#[cfg(feature = "rayon")]
#[allow(clippy::too_many_arguments)]
#[inline(always)]
unsafe fn copy_selected_elements_ptr(
    chunk_data: &[u8],
    result_ptr: *mut u8,
    result_len: usize,
    dim_indices: &[Vec<(usize, usize)>],
    chunk_strides: &[usize],
    result_strides: &[usize],
    elem_size: usize,
    ndim: usize,
) -> Result<()> {
    let rank_consistent = [dim_indices.len(), chunk_strides.len(), result_strides.len()]
        .iter()
        .all(|&len| len == ndim);
    if !rank_consistent {
        return Err(Error::InvalidData(format!(
            "selected-element copy layout rank does not match rank {ndim}"
        )));
    }

    // A dimension without any selected index makes the whole selection empty.
    if dim_indices.iter().any(|pairs| pairs.is_empty()) {
        return Ok(());
    }

    // Iterate the cartesian product of the per-dimension pair lists.
    let extents: Vec<usize> = dim_indices.iter().map(Vec::len).collect();
    let combinations = checked_product_usize(&extents, "selected-element copy count")?;
    let mut cursor = vec![0usize; ndim];

    for _ in 0..combinations {
        let mut src_elem = 0usize;
        let mut dst_elem = 0usize;
        for (axis, (pairs, &position)) in dim_indices.iter().zip(&cursor).enumerate() {
            let (chunk_local, result_local) = pairs[position];
            let src_term =
                checked_mul_usize(chunk_local, chunk_strides[axis], "selected chunk offset")?;
            let dst_term =
                checked_mul_usize(result_local, result_strides[axis], "selected result offset")?;
            src_elem = checked_add_usize(src_elem, src_term, "selected chunk offset")?;
            dst_elem = checked_add_usize(dst_elem, dst_term, "selected result offset")?;
        }

        let src_start = checked_mul_usize(src_elem, elem_size, "selected source byte offset")?;
        let dst_start =
            checked_mul_usize(dst_elem, elem_size, "selected destination byte offset")?;
        let src_end = checked_add_usize(src_start, elem_size, "selected source byte end")?;
        let dst_end = checked_add_usize(dst_start, elem_size, "selected destination byte end")?;

        if src_end > chunk_data.len() || dst_end > result_len {
            return Err(Error::InvalidData(format!(
                "selected-element copy out of bounds: source range {}..{} of {} bytes, destination range {}..{} of {} bytes",
                src_start,
                src_end,
                chunk_data.len(),
                dst_start,
                dst_end,
                result_len
            )));
        }
        // SAFETY: both byte ranges were bounds-checked above.
        std::ptr::copy_nonoverlapping(
            chunk_data.as_ptr().add(src_start),
            result_ptr.add(dst_start),
            elem_size,
        );

        // Advance the cursor like an odometer, last dimension fastest.
        for (position, pairs) in cursor.iter_mut().zip(dim_indices).rev() {
            *position += 1;
            if *position < pairs.len() {
                break;
            }
            *position = 0;
        }
    }

    Ok(())
}
3707
3708/// Slice an ndarray according to a SliceInfo selection.
3709fn slice_array<T: H5Type + Clone>(
3710    array: &ArrayD<T>,
3711    selection: &SliceInfo,
3712    shape: &[u64],
3713) -> Result<ArrayD<T>> {
3714    // Build result shape
3715    let mut result_shape = Vec::new();
3716
3717    for (i, sel) in selection.selections.iter().enumerate() {
3718        let dim_size = shape[i];
3719        match sel {
3720            SliceInfoElem::Index(idx) => {
3721                if *idx >= dim_size {
3722                    return Err(Error::SliceOutOfBounds {
3723                        dim: i,
3724                        index: *idx,
3725                        size: dim_size,
3726                    });
3727                }
3728                // Don't add to result_shape — this dimension is collapsed
3729            }
3730            SliceInfoElem::Slice { start, end, step } => {
3731                let dim_size = checked_usize(dim_size, "slice dimension size")?;
3732                let actual_end = if *end == u64::MAX {
3733                    dim_size
3734                } else {
3735                    checked_usize(*end, "slice end")?.min(dim_size)
3736                };
3737                let actual_start = checked_usize(*start, "slice start")?;
3738                let actual_step = checked_usize(*step, "slice step")?;
3739                if actual_step == 0 {
3740                    return Err(Error::InvalidData("slice step cannot be 0".into()));
3741                }
3742                if actual_start > dim_size {
3743                    return Err(Error::SliceOutOfBounds {
3744                        dim: i,
3745                        index: *start,
3746                        size: shape[i],
3747                    });
3748                }
3749                let n = (actual_end - actual_start).div_ceil(actual_step);
3750                result_shape.push(n);
3751            }
3752        }
3753    }
3754
3755    // Extract elements manually (ndarray's slicing API is complex with dynamic dims)
3756    let ndim = shape.len();
3757    let total = checked_product_usize(&result_shape, "slice result element count")?;
3758    let mut elements = Vec::with_capacity(total);
3759
3760    // Generate all indices in the result
3761    let mut result_idx = vec![0usize; result_shape.len()];
3762
3763    for _ in 0..total {
3764        // Map result index to source index
3765        let mut src_idx = Vec::with_capacity(ndim);
3766        let mut ri = 0;
3767        for sel in selection.selections.iter() {
3768            match sel {
3769                SliceInfoElem::Index(idx) => {
3770                    src_idx.push(checked_usize(*idx, "slice source index")?);
3771                }
3772                SliceInfoElem::Slice { start, step, .. } => {
3773                    let start = checked_usize(*start, "slice start")?;
3774                    let step = checked_usize(*step, "slice step")?;
3775                    let offset =
3776                        checked_mul_usize(result_idx[ri], step, "slice source index offset")?;
3777                    src_idx.push(checked_add_usize(start, offset, "slice source index")?);
3778                    ri += 1;
3779                }
3780            }
3781        }
3782
3783        elements.push(array[IxDyn(&src_idx)].clone());
3784
3785        // Increment result index
3786        if !result_shape.is_empty() {
3787            let mut carry = true;
3788            for d in (0..result_shape.len()).rev() {
3789                if carry {
3790                    result_idx[d] += 1;
3791                    if result_idx[d] < result_shape[d] {
3792                        carry = false;
3793                    } else {
3794                        result_idx[d] = 0;
3795                    }
3796                }
3797            }
3798        }
3799    }
3800
3801    ArrayD::from_shape_vec(IxDyn(&result_shape), elements)
3802        .map_err(|e| Error::InvalidData(format!("slice shape error: {e}")))
3803}
3804
3805#[cfg(test)]
3806mod tests {
3807    use super::*;
3808    use crate::storage::BytesStorage;
3809    use crate::superblock::Superblock;
3810    use std::collections::HashMap;
3811
3812    fn test_context(bytes: Vec<u8>) -> Arc<FileContext> {
3813        let storage: DynStorage = Arc::new(BytesStorage::new(bytes));
3814        Arc::new(FileContext {
3815            storage,
3816            superblock: Superblock {
3817                version: 2,
3818                offset_size: 8,
3819                length_size: 8,
3820                group_leaf_node_k: 0,
3821                group_internal_node_k: 0,
3822                indexed_storage_k: 0,
3823                consistency_flags: 0,
3824                base_address: 0,
3825                free_space_address: u64::MAX,
3826                eof_address: 0,
3827                driver_info_address: u64::MAX,
3828                root_symbol_table_entry: None,
3829                root_object_header_address: Some(0),
3830                extension_address: None,
3831            },
3832            chunk_cache: Arc::new(ChunkCache::new(1024, 8)),
3833            header_cache: Arc::new(Mutex::new(HashMap::new())),
3834            dataset_path_cache: Arc::new(Mutex::new(HashMap::new())),
3835            filter_registry: Arc::new(FilterRegistry::default()),
3836            external_file_resolver: None,
3837            external_link_resolver: None,
3838            external_file_cache: Mutex::new(HashMap::new()),
3839            sohm_table: OnceLock::new(),
3840            full_file_cache: OnceLock::new(),
3841        })
3842    }
3843
3844    fn fixed_u16_dataset(layout: DataLayout, storage_bytes: Vec<u8>) -> Dataset {
3845        let context = test_context(storage_bytes);
3846        Dataset {
3847            context: context.clone(),
3848            name: "short".to_string(),
3849            data_address: 0,
3850            dataspace: DataspaceMessage {
3851                rank: 1,
3852                dims: vec![3],
3853                max_dims: None,
3854                dataspace_type: DataspaceType::Simple,
3855            },
3856            datatype: Datatype::FixedPoint {
3857                size: 2,
3858                signed: false,
3859                byte_order: ByteOrder::LittleEndian,
3860            },
3861            layout,
3862            fill_value: None,
3863            filters: None,
3864            external_files: None,
3865            attributes: Vec::new(),
3866            chunk_cache: context.chunk_cache.clone(),
3867            chunk_entry_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap()))),
3868            full_chunk_entries: Arc::new(OnceLock::new()),
3869            full_dataset_bytes: Arc::new(OnceLock::new()),
3870            external_slots: Arc::new(OnceLock::new()),
3871            filter_registry: context.filter_registry.clone(),
3872        }
3873    }
3874
3875    #[test]
3876    fn slice_info_all() {
3877        let s = SliceInfo::all(3);
3878        assert_eq!(s.selections.len(), 3);
3879    }
3880
3881    #[test]
3882    fn raw_element_size_uses_file_vlen_reference_width() {
3883        let dtype = Datatype::VarLen {
3884            base: Box::new(Datatype::FixedPoint {
3885                size: 1,
3886                signed: false,
3887                byte_order: crate::error::ByteOrder::LittleEndian,
3888            }),
3889            kind: VarLenKind::Sequence,
3890            encoding: crate::messages::datatype::StringEncoding::Ascii,
3891            padding: crate::messages::datatype::StringPadding::NullTerminate,
3892        };
3893
3894        assert_eq!(raw_element_size_for_datatype(&dtype, 12).unwrap(), 12);
3895        assert_eq!(
3896            raw_element_size_for_datatype(
3897                &Datatype::Array {
3898                    base: Box::new(dtype),
3899                    dims: vec![2, 3],
3900                },
3901                12,
3902            )
3903            .unwrap(),
3904            72
3905        );
3906    }
3907
3908    #[test]
3909    fn array_raw_element_size_rejects_overflow() {
3910        let dtype = Datatype::Array {
3911            base: Box::new(Datatype::FixedPoint {
3912                size: 8,
3913                signed: false,
3914                byte_order: crate::error::ByteOrder::LittleEndian,
3915            }),
3916            dims: vec![u64::MAX, 2],
3917        };
3918
3919        let err = raw_element_size_for_datatype(&dtype, 12).unwrap_err();
3920        assert!(err.to_string().contains("array datatype"));
3921    }
3922
3923    #[test]
3924    fn dataset_num_elements_rejects_overflow() {
3925        let mut dataset = fixed_u16_dataset(DataLayout::Compact { data: vec![] }, Vec::new());
3926        dataset.dataspace.dims = vec![u64::MAX, 2];
3927
3928        let err = dataset.num_elements().unwrap_err();
3929        assert!(err.to_string().contains("element count"));
3930    }
3931
3932    #[test]
3933    fn compact_raw_data_requires_exact_logical_length() {
3934        let dataset = fixed_u16_dataset(
3935            DataLayout::Compact {
3936                data: vec![1, 0, 2, 0, 3],
3937            },
3938            Vec::new(),
3939        );
3940
3941        let err = dataset.read_array::<u16>().unwrap_err();
3942        assert!(
3943            matches!(err, Error::Context { .. })
3944                && err
3945                    .to_string()
3946                    .contains("compact raw data has 5 bytes, expected 6 bytes"),
3947            "expected compact raw length error, got: {err}"
3948        );
3949    }
3950
3951    #[test]
3952    fn contiguous_raw_data_requires_exact_logical_length() {
3953        let dataset = fixed_u16_dataset(
3954            DataLayout::Contiguous {
3955                address: 0,
3956                size: 5,
3957            },
3958            vec![1, 0, 2, 0, 3],
3959        );
3960
3961        let err = dataset.read_raw_bytes().unwrap_err();
3962        assert!(
3963            matches!(err, Error::Context { .. })
3964                && err
3965                    .to_string()
3966                    .contains("contiguous raw data has 5 bytes, expected 6 bytes"),
3967            "expected contiguous raw length error, got: {err}"
3968        );
3969    }
3970
3971    #[test]
3972    fn copy_chunk_1d() {
3973        let chunk_data = vec![1u8, 2, 3, 4]; // 4 elements of 1 byte each
3974        let mut flat = vec![0u8; 8];
3975        let chunk_offsets = vec![2u64]; // starts at index 2
3976        let chunk_shape = vec![4u64];
3977        let dataset_shape = vec![8u64];
3978
3979        copy_chunk_to_flat(
3980            &chunk_data,
3981            &mut flat,
3982            &chunk_offsets,
3983            &chunk_shape,
3984            &dataset_shape,
3985            1,
3986        )
3987        .unwrap();
3988        assert_eq!(flat, vec![0, 0, 1, 2, 3, 4, 0, 0]);
3989    }
3990
3991    #[test]
3992    fn copy_chunk_2d_rowwise() {
3993        let chunk_data = vec![1u8, 2, 3, 4, 5, 6];
3994        let mut flat = vec![0u8; 16];
3995        let chunk_offsets = vec![1u64, 1u64];
3996        let chunk_shape = vec![2u64, 3u64];
3997        let dataset_shape = vec![4u64, 4u64];
3998
3999        copy_chunk_to_flat(
4000            &chunk_data,
4001            &mut flat,
4002            &chunk_offsets,
4003            &chunk_shape,
4004            &dataset_shape,
4005            1,
4006        )
4007        .unwrap();
4008
4009        assert_eq!(flat, vec![0, 0, 0, 0, 0, 1, 2, 3, 0, 4, 5, 6, 0, 0, 0, 0,]);
4010    }
4011
4012    #[test]
4013    fn copy_unit_stride_chunk_overlap_2d_partial() {
4014        let chunk_data: Vec<u8> = (1..=16).collect();
4015        let mut result = vec![0u8; 6];
4016        let chunk_offsets = vec![0u64, 0u64];
4017        let chunk_shape = vec![4u64, 4u64];
4018        let dataset_shape = vec![4u64, 4u64];
4019        let resolved = ResolvedSelection {
4020            dims: vec![
4021                ResolvedSelectionDim {
4022                    start: 1,
4023                    end: 3,
4024                    step: 1,
4025                    count: 2,
4026                },
4027                ResolvedSelectionDim {
4028                    start: 1,
4029                    end: 4,
4030                    step: 1,
4031                    count: 3,
4032                },
4033            ],
4034            result_shape: vec![2, 3],
4035            result_elements: 6,
4036        };
4037        let chunk_strides = vec![4usize, 1usize];
4038        let result_strides = vec![3usize, 1usize];
4039
4040        copy_unit_stride_chunk_overlap(
4041            &chunk_data,
4042            &mut result,
4043            UnitStrideCopyLayout {
4044                chunk_offsets: &chunk_offsets,
4045                chunk_shape: &chunk_shape,
4046                dataset_shape: &dataset_shape,
4047                resolved: &resolved,
4048                chunk_strides: &chunk_strides,
4049                result_strides: &result_strides,
4050                elem_size: 1,
4051            },
4052        )
4053        .unwrap();
4054
4055        assert_eq!(result, vec![6, 7, 8, 10, 11, 12]);
4056    }
4057
4058    fn chunk_entry(offsets: &[u64], address: u64) -> chunk_index::ChunkEntry {
4059        chunk_index::ChunkEntry {
4060            address,
4061            size: 0,
4062            filter_mask: 0,
4063            offsets: offsets.to_vec(),
4064        }
4065    }
4066
4067    #[test]
4068    fn chunk_grid_coverage_detects_missing_chunk() {
4069        let mut entries = vec![
4070            chunk_entry(&[0, 0], 0x1000),
4071            chunk_entry(&[0, 2], 0x2000),
4072            chunk_entry(&[2, 0], 0x3000),
4073        ];
4074
4075        let complete =
4076            validate_chunk_grid_coverage(&mut entries, &[4, 4], &[2, 2], &[0, 0], &[1, 1]).unwrap();
4077
4078        assert!(!complete);
4079    }
4080
4081    #[test]
4082    fn chunk_grid_coverage_rejects_duplicate_offsets() {
4083        let mut entries = vec![
4084            chunk_entry(&[0, 0], 0x1000),
4085            chunk_entry(&[0, 0], 0x2000),
4086            chunk_entry(&[0, 2], 0x3000),
4087            chunk_entry(&[2, 0], 0x4000),
4088        ];
4089
4090        let err = validate_chunk_grid_coverage(&mut entries, &[4, 4], &[2, 2], &[0, 0], &[1, 1])
4091            .unwrap_err();
4092
4093        assert!(matches!(err, Error::InvalidData(_)));
4094    }
4095
4096    #[test]
4097    fn decoded_chunk_len_requires_exact_size() {
4098        let entry = chunk_entry(&[0, 0], 0x1000);
4099
4100        validate_decoded_chunk_len(&entry, &[2, 3], 4, 24).unwrap();
4101        let err = validate_decoded_chunk_len(&entry, &[2, 3], 4, 23).unwrap_err();
4102
4103        assert!(matches!(err, Error::InvalidData(_)));
4104    }
4105
4106    #[test]
4107    fn copy_chunk_errors_on_short_row() {
4108        let chunk_data = vec![1u8, 2, 3, 4, 5];
4109        let mut flat = vec![0u8; 16];
4110        let chunk_offsets = vec![1u64, 1u64];
4111        let chunk_shape = vec![2u64, 3u64];
4112        let dataset_shape = vec![4u64, 4u64];
4113
4114        let err = copy_chunk_to_flat(
4115            &chunk_data,
4116            &mut flat,
4117            &chunk_offsets,
4118            &chunk_shape,
4119            &dataset_shape,
4120            1,
4121        )
4122        .unwrap_err();
4123
4124        assert!(matches!(err, Error::InvalidData(_)));
4125    }
4126
4127    #[test]
4128    fn copy_unit_stride_chunk_overlap_errors_on_short_row() {
4129        let chunk_data: Vec<u8> = (1..=7).collect();
4130        let mut result = vec![0u8; 6];
4131        let chunk_offsets = vec![0u64, 0u64];
4132        let chunk_shape = vec![4u64, 4u64];
4133        let dataset_shape = vec![4u64, 4u64];
4134        let resolved = ResolvedSelection {
4135            dims: vec![
4136                ResolvedSelectionDim {
4137                    start: 1,
4138                    end: 3,
4139                    step: 1,
4140                    count: 2,
4141                },
4142                ResolvedSelectionDim {
4143                    start: 1,
4144                    end: 4,
4145                    step: 1,
4146                    count: 3,
4147                },
4148            ],
4149            result_shape: vec![2, 3],
4150            result_elements: 6,
4151        };
4152        let chunk_strides = vec![4usize, 1usize];
4153        let result_strides = vec![3usize, 1usize];
4154
4155        let err = copy_unit_stride_chunk_overlap(
4156            &chunk_data,
4157            &mut result,
4158            UnitStrideCopyLayout {
4159                chunk_offsets: &chunk_offsets,
4160                chunk_shape: &chunk_shape,
4161                dataset_shape: &dataset_shape,
4162                resolved: &resolved,
4163                chunk_strides: &chunk_strides,
4164                result_strides: &result_strides,
4165                elem_size: 1,
4166            },
4167        )
4168        .unwrap_err();
4169
4170        assert!(matches!(err, Error::InvalidData(_)));
4171    }
4172}