Skip to main content

hdf5_reader/
dataset.rs

1use std::mem::MaybeUninit;
2use std::num::NonZeroUsize;
3use std::sync::{Arc, OnceLock};
4
5use lru::LruCache;
6use ndarray::{ArrayD, IxDyn};
7use parking_lot::Mutex;
8#[cfg(feature = "rayon")]
9use rayon::prelude::*;
10use smallvec::SmallVec;
11
12use crate::attribute_api::{collect_attribute_messages, Attribute};
13use crate::cache::{ChunkCache, ChunkKey};
14use crate::chunk_index;
15use crate::datatype_api::{dtype_element_size, H5Type};
16use crate::error::{Error, Result};
17use crate::filters::{self, FilterRegistry};
18use crate::io::Cursor;
19use crate::messages::attribute::AttributeMessage;
20use crate::messages::dataspace::{DataspaceMessage, DataspaceType};
21use crate::messages::datatype::Datatype;
22use crate::messages::fill_value::{FillTime, FillValueMessage};
23use crate::messages::filter_pipeline::FilterPipelineMessage;
24use crate::messages::layout::{ChunkIndexing, DataLayout};
25use crate::messages::HdfMessage;
26use crate::object_header::ObjectHeader;
27
28const HOT_FULL_DATASET_CACHE_MAX_BYTES: usize = 32 * 1024 * 1024;
29
/// Raw pointer + length describing a flat output buffer that chunk-copy
/// routines write into. Exists so the pair can be sent across Rayon worker
/// threads (see the `unsafe impl Send`/`Sync` below for the justification).
#[derive(Clone, Copy)]
struct FlatBufferPtr {
    ptr: *mut u8, // start of the destination buffer
    len: usize,   // destination length in bytes, forwarded to the copy routines
}
35
/// Geometry for copying one decoded chunk into a flat full-dataset buffer:
/// where the chunk sits in the dataset, both extents, and precomputed
/// row-major strides for source (chunk) and destination (dataset).
#[derive(Clone, Copy)]
struct ChunkCopyLayout<'a> {
    chunk_offsets: &'a [u64],     // chunk origin, in dataset element coordinates
    chunk_shape: &'a [u64],       // extents of one chunk, per dimension
    dataset_shape: &'a [u64],     // full dataset extents, per dimension
    dataset_strides: &'a [usize], // row-major dataset strides, in elements
    chunk_strides: &'a [usize],   // row-major chunk strides, in elements
    elem_size: usize,             // size of one element in bytes
}
45
/// Geometry for the step==1 (unit-stride) slice fast path: like
/// `ChunkCopyLayout`, but carries the resolved selection and the strides of
/// the (smaller) result buffer instead of the whole dataset's strides.
#[derive(Clone, Copy)]
struct UnitStrideCopyLayout<'a> {
    chunk_offsets: &'a [u64],    // chunk origin, in dataset element coordinates
    chunk_shape: &'a [u64],      // extents of one chunk, per dimension
    dataset_shape: &'a [u64],    // full dataset extents, per dimension
    resolved: &'a ResolvedSelection, // normalized selection being read
    chunk_strides: &'a [usize],  // row-major chunk strides, in elements
    result_strides: &'a [usize], // row-major strides of the result buffer
    elem_size: usize,            // size of one element in bytes
}
56
/// Everything needed to parse a dataset object header into a `Dataset`:
/// the raw file bytes, the superblock's offset/length sizes, and the shared
/// chunk cache plus filter registry the dataset will use for reads.
pub(crate) struct DatasetParseContext<'f> {
    pub(crate) file_data: &'f [u8],
    pub(crate) offset_size: u8,  // size of file offsets in bytes (from superblock)
    pub(crate) length_size: u8,  // size of file lengths in bytes (from superblock)
    pub(crate) chunk_cache: Arc<ChunkCache>,
    pub(crate) filter_registry: Arc<FilterRegistry>,
}
64
/// Parameters passed to chunk-entry collection: dataset geometry plus an
/// optional chunk-coordinate bounding box restricting which chunks to
/// collect. `None` collects entries for the entire dataset (see the
/// `chunk_bounds: None` call sites in `read_chunked`).
#[derive(Clone, Copy)]
struct ChunkEntrySelection<'a> {
    shape: &'a [u64],   // full dataset extents
    ndim: usize,        // number of dimensions
    elem_size: usize,   // element size in bytes
    // Presumably (first_chunk, last_chunk) inclusive chunk coordinates —
    // confirm against `collect_chunk_entries`.
    chunk_bounds: Option<(&'a [u64], &'a [u64])>,
}
72
// SAFETY: `FlatBufferPtr` is a plain pointer/length pair used so parallel
// chunk workers can write into one shared output buffer. The type itself has
// no thread affinity; soundness relies on callers ensuring workers write to
// disjoint regions (e.g. the duplicate-offset rejection in the parallel read
// path before fanning out).
unsafe impl Send for FlatBufferPtr {}

// SAFETY: same argument as `Send` above — no interior mutability of its own;
// disjointness of writes is the callers' responsibility.
unsafe impl Sync for FlatBufferPtr {}
76
impl FlatBufferPtr {
    /// Copy one decoded chunk into this buffer using dataset/chunk strides.
    ///
    /// # Safety
    /// Forwards to `copy_chunk_to_flat_with_strides_ptr`; the caller must
    /// uphold that function's contract (in particular, `ptr` must be valid
    /// for `len` bytes and no other thread may write the same region).
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_chunk(self, chunk_data: &[u8], layout: ChunkCopyLayout<'_>) {
        copy_chunk_to_flat_with_strides_ptr(chunk_data, self, layout);
    }

    /// Copy individually-selected elements (strided selection) into this
    /// buffer.
    ///
    /// # Safety
    /// Forwards to `copy_selected_elements_ptr`; the caller must uphold that
    /// function's contract — verify against its definition.
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_selected(
        self,
        chunk_data: &[u8],
        dim_indices: &[Vec<(usize, usize)>],
        chunk_strides: &[usize],
        result_strides: &[usize],
        elem_size: usize,
        ndim: usize,
    ) {
        copy_selected_elements_ptr(
            chunk_data,
            self.ptr,
            self.len,
            dim_indices,
            chunk_strides,
            result_strides,
            elem_size,
            ndim,
        );
    }

    /// Copy the overlap of a chunk with a unit-stride (step == 1) selection.
    ///
    /// # Safety
    /// Forwards to `copy_unit_stride_chunk_overlap_ptr`; the caller must
    /// uphold that function's contract — verify against its definition.
    #[cfg(feature = "rayon")]
    #[inline(always)]
    unsafe fn copy_unit_stride_chunk_overlap(
        self,
        chunk_data: &[u8],
        layout: UnitStrideCopyLayout<'_>,
    ) -> Result<()> {
        copy_unit_stride_chunk_overlap_ptr(chunk_data, self, layout)
    }
}
117
/// Hyperslab selection for reading slices of datasets.
#[derive(Debug, Clone)]
pub struct SliceInfo {
    /// One selection per dataset dimension; the count is validated against
    /// the dataset's dimensionality when the selection is normalized.
    pub selections: Vec<SliceInfoElem>,
}
123
/// A single dimension's selection.
#[derive(Debug, Clone)]
pub enum SliceInfoElem {
    /// Select a single index (reduces dimensionality).
    Index(u64),
    /// Select a range with optional step.
    ///
    /// `end` is exclusive and is clamped to the dimension size during
    /// normalization; `u64::MAX` is a sentinel meaning "to the end of the
    /// dimension". `step` must be non-zero (rejected otherwise).
    Slice { start: u64, end: u64, step: u64 },
}
132
/// One dimension of a selection after validation: a half-open `[start, end)`
/// range with a stride, plus the precomputed number of selected elements.
#[derive(Clone, Debug)]
struct ResolvedSelectionDim {
    start: u64,   // first selected index
    end: u64,     // exclusive upper bound, already clamped to the dim size
    step: u64,    // stride between selected indices (>= 1)
    count: usize, // number of selected elements along this dimension
}
140
/// LRU key for cached chunk-entry lookups: the chunk index address plus the
/// chunk-coordinate bounding box of the query.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ChunkEntryCacheKey {
    index_address: u64,
    // SmallVec keeps keys heap-free for datasets with <= 4 dimensions.
    first_chunk: SmallVec<[u64; 4]>,
    last_chunk: SmallVec<[u64; 4]>,
}
147
148impl ResolvedSelectionDim {
149    fn chunk_index_range(&self, chunk_extent: u64) -> Option<(u64, u64)> {
150        if self.count == 0 {
151            return None;
152        }
153
154        Some((self.start / chunk_extent, (self.end - 1) / chunk_extent))
155    }
156}
157
/// A `SliceInfo` validated against a concrete dataset shape.
#[derive(Clone, Debug)]
struct ResolvedSelection {
    dims: Vec<ResolvedSelectionDim>, // one entry per dataset dimension
    /// Output shape; `Index` selections are collapsed (omitted) here.
    result_shape: Vec<usize>,
    result_elements: usize, // product of `result_shape`, overflow-checked
}
164
165impl ResolvedSelection {
166    fn result_dims_with_collapsed(&self) -> Vec<usize> {
167        self.dims.iter().map(|dim| dim.count).collect()
168    }
169
170    fn is_unit_stride(&self) -> bool {
171        self.dims.iter().all(|dim| dim.step == 1)
172    }
173}
174
175impl SliceInfo {
176    /// Create a selection that reads everything.
177    pub fn all(ndim: usize) -> Self {
178        SliceInfo {
179            selections: vec![
180                SliceInfoElem::Slice {
181                    start: 0,
182                    end: u64::MAX,
183                    step: 1,
184                };
185                ndim
186            ],
187        }
188    }
189}
190
191fn checked_usize(value: u64, context: &str) -> Result<usize> {
192    usize::try_from(value).map_err(|_| {
193        Error::InvalidData(format!(
194            "{context} value {value} exceeds platform usize capacity"
195        ))
196    })
197}
198
199fn checked_mul_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
200    lhs.checked_mul(rhs)
201        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
202}
203
204fn checked_add_usize(lhs: usize, rhs: usize, context: &str) -> Result<usize> {
205    lhs.checked_add(rhs)
206        .ok_or_else(|| Error::InvalidData(format!("{context} exceeds platform usize capacity")))
207}
208
209fn expected_chunk_count(first_chunk: &[u64], last_chunk: &[u64]) -> Result<usize> {
210    let mut total = 1usize;
211    for (&first, &last) in first_chunk.iter().zip(last_chunk.iter()) {
212        let dim_count = checked_usize(last - first + 1, "selected chunk count")?;
213        total = checked_mul_usize(total, dim_count, "selected chunk count")?;
214    }
215    Ok(total)
216}
217
218fn full_dataset_chunk_count(shape: &[u64], chunk_shape: &[u64]) -> Result<usize> {
219    let mut total = 1usize;
220    for (&dim, &chunk) in shape.iter().zip(chunk_shape.iter()) {
221        let chunk_count = checked_usize(dim.div_ceil(chunk), "full dataset chunk count")?;
222        total = checked_mul_usize(total, chunk_count, "full dataset chunk count")?;
223    }
224    Ok(total)
225}
226
227fn row_major_strides(shape: &[u64], context: &str) -> Result<Vec<usize>> {
228    let ndim = shape.len();
229    if ndim == 0 {
230        return Ok(Vec::new());
231    }
232
233    let mut strides = vec![1usize; ndim];
234    for i in (0..ndim - 1).rev() {
235        let next_extent = checked_usize(shape[i + 1], context)?;
236        strides[i] = checked_mul_usize(strides[i + 1], next_extent, context)?;
237    }
238    Ok(strides)
239}
240
/// Reinterpret a fully-initialized `Vec<MaybeUninit<u8>>` as `Vec<u8>`.
///
/// Byte-buffer convenience spelling; callers must have written every
/// element before calling (see `assume_init_vec`).
fn assume_init_u8_vec(buffer: Vec<MaybeUninit<u8>>) -> Vec<u8> {
    // Delegate to the generic version instead of duplicating the unsafe
    // raw-parts conversion.
    assume_init_vec(buffer)
}

/// Reinterpret a fully-initialized `Vec<MaybeUninit<T>>` as `Vec<T>`.
///
/// NOTE(review): this is a safe fn with an unchecked precondition — every
/// element must actually have been initialized, otherwise the result is
/// undefined behavior. All visible in-file callers fill the buffer
/// completely before converting.
fn assume_init_vec<T>(mut buffer: Vec<MaybeUninit<T>>) -> Vec<T> {
    let ptr = buffer.as_mut_ptr() as *mut T;
    let len = buffer.len();
    let capacity = buffer.capacity();
    // Prevent the original Vec from freeing the allocation we are adopting.
    std::mem::forget(buffer);
    // SAFETY: `MaybeUninit<T>` is layout-compatible with `T`; ptr/len/capacity
    // came from a live Vec allocation that we just leaked, and the caller
    // guarantees all `len` elements are initialized.
    unsafe { Vec::from_raw_parts(ptr, len, capacity) }
}
256
/// Validate a `SliceInfo` against a dataset shape, producing per-dimension
/// ranges, the output shape (with `Index` dimensions collapsed), and the
/// overflow-checked total element count.
///
/// # Errors
/// Fails when the selection's dimensionality does not match the dataset's,
/// a step is zero, an index or slice start is out of bounds, or a count
/// exceeds `usize`.
fn normalize_selection(selection: &SliceInfo, shape: &[u64]) -> Result<ResolvedSelection> {
    if selection.selections.len() != shape.len() {
        return Err(Error::InvalidData(format!(
            "slice has {} dimensions but dataset has {}",
            selection.selections.len(),
            shape.len()
        )));
    }

    let mut dims = Vec::with_capacity(shape.len());
    let mut result_shape = Vec::new();
    let mut result_elements = 1usize;

    for (i, sel) in selection.selections.iter().enumerate() {
        let dim_size = shape[i];
        match sel {
            SliceInfoElem::Index(idx) => {
                if *idx >= dim_size {
                    return Err(Error::SliceOutOfBounds {
                        dim: i,
                        index: *idx,
                        size: dim_size,
                    });
                }
                // An Index selects exactly one element and is NOT pushed to
                // result_shape — the dimension collapses out of the result.
                dims.push(ResolvedSelectionDim {
                    start: *idx,
                    end: *idx + 1,
                    step: 1,
                    count: 1,
                });
            }
            SliceInfoElem::Slice { start, end, step } => {
                if *step == 0 {
                    return Err(Error::InvalidData("slice step cannot be 0".into()));
                }
                // `start == dim_size` is allowed and yields an empty slice.
                if *start > dim_size {
                    return Err(Error::SliceOutOfBounds {
                        dim: i,
                        index: *start,
                        size: dim_size,
                    });
                }

                // `u64::MAX` is the "to the end" sentinel (see SliceInfo::all);
                // any other end is clamped to the dimension size.
                let actual_end = if *end == u64::MAX {
                    dim_size
                } else {
                    (*end).min(dim_size)
                };
                let count_u64 = if *start >= actual_end {
                    0
                } else {
                    (actual_end - *start).div_ceil(*step)
                };
                let count = checked_usize(count_u64, "slice element count")?;

                dims.push(ResolvedSelectionDim {
                    start: *start,
                    end: actual_end,
                    step: *step,
                    count,
                });
                result_shape.push(count);
                result_elements =
                    checked_mul_usize(result_elements, count, "slice result element count")?;
            }
        }
    }

    Ok(ResolvedSelection {
        dims,
        result_shape,
        result_elements,
    })
}
331
/// A dataset within an HDF5 file.
pub struct Dataset<'f> {
    file_data: &'f [u8], // the entire file's bytes
    offset_size: u8,     // superblock size-of-offsets, in bytes
    length_size: u8,     // superblock size-of-lengths, in bytes
    pub(crate) name: String,
    // Object header address; doubles as the public `address()` identifier.
    pub(crate) data_address: u64,
    pub(crate) dataspace: DataspaceMessage,
    pub(crate) datatype: Datatype,
    pub(crate) layout: DataLayout,
    pub(crate) fill_value: Option<FillValueMessage>,
    pub(crate) filters: Option<FilterPipelineMessage>,
    pub(crate) attributes: Vec<AttributeMessage>,
    // Shared decoded-chunk cache.
    pub(crate) chunk_cache: Arc<ChunkCache>,
    // LRU of chunk-entry lookups keyed by index address + chunk bounds;
    // shared with templates so re-opened datasets reuse prior lookups.
    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
    // One-shot cache of the full chunk-entry list.
    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
    // One-shot cache of the fully-assembled raw dataset bytes (small
    // datasets only, see HOT_FULL_DATASET_CACHE_MAX_BYTES).
    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
    pub(crate) filter_registry: Arc<FilterRegistry>,
}
351
/// Parsed dataset metadata detached from the file borrow, so a dataset can
/// be cheaply re-materialized via `Dataset::from_template` while sharing the
/// chunk-entry / full-dataset caches (held by `Arc`) across instances.
pub(crate) struct DatasetTemplate {
    name: String,
    data_address: u64,
    dataspace: DataspaceMessage,
    datatype: Datatype,
    layout: DataLayout,
    fill_value: Option<FillValueMessage>,
    filters: Option<FilterPipelineMessage>,
    attributes: Vec<AttributeMessage>,
    // Shared (not cloned-by-value) caches — see Dataset for field meanings.
    chunk_entry_cache: Arc<Mutex<LruCache<ChunkEntryCacheKey, Arc<Vec<chunk_index::ChunkEntry>>>>>,
    full_chunk_entries: Arc<OnceLock<Arc<Vec<chunk_index::ChunkEntry>>>>,
    full_dataset_bytes: Arc<OnceLock<Arc<Vec<u8>>>>,
}
365
366impl<'f> Dataset<'f> {
    /// Rebind a previously-parsed dataset template to file bytes and shared
    /// caches. Metadata is cloned from the template; the chunk-entry and
    /// full-dataset caches are shared via `Arc`, so work done by earlier
    /// instances is reused.
    pub(crate) fn from_template(
        file_data: &'f [u8],
        offset_size: u8,
        length_size: u8,
        template: Arc<DatasetTemplate>,
        chunk_cache: Arc<ChunkCache>,
        filter_registry: Arc<FilterRegistry>,
    ) -> Self {
        Dataset {
            file_data,
            offset_size,
            length_size,
            name: template.name.clone(),
            data_address: template.data_address,
            dataspace: template.dataspace.clone(),
            datatype: template.datatype.clone(),
            layout: template.layout.clone(),
            fill_value: template.fill_value.clone(),
            filters: template.filters.clone(),
            attributes: template.attributes.clone(),
            chunk_cache,
            chunk_entry_cache: template.chunk_entry_cache.clone(),
            full_chunk_entries: template.full_chunk_entries.clone(),
            full_dataset_bytes: template.full_dataset_bytes.clone(),
            filter_registry,
        }
    }
394
    /// Snapshot this dataset's metadata (and its shared caches) into a
    /// template that can later be rebound with `Dataset::from_template`.
    pub(crate) fn template(&self) -> Arc<DatasetTemplate> {
        Arc::new(DatasetTemplate {
            name: self.name.clone(),
            data_address: self.data_address,
            dataspace: self.dataspace.clone(),
            datatype: self.datatype.clone(),
            layout: self.layout.clone(),
            fill_value: self.fill_value.clone(),
            filters: self.filters.clone(),
            attributes: self.attributes.clone(),
            chunk_entry_cache: self.chunk_entry_cache.clone(),
            full_chunk_entries: self.full_chunk_entries.clone(),
            full_dataset_bytes: self.full_dataset_bytes.clone(),
        })
    }
410
    /// Build a dataset from an already-parsed object header.
    ///
    /// Scans the header messages for dataspace, datatype, layout, fill-value
    /// and filter-pipeline messages (dataspace/datatype/layout are
    /// mandatory), then falls back to a netCDF-style `_FillValue` attribute
    /// when the header's fill-value message carries no value.
    pub(crate) fn from_parsed_header(
        context: DatasetParseContext<'f>,
        address: u64,
        name: String,
        header: &ObjectHeader,
    ) -> Result<Self> {
        let mut dataspace: Option<DataspaceMessage> = None;
        let mut datatype: Option<Datatype> = None;
        let mut layout: Option<DataLayout> = None;
        let mut fill_value: Option<FillValueMessage> = None;
        let mut filter_pipeline: Option<FilterPipelineMessage> = None;
        let attributes = collect_attribute_messages(
            header,
            context.file_data,
            context.offset_size,
            context.length_size,
        )?;

        // Last message of each kind wins if duplicates appear in the header.
        for msg in &header.messages {
            match msg {
                HdfMessage::Dataspace(ds) => dataspace = Some(ds.clone()),
                HdfMessage::Datatype(dt) => datatype = Some(dt.datatype.clone()),
                HdfMessage::DataLayout(dl) => layout = Some(dl.layout.clone()),
                HdfMessage::FillValue(fv) => fill_value = Some(fv.clone()),
                HdfMessage::FilterPipeline(fp) => filter_pipeline = Some(fp.clone()),
                _ => {}
            }
        }

        let dataspace =
            dataspace.ok_or_else(|| Error::InvalidData("dataset missing dataspace".into()))?;
        let dt = datatype.ok_or_else(|| Error::InvalidData("dataset missing datatype".into()))?;
        let layout =
            layout.ok_or_else(|| Error::InvalidData("dataset missing data layout".into()))?;
        let layout = normalize_layout(layout, &dataspace);
        // netCDF convention: a scalar `_FillValue` attribute may define the
        // fill when the header's fill-value message doesn't carry one.
        let attr_fill_value = attributes
            .iter()
            .find(|attr| attr.name == "_FillValue" && attr.dataspace.num_elements() == 1)
            .map(|attr| FillValueMessage {
                defined: !attr.raw_data.is_empty(),
                fill_time: FillTime::IfSet,
                value: Some(attr.raw_data.clone()),
            });
        // Prefer the header message only when it actually holds a value.
        let fill_value = match fill_value {
            Some(existing) if existing.value.is_some() => Some(existing),
            _ => attr_fill_value,
        };

        Ok(Dataset {
            file_data: context.file_data,
            offset_size: context.offset_size,
            length_size: context.length_size,
            name,
            data_address: address,
            dataspace,
            datatype: dt,
            layout,
            fill_value,
            filters: filter_pipeline,
            attributes,
            chunk_cache: context.chunk_cache,
            chunk_entry_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap()))),
            full_chunk_entries: Arc::new(OnceLock::new()),
            full_dataset_bytes: Arc::new(OnceLock::new()),
            filter_registry: context.filter_registry,
        })
    }
478
    /// Dataset name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The object header address used to parse this dataset.
    /// Useful as an opaque identifier or for NC4 data_offset.
    pub fn address(&self) -> u64 {
        self.data_address
    }

    /// Shape of the dataset (dimensions).
    pub fn shape(&self) -> &[u64] {
        &self.dataspace.dims
    }

    /// Datatype of the dataset.
    pub fn dtype(&self) -> &Datatype {
        &self.datatype
    }

    /// Number of dimensions.
    pub fn ndim(&self) -> usize {
        self.dataspace.dims.len()
    }

    /// Maximum dimension sizes, if defined. `u64::MAX` indicates unlimited.
    pub fn max_dims(&self) -> Option<&[u64]> {
        self.dataspace.max_dims.as_deref()
    }

    /// Chunk dimensions, if the dataset is chunked.
    /// Returns `None` for compact and contiguous layouts.
    pub fn chunks(&self) -> Option<Vec<u32>> {
        match &self.layout {
            DataLayout::Chunked { dims, .. } => Some(dims.clone()),
            _ => None,
        }
    }

    /// Fill value, if defined.
    pub fn fill_value(&self) -> Option<&FillValueMessage> {
        self.fill_value.as_ref()
    }
522
523    /// Dataset attributes.
524    pub fn attributes(&self) -> Vec<Attribute> {
525        self.attributes
526            .iter()
527            .map(|a| {
528                Attribute::from_message_with_context(
529                    a.clone(),
530                    Some(self.file_data),
531                    self.offset_size,
532                )
533            })
534            .collect()
535    }
536
537    /// Find an attribute by name.
538    pub fn attribute(&self, name: &str) -> Result<Attribute> {
539        self.attributes
540            .iter()
541            .find(|a| a.name == name)
542            .map(|a| {
543                Attribute::from_message_with_context(
544                    a.clone(),
545                    Some(self.file_data),
546                    self.offset_size,
547                )
548            })
549            .ok_or_else(|| Error::AttributeNotFound(name.to_string()))
550    }
551
552    /// Total number of elements in the dataset.
553    pub fn num_elements(&self) -> u64 {
554        if self.dataspace.dims.is_empty() {
555            match self.dataspace.dataspace_type {
556                DataspaceType::Scalar => 1,
557                DataspaceType::Null => 0,
558                DataspaceType::Simple => 0,
559            }
560        } else {
561            self.dataspace.dims.iter().product()
562        }
563    }
564
    /// Read the entire dataset into an n-dimensional array.
    pub fn read_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
        // Dispatch on storage layout; each reader handles its own
        // undefined-address / fill-value cases.
        let result = match &self.layout {
            DataLayout::Compact { data } => self.read_compact::<T>(data),
            DataLayout::Contiguous { address, size } => self.read_contiguous::<T>(*address, *size),
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => self.read_chunked::<T>(*address, dims, *element_size, chunk_indexing.as_ref()),
        };
        // Tag any error with the dataset name for better diagnostics.
        result.map_err(|e| e.with_context(&self.name))
    }

    /// Read the entire dataset using internal chunk-level parallelism when possible.
    ///
    /// Non-chunked datasets fall back to `read_array`.
    #[cfg(feature = "rayon")]
    pub fn read_array_parallel<T: H5Type>(&self) -> Result<ArrayD<T>> {
        match &self.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => self.read_chunked_parallel::<T>(
                *address,
                dims,
                *element_size,
                chunk_indexing.as_ref(),
            ),
            _ => self.read_array::<T>(),
        }
    }

    /// Read the entire dataset using the provided Rayon thread pool.
    ///
    /// Non-chunked datasets fall back to `read_array`.
    #[cfg(feature = "rayon")]
    pub fn read_array_in_pool<T: H5Type>(&self, pool: &rayon::ThreadPool) -> Result<ArrayD<T>> {
        match &self.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => pool.install(|| {
                // `install` runs the chunked read with the caller's pool as
                // the ambient Rayon pool.
                self.read_chunked_parallel::<T>(
                    *address,
                    dims,
                    *element_size,
                    chunk_indexing.as_ref(),
                )
            }),
            _ => self.read_array::<T>(),
        }
    }

    /// Read a hyperslab of the dataset using chunk-level parallelism when possible.
    ///
    /// Chunked datasets decompress overlapping chunks in parallel via Rayon.
    /// Non-chunked layouts fall back to `read_slice`.
    #[cfg(feature = "rayon")]
    pub fn read_slice_parallel<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
        // Validate the selection up-front so both paths share the error.
        let resolved = normalize_selection(selection, &self.dataspace.dims)?;

        match &self.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => self.read_chunked_slice_parallel::<T>(
                *address,
                dims,
                *element_size,
                chunk_indexing.as_ref(),
                selection,
                &resolved,
            ),
            _ => self.read_slice::<T>(selection),
        }
    }

    /// Read a hyperslab of the dataset.
    pub fn read_slice<T: H5Type>(&self, selection: &SliceInfo) -> Result<ArrayD<T>> {
        let resolved = normalize_selection(selection, &self.dataspace.dims)?;

        match &self.layout {
            DataLayout::Contiguous { address, size } => {
                self.read_contiguous_slice::<T>(*address, *size, selection, &resolved)
            }
            DataLayout::Compact { data } => self.read_compact_slice::<T>(data, selection),
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => self.read_chunked_slice::<T>(
                *address,
                dims,
                *element_size,
                chunk_indexing.as_ref(),
                selection,
                &resolved,
            ),
        }
    }

    /// Read a compact-layout dataset: the raw element bytes are stored
    /// directly in the layout message.
    fn read_compact<T: H5Type>(&self, data: &[u8]) -> Result<ArrayD<T>> {
        self.decode_raw_data::<T>(data)
    }
678
679    fn read_contiguous<T: H5Type>(&self, address: u64, size: u64) -> Result<ArrayD<T>> {
680        if Cursor::is_undefined_offset(address, self.offset_size) || size == 0 {
681            // Dataset with no data written — return fill values
682            return self.make_fill_array::<T>();
683        }
684
685        let addr = address as usize;
686        let sz = size as usize;
687        if addr + sz > self.file_data.len() {
688            return Err(Error::OffsetOutOfBounds(address));
689        }
690
691        let raw = &self.file_data[addr..addr + sz];
692        self.decode_raw_data::<T>(raw)
693    }
694
    /// Read a chunked-layout dataset by assembling all chunks into one
    /// flat buffer and decoding it.
    ///
    /// Three paths, in order of preference:
    /// 1. serve small, fully-written datasets from the hot byte cache;
    /// 2. if `T` matches the stored layout byte-for-byte, copy chunks
    ///    straight into an uninitialized `Vec<T>` (no fill, no re-decode);
    /// 3. otherwise assemble raw bytes (uninitialized when every chunk is
    ///    present, fill-initialized when chunks may be missing) and decode.
    fn read_chunked<T: H5Type>(
        &self,
        index_address: u64,
        chunk_dims: &[u32],
        _element_size: u32,
        chunk_indexing: Option<&ChunkIndexing>,
    ) -> Result<ArrayD<T>> {
        if Cursor::is_undefined_offset(index_address, self.offset_size) {
            // No chunk index was ever allocated — nothing written.
            return self.make_fill_array::<T>();
        }

        let ndim = self.ndim();
        let shape = &self.dataspace.dims;
        let elem_size = dtype_element_size(&self.datatype);
        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
        let dataset_strides = row_major_strides(shape, "dataset stride")?;
        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;

        // Allocate output initialized from the dataset's fill value.
        let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
        let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;

        let entries = self.collect_chunk_entries(
            index_address,
            chunk_dims,
            chunk_indexing,
            ChunkEntrySelection {
                shape,
                ndim,
                elem_size,
                chunk_bounds: None,
            },
        )?;

        // Every chunk present => every output byte will be written, so the
        // fast paths below may skip fill-value initialization entirely.
        let full_chunk_coverage = entries.len() == full_dataset_chunk_count(shape, &chunk_shape)?;
        if full_chunk_coverage {
            // Path 1: small datasets may already have their assembled bytes
            // cached from a previous full read.
            let hot_full_dataset_bytes = if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                self.full_dataset_bytes.get().cloned()
            } else {
                None
            };
            if let Some(cached_bytes) = hot_full_dataset_bytes {
                return self.decode_raw_data::<T>(&cached_bytes);
            }
            // Path 2: T's in-memory representation matches the stored bytes,
            // so chunks can be copied directly into the typed output vector.
            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
                let mut result_values: Vec<MaybeUninit<T>> =
                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
                        .take(total_elements)
                        .collect();
                let result_ptr = result_values.as_mut_ptr() as *mut u8;
                let result_len = checked_mul_usize(
                    result_values.len(),
                    std::mem::size_of::<T>(),
                    "typed dataset size in bytes",
                )?;

                for entry in &entries {
                    let chunk_data =
                        self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
                    // SAFETY-relevant: full coverage means every element of
                    // `result_values` is written by exactly these copies
                    // before `assume_init_vec` below.
                    unsafe {
                        copy_chunk_to_flat_with_strides_ptr(
                            &chunk_data,
                            FlatBufferPtr {
                                ptr: result_ptr,
                                len: result_len,
                            },
                            ChunkCopyLayout {
                                chunk_offsets: &entry.offsets,
                                chunk_shape: &chunk_shape,
                                dataset_shape: shape,
                                dataset_strides: &dataset_strides,
                                chunk_strides: &chunk_strides,
                                elem_size,
                            },
                        );
                    }
                }

                // Populate the hot byte cache for future reads of this
                // (small) dataset.
                if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                    let mut cached_bytes = vec![0u8; total_bytes];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            result_ptr,
                            cached_bytes.as_mut_ptr(),
                            total_bytes,
                        );
                    }
                    let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
                }

                let mut result_shape = Vec::with_capacity(shape.len());
                for &dim in shape {
                    result_shape.push(checked_usize(dim, "dataset dimension")?);
                }
                let result_values = assume_init_vec(result_values);
                return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
            }

            // Path 3a: full coverage but T needs a decode step — assemble
            // raw bytes (uninitialized is fine: every byte gets written).
            let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
            let flat_ptr = flat_data.as_mut_ptr() as *mut u8;
            let flat_len = flat_data.len();

            for entry in &entries {
                let chunk_data =
                    self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
                unsafe {
                    copy_chunk_to_flat_with_strides_ptr(
                        &chunk_data,
                        FlatBufferPtr {
                            ptr: flat_ptr,
                            len: flat_len,
                        },
                        ChunkCopyLayout {
                            chunk_offsets: &entry.offsets,
                            chunk_shape: &chunk_shape,
                            dataset_shape: shape,
                            dataset_strides: &dataset_strides,
                            chunk_strides: &chunk_strides,
                            elem_size,
                        },
                    );
                }
            }

            let flat_data = assume_init_u8_vec(flat_data);
            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
            }
            return self.decode_raw_data::<T>(&flat_data);
        }

        // Path 3b: some chunks are missing — start from a fill-initialized
        // buffer so unwritten regions carry the fill value.
        let mut flat_data = self.make_output_buffer(total_bytes);
        for entry in &entries {
            let chunk_data = self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
            copy_chunk_to_flat_with_strides(
                &chunk_data,
                &mut flat_data,
                ChunkCopyLayout {
                    chunk_offsets: &entry.offsets,
                    chunk_shape: &chunk_shape,
                    dataset_shape: shape,
                    dataset_strides: &dataset_strides,
                    chunk_strides: &chunk_strides,
                    elem_size,
                },
            );
        }

        self.decode_raw_data::<T>(&flat_data)
    }
846
    /// Parallel whole-dataset read for chunked layout (Rayon).
    ///
    /// Decompresses every chunk on the Rayon pool and writes each chunk into
    /// its own region of a single flat output buffer, then decodes that
    /// buffer into an `ArrayD<T>`. When the decoded dataset fits in
    /// `HOT_FULL_DATASET_CACHE_MAX_BYTES`, the raw byte buffer is also
    /// memoized in `self.full_dataset_bytes` so repeat reads skip chunk I/O.
    #[cfg(feature = "rayon")]
    fn read_chunked_parallel<T: H5Type>(
        &self,
        index_address: u64,
        chunk_dims: &[u32],
        _element_size: u32,
        chunk_indexing: Option<&ChunkIndexing>,
    ) -> Result<ArrayD<T>> {
        // An undefined chunk-index address means no chunk was ever written:
        // the whole dataset reads as fill value.
        if Cursor::is_undefined_offset(index_address, self.offset_size) {
            return self.make_fill_array::<T>();
        }

        let ndim = self.ndim();
        let shape = &self.dataspace.dims;
        let elem_size = dtype_element_size(&self.datatype);
        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
        let dataset_strides = row_major_strides(shape, "dataset stride")?;
        let chunk_strides = row_major_strides(&chunk_shape, "chunk stride")?;
        let total_elements = checked_usize(self.num_elements(), "dataset element count")?;
        let total_bytes = checked_mul_usize(total_elements, elem_size, "dataset size in bytes")?;

        let mut entries = self.collect_chunk_entries(
            index_address,
            chunk_dims,
            chunk_indexing,
            ChunkEntrySelection {
                shape,
                ndim,
                elem_size,
                chunk_bounds: None,
            },
        )?;

        // Dedup check: sort by output offsets and reject duplicates.
        // Two chunks claiming the same output offsets would cause data races
        // when writing into the flat buffer in parallel.
        entries.sort_by(|a, b| a.offsets.cmp(&b.offsets));
        for i in 1..entries.len() {
            if entries[i].offsets == entries[i - 1].offsets {
                return Err(Error::InvalidData(format!(
                    "duplicate chunk output offsets {:?} (addresses {:#x} and {:#x})",
                    entries[i].offsets,
                    entries[i - 1].address,
                    entries[i].address
                )));
            }
        }

        // With full coverage every output byte is written by some chunk, so
        // the fast paths below may start from uninitialized buffers. The
        // partial-coverage path at the bottom starts from
        // `make_output_buffer` instead (presumably fill-initialized — confirm
        // against its definition).
        let full_chunk_coverage = entries.len() == full_dataset_chunk_count(shape, &chunk_shape)?;
        if full_chunk_coverage {
            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                if let Some(cached_bytes) = self.full_dataset_bytes.get() {
                    return self.decode_raw_data::<T>(cached_bytes);
                }
            }
            // Typed fast path: `T` has the on-disk byte layout, so chunks can
            // be copied straight into a Vec<T> with no decode pass.
            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
                let mut result_values: Vec<MaybeUninit<T>> =
                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
                        .take(total_elements)
                        .collect();
                let flat = FlatBufferPtr {
                    ptr: result_values.as_mut_ptr() as *mut u8,
                    len: checked_mul_usize(
                        result_values.len(),
                        std::mem::size_of::<T>(),
                        "typed dataset size in bytes",
                    )?,
                };

                entries
                    .par_iter()
                    .map(|entry| {
                        self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)
                            // SAFETY(review): distinct chunk offsets (enforced
                            // by the dedup check above) should map to disjoint
                            // regions of `flat`, so concurrent writes do not
                            // alias — TODO confirm `copy_chunk` clamps to
                            // `flat.len` and the dataset bounds.
                            .map(|data| unsafe {
                                flat.copy_chunk(
                                    &data,
                                    ChunkCopyLayout {
                                        chunk_offsets: &entry.offsets,
                                        chunk_shape: &chunk_shape,
                                        dataset_shape: shape,
                                        dataset_strides: &dataset_strides,
                                        chunk_strides: &chunk_strides,
                                        elem_size,
                                    },
                                );
                            })
                    })
                    .collect::<std::result::Result<Vec<_>, Error>>()?;

                let mut result_shape = Vec::with_capacity(shape.len());
                for &dim in shape {
                    result_shape.push(checked_usize(dim, "dataset dimension")?);
                }
                // Also populate the byte-level hot cache so later reads (with
                // any element type) can decode from it directly.
                if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                    let mut cached_bytes = vec![0u8; total_bytes];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            flat.ptr,
                            cached_bytes.as_mut_ptr(),
                            total_bytes,
                        );
                    }
                    let _ = self.full_dataset_bytes.set(Arc::new(cached_bytes));
                }
                let result_values = assume_init_vec(result_values);
                return ArrayD::from_shape_vec(IxDyn(&result_shape), result_values)
                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
            }

            // Generic full-coverage path: assemble raw bytes in parallel,
            // then run the regular decode step.
            let mut flat_data = vec![MaybeUninit::<u8>::uninit(); total_bytes];
            let flat = FlatBufferPtr {
                ptr: flat_data.as_mut_ptr() as *mut u8,
                len: flat_data.len(),
            };

            entries
                .par_iter()
                .map(|entry| {
                    self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)
                        // SAFETY(review): same disjoint-write argument as the
                        // typed path above.
                        .map(|data| unsafe {
                            flat.copy_chunk(
                                &data,
                                ChunkCopyLayout {
                                    chunk_offsets: &entry.offsets,
                                    chunk_shape: &chunk_shape,
                                    dataset_shape: shape,
                                    dataset_strides: &dataset_strides,
                                    chunk_strides: &chunk_strides,
                                    elem_size,
                                },
                            );
                        })
                })
                .collect::<std::result::Result<Vec<_>, Error>>()?;

            let flat_data = assume_init_u8_vec(flat_data);
            if total_bytes <= HOT_FULL_DATASET_CACHE_MAX_BYTES {
                let _ = self.full_dataset_bytes.set(Arc::new(flat_data.clone()));
            }
            return self.decode_raw_data::<T>(&flat_data);
        }

        // Partial coverage: some chunks were never written, so start from the
        // default output buffer and overwrite only the chunks present.
        let mut flat_data = self.make_output_buffer(total_bytes);
        let flat = FlatBufferPtr {
            ptr: flat_data.as_mut_ptr(),
            len: flat_data.len(),
        };

        entries
            .par_iter()
            .map(|entry| {
                self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)
                    .map(|data| unsafe {
                        flat.copy_chunk(
                            &data,
                            ChunkCopyLayout {
                                chunk_offsets: &entry.offsets,
                                chunk_shape: &chunk_shape,
                                dataset_shape: shape,
                                dataset_strides: &dataset_strides,
                                chunk_strides: &chunk_strides,
                                elem_size,
                            },
                        );
                    })
            })
            .collect::<std::result::Result<Vec<_>, Error>>()?;

        self.decode_raw_data::<T>(&flat_data)
    }
1017
1018    /// Collect all chunk entries by dispatching on the chunk indexing type.
1019    ///
1020    /// Shared by `read_chunked` and `read_chunked_slice`.
1021    fn collect_chunk_entries(
1022        &self,
1023        index_address: u64,
1024        chunk_dims: &[u32],
1025        chunk_indexing: Option<&ChunkIndexing>,
1026        selection: ChunkEntrySelection<'_>,
1027    ) -> Result<Vec<chunk_index::ChunkEntry>> {
1028        if selection.chunk_bounds.is_none() {
1029            if let Some(cached) = self.full_chunk_entries.get() {
1030                return Ok((**cached).clone());
1031            }
1032        }
1033
1034        let cache_key =
1035            selection
1036                .chunk_bounds
1037                .map(|(first_chunk, last_chunk)| ChunkEntryCacheKey {
1038                    index_address,
1039                    first_chunk: SmallVec::from_slice(first_chunk),
1040                    last_chunk: SmallVec::from_slice(last_chunk),
1041                });
1042
1043        if let Some(ref key) = cache_key {
1044            let mut cache = self.chunk_entry_cache.lock();
1045            if let Some(cached) = cache.get(key) {
1046                return Ok((**cached).clone());
1047            }
1048        }
1049
1050        let entries = match chunk_indexing {
1051            None => {
1052                // V1-V3: B-tree v1 chunk indexing
1053                self.collect_btree_v1_entries(
1054                    index_address,
1055                    selection.ndim,
1056                    chunk_dims,
1057                    selection.chunk_bounds,
1058                )
1059            }
1060            Some(ChunkIndexing::SingleChunk {
1061                filtered_size,
1062                filters,
1063            }) => Ok(vec![chunk_index::single_chunk_entry(
1064                index_address,
1065                *filtered_size,
1066                *filters,
1067                selection.ndim,
1068            )]),
1069            Some(ChunkIndexing::BTreeV2) => chunk_index::collect_v2_chunk_entries(
1070                self.file_data,
1071                index_address,
1072                self.offset_size,
1073                self.length_size,
1074                selection.ndim as u32,
1075                chunk_dims,
1076                selection.chunk_bounds,
1077            ),
1078            Some(ChunkIndexing::Implicit) => Ok(chunk_index::collect_implicit_chunk_entries(
1079                index_address,
1080                selection.shape,
1081                chunk_dims,
1082                selection.elem_size,
1083                selection.chunk_bounds,
1084            )),
1085            Some(ChunkIndexing::FixedArray { .. }) => {
1086                crate::fixed_array::collect_fixed_array_chunk_entries(
1087                    self.file_data,
1088                    index_address,
1089                    self.offset_size,
1090                    self.length_size,
1091                    selection.shape,
1092                    chunk_dims,
1093                    selection.chunk_bounds,
1094                )
1095            }
1096            Some(ChunkIndexing::ExtensibleArray { .. }) => {
1097                crate::extensible_array::collect_extensible_array_chunk_entries(
1098                    self.file_data,
1099                    index_address,
1100                    self.offset_size,
1101                    self.length_size,
1102                    selection.shape,
1103                    chunk_dims,
1104                    selection.chunk_bounds,
1105                )
1106            }
1107        }?;
1108
1109        if let Some(key) = cache_key {
1110            let mut cache = self.chunk_entry_cache.lock();
1111            cache.put(key, Arc::new(entries.clone()));
1112        } else {
1113            let _ = self.full_chunk_entries.set(Arc::new(entries.clone()));
1114        }
1115
1116        Ok(entries)
1117    }
1118
1119    /// Collect chunk entries from a B-tree v1 index.
1120    fn collect_btree_v1_entries(
1121        &self,
1122        btree_address: u64,
1123        ndim: usize,
1124        chunk_dims: &[u32],
1125        chunk_bounds: Option<(&[u64], &[u64])>,
1126    ) -> Result<Vec<chunk_index::ChunkEntry>> {
1127        let leaves = crate::btree_v1::collect_btree_v1_leaves(
1128            self.file_data,
1129            btree_address,
1130            self.offset_size,
1131            self.length_size,
1132            Some(ndim as u32),
1133            chunk_dims,
1134            chunk_bounds,
1135        )?;
1136
1137        let mut entries = Vec::with_capacity(leaves.len());
1138        for (key, chunk_addr) in &leaves {
1139            match key {
1140                crate::btree_v1::BTreeV1Key::RawData {
1141                    chunk_size,
1142                    filter_mask,
1143                    offsets,
1144                } => {
1145                    entries.push(chunk_index::ChunkEntry {
1146                        address: *chunk_addr,
1147                        size: *chunk_size as u64,
1148                        filter_mask: *filter_mask,
1149                        offsets: offsets[..ndim].to_vec(),
1150                    });
1151                }
1152                _ => {
1153                    return Err(Error::InvalidData(
1154                        "expected raw data key in chunk B-tree".into(),
1155                    ))
1156                }
1157            }
1158        }
1159        Ok(entries)
1160    }
1161
1162    fn load_chunk_data(
1163        &self,
1164        entry: &chunk_index::ChunkEntry,
1165        dataset_addr: u64,
1166        chunk_shape: &[u64],
1167        elem_size: usize,
1168    ) -> Result<Arc<Vec<u8>>> {
1169        let cache_key = ChunkKey {
1170            dataset_addr,
1171            chunk_offsets: smallvec::SmallVec::from_slice(&entry.offsets),
1172        };
1173
1174        if let Some(cached) = self.chunk_cache.get(&cache_key) {
1175            return Ok(cached);
1176        }
1177
1178        let addr = entry.address as usize;
1179        let size = if entry.size > 0 {
1180            entry.size as usize
1181        } else {
1182            chunk_shape.iter().product::<u64>() as usize * elem_size
1183        };
1184        if addr + size > self.file_data.len() {
1185            return Err(Error::OffsetOutOfBounds(entry.address));
1186        }
1187        let raw = &self.file_data[addr..addr + size];
1188
1189        let decoded = if let Some(ref pipeline) = self.filters {
1190            filters::apply_pipeline(
1191                raw,
1192                &pipeline.filters,
1193                entry.filter_mask,
1194                elem_size,
1195                Some(&self.filter_registry),
1196            )?
1197        } else {
1198            raw.to_vec()
1199        };
1200
1201        Ok(self.chunk_cache.insert(cache_key, decoded))
1202    }
1203
1204    /// Chunked slice: only read chunks that overlap the selection.
1205    ///
1206    /// Resolves each `SliceInfoElem` to concrete ranges, computes the chunk
1207    /// grid range per dimension, and only decompresses overlapping chunks.
1208    fn read_chunked_slice<T: H5Type>(
1209        &self,
1210        index_address: u64,
1211        chunk_dims: &[u32],
1212        _element_size: u32,
1213        chunk_indexing: Option<&ChunkIndexing>,
1214        _selection: &SliceInfo,
1215        resolved: &ResolvedSelection,
1216    ) -> Result<ArrayD<T>> {
1217        if resolved.result_elements == 0 {
1218            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
1219        }
1220
1221        if Cursor::is_undefined_offset(index_address, self.offset_size) {
1222            return self
1223                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
1224        }
1225
1226        let ndim = self.ndim();
1227        let shape = &self.dataspace.dims;
1228        let elem_size = dtype_element_size(&self.datatype);
1229        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1230        let mut first_chunk = vec![0u64; ndim];
1231        let mut last_chunk = vec![0u64; ndim];
1232        for d in 0..ndim {
1233            let (first, last) = resolved.dims[d]
1234                .chunk_index_range(chunk_shape[d])
1235                .expect("zero-sized result handled above");
1236            first_chunk[d] = first;
1237            last_chunk[d] = last;
1238        }
1239
1240        // Collect all chunk entries.
1241        let overlapping = self.collect_chunk_entries(
1242            index_address,
1243            chunk_dims,
1244            chunk_indexing,
1245            ChunkEntrySelection {
1246                shape,
1247                ndim,
1248                elem_size,
1249                chunk_bounds: Some((&first_chunk, &last_chunk)),
1250            },
1251        )?;
1252
1253        let result_total_bytes = checked_mul_usize(
1254            resolved.result_elements,
1255            elem_size,
1256            "slice result size in bytes",
1257        )?;
1258        // Compute result strides (including collapsed dims — they have count=1).
1259        let result_dims = resolved.result_dims_with_collapsed();
1260        let mut result_strides = vec![1usize; ndim];
1261        for d in (0..ndim - 1).rev() {
1262            result_strides[d] =
1263                checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
1264        }
1265        let mut chunk_strides = vec![1usize; ndim];
1266        for d in (0..ndim - 1).rev() {
1267            chunk_strides[d] = checked_mul_usize(
1268                chunk_strides[d + 1],
1269                chunk_shape[d + 1] as usize,
1270                "chunk stride",
1271            )?;
1272        }
1273        let use_unit_stride_fast_path = resolved.is_unit_stride();
1274        let fully_covered_unit_stride = use_unit_stride_fast_path
1275            && overlapping.len() == expected_chunk_count(&first_chunk, &last_chunk)?;
1276
1277        if fully_covered_unit_stride {
1278            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1279                let mut result_values: Vec<MaybeUninit<T>> =
1280                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
1281                        .take(resolved.result_elements)
1282                        .collect();
1283                let result_ptr = result_values.as_mut_ptr() as *mut u8;
1284                let result_len = checked_mul_usize(
1285                    result_values.len(),
1286                    std::mem::size_of::<T>(),
1287                    "typed slice result size in bytes",
1288                )?;
1289
1290                for entry in &overlapping {
1291                    let chunk_data =
1292                        self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1293
1294                    unsafe {
1295                        copy_unit_stride_chunk_overlap_ptr(
1296                            &chunk_data,
1297                            FlatBufferPtr {
1298                                ptr: result_ptr,
1299                                len: result_len,
1300                            },
1301                            UnitStrideCopyLayout {
1302                                chunk_offsets: &entry.offsets,
1303                                chunk_shape: &chunk_shape,
1304                                dataset_shape: shape,
1305                                resolved,
1306                                chunk_strides: &chunk_strides,
1307                                result_strides: &result_strides,
1308                                elem_size,
1309                            },
1310                        )?;
1311                    }
1312                }
1313
1314                let result_values = assume_init_vec(result_values);
1315                return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
1316                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1317            }
1318
1319            let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
1320            let result_ptr = result_buf.as_mut_ptr() as *mut u8;
1321            let result_len = result_buf.len();
1322
1323            for entry in &overlapping {
1324                let chunk_data =
1325                    self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1326
1327                unsafe {
1328                    copy_unit_stride_chunk_overlap_ptr(
1329                        &chunk_data,
1330                        FlatBufferPtr {
1331                            ptr: result_ptr,
1332                            len: result_len,
1333                        },
1334                        UnitStrideCopyLayout {
1335                            chunk_offsets: &entry.offsets,
1336                            chunk_shape: &chunk_shape,
1337                            dataset_shape: shape,
1338                            resolved,
1339                            chunk_strides: &chunk_strides,
1340                            result_strides: &result_strides,
1341                            elem_size,
1342                        },
1343                    )?;
1344                }
1345            }
1346
1347            let result_buf = assume_init_u8_vec(result_buf);
1348            return self.decode_buffer_with_shape::<T>(
1349                &result_buf,
1350                resolved.result_elements,
1351                &resolved.result_shape,
1352            );
1353        }
1354
1355        let mut result_buf = self.make_output_buffer(result_total_bytes);
1356
1357        // For each overlapping chunk: decompress and copy matching elements.
1358        for entry in &overlapping {
1359            let cache_key = crate::cache::ChunkKey {
1360                dataset_addr: index_address,
1361                chunk_offsets: smallvec::SmallVec::from_slice(&entry.offsets),
1362            };
1363
1364            let chunk_data = if let Some(cached) = self.chunk_cache.get(&cache_key) {
1365                cached
1366            } else {
1367                let addr = entry.address as usize;
1368                let size = if entry.size > 0 {
1369                    entry.size as usize
1370                } else {
1371                    chunk_shape.iter().product::<u64>() as usize * elem_size
1372                };
1373                if addr + size > self.file_data.len() {
1374                    return Err(Error::OffsetOutOfBounds(entry.address));
1375                }
1376                let raw = &self.file_data[addr..addr + size];
1377                let decoded = if let Some(ref pipeline) = self.filters {
1378                    filters::apply_pipeline(
1379                        raw,
1380                        &pipeline.filters,
1381                        entry.filter_mask,
1382                        elem_size,
1383                        Some(&self.filter_registry),
1384                    )?
1385                } else {
1386                    raw.to_vec()
1387                };
1388                self.chunk_cache.insert(cache_key, decoded)
1389            };
1390
1391            if use_unit_stride_fast_path {
1392                copy_unit_stride_chunk_overlap(
1393                    &chunk_data,
1394                    &mut result_buf,
1395                    UnitStrideCopyLayout {
1396                        chunk_offsets: &entry.offsets,
1397                        chunk_shape: &chunk_shape,
1398                        dataset_shape: shape,
1399                        resolved,
1400                        chunk_strides: &chunk_strides,
1401                        result_strides: &result_strides,
1402                        elem_size,
1403                    },
1404                )?;
1405                continue;
1406            }
1407
1408            // For each dimension, compute which elements within this chunk fall
1409            // within the selection.
1410            let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
1411            for d in 0..ndim {
1412                let chunk_start = entry.offsets[d];
1413                let chunk_end = (chunk_start + chunk_shape[d]).min(shape[d]);
1414                let dim = &resolved.dims[d];
1415                let sel_start = dim.start;
1416                let sel_end = dim.end;
1417                let sel_step = dim.step;
1418                let mut indices = Vec::new();
1419
1420                // Find first selected index >= chunk_start
1421                let first_sel = if sel_start >= chunk_start {
1422                    sel_start
1423                } else {
1424                    let steps_to_skip = (chunk_start - sel_start).div_ceil(sel_step);
1425                    sel_start + steps_to_skip * sel_step
1426                };
1427
1428                let mut sel_idx = first_sel;
1429                while sel_idx < sel_end && sel_idx < chunk_end {
1430                    let chunk_local = checked_usize(sel_idx - chunk_start, "chunk-local index")?;
1431                    // Compute result-space index for this dimension.
1432                    let result_dim_idx =
1433                        checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
1434                    indices.push((chunk_local, result_dim_idx));
1435                    sel_idx += sel_step;
1436                }
1437
1438                dim_indices.push(indices);
1439            }
1440
1441            // Iterate over the cartesian product of matching indices.
1442            copy_selected_elements(
1443                &chunk_data,
1444                &mut result_buf,
1445                &dim_indices,
1446                &chunk_strides,
1447                &result_strides,
1448                elem_size,
1449                ndim,
1450            );
1451        }
1452
1453        self.decode_buffer_with_shape::<T>(
1454            &result_buf,
1455            resolved.result_elements,
1456            &resolved.result_shape,
1457        )
1458    }
1459
1460    /// Parallel variant of `read_chunked_slice`: decompresses overlapping chunks
1461    /// in parallel using Rayon, then copies selected elements into the result buffer.
1462    ///
1463    /// Each chunk writes to a disjoint region of the result buffer (chunks don't
1464    /// overlap in output space), so this is safe to parallelize.
1465    #[cfg(feature = "rayon")]
1466    fn read_chunked_slice_parallel<T: H5Type>(
1467        &self,
1468        index_address: u64,
1469        chunk_dims: &[u32],
1470        _element_size: u32,
1471        chunk_indexing: Option<&ChunkIndexing>,
1472        _selection: &SliceInfo,
1473        resolved: &ResolvedSelection,
1474    ) -> Result<ArrayD<T>> {
1475        if resolved.result_elements == 0 {
1476            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
1477        }
1478
1479        if Cursor::is_undefined_offset(index_address, self.offset_size) {
1480            return self
1481                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
1482        }
1483
1484        let ndim = self.ndim();
1485        let shape = &self.dataspace.dims;
1486        let elem_size = dtype_element_size(&self.datatype);
1487        let chunk_shape: Vec<u64> = chunk_dims.iter().map(|&d| d as u64).collect();
1488        let mut first_chunk = vec![0u64; ndim];
1489        let mut last_chunk = vec![0u64; ndim];
1490        for d in 0..ndim {
1491            let (first, last) = resolved.dims[d]
1492                .chunk_index_range(chunk_shape[d])
1493                .expect("zero-sized result handled above");
1494            first_chunk[d] = first;
1495            last_chunk[d] = last;
1496        }
1497
1498        // Collect all chunk entries.
1499        let overlapping = self.collect_chunk_entries(
1500            index_address,
1501            chunk_dims,
1502            chunk_indexing,
1503            ChunkEntrySelection {
1504                shape,
1505                ndim,
1506                elem_size,
1507                chunk_bounds: Some((&first_chunk, &last_chunk)),
1508            },
1509        )?;
1510
1511        // Allocate result buffer (raw bytes) initialized from fill value.
1512        let result_total_bytes = checked_mul_usize(
1513            resolved.result_elements,
1514            elem_size,
1515            "slice result size in bytes",
1516        )?;
1517        // Compute result strides (including collapsed dims — they have count=1).
1518        let result_dims = resolved.result_dims_with_collapsed();
1519        let mut result_strides = vec![1usize; ndim];
1520        for d in (0..ndim - 1).rev() {
1521            result_strides[d] =
1522                checked_mul_usize(result_strides[d + 1], result_dims[d + 1], "result stride")?;
1523        }
1524        let mut chunk_strides = vec![1usize; ndim];
1525        for d in (0..ndim - 1).rev() {
1526            chunk_strides[d] = checked_mul_usize(
1527                chunk_strides[d + 1],
1528                chunk_shape[d + 1] as usize,
1529                "chunk stride",
1530            )?;
1531        }
1532        let use_unit_stride_fast_path = resolved.is_unit_stride();
1533        let fully_covered_unit_stride = use_unit_stride_fast_path
1534            && overlapping.len() == expected_chunk_count(&first_chunk, &last_chunk)?;
1535
1536        if fully_covered_unit_stride {
1537            if T::native_copy_compatible(&self.datatype) && std::mem::size_of::<T>() == elem_size {
1538                let mut result_values: Vec<MaybeUninit<T>> =
1539                    std::iter::repeat_with(MaybeUninit::<T>::uninit)
1540                        .take(resolved.result_elements)
1541                        .collect();
1542                let flat = FlatBufferPtr {
1543                    ptr: result_values.as_mut_ptr() as *mut u8,
1544                    len: checked_mul_usize(
1545                        result_values.len(),
1546                        std::mem::size_of::<T>(),
1547                        "typed slice result size in bytes",
1548                    )?,
1549                };
1550
1551                overlapping
1552                    .par_iter()
1553                    .map(|entry| {
1554                        let chunk_data =
1555                            self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1556
1557                        unsafe {
1558                            flat.copy_unit_stride_chunk_overlap(
1559                                &chunk_data,
1560                                UnitStrideCopyLayout {
1561                                    chunk_offsets: &entry.offsets,
1562                                    chunk_shape: &chunk_shape,
1563                                    dataset_shape: shape,
1564                                    resolved,
1565                                    chunk_strides: &chunk_strides,
1566                                    result_strides: &result_strides,
1567                                    elem_size,
1568                                },
1569                            )?;
1570                        }
1571
1572                        Ok(())
1573                    })
1574                    .collect::<std::result::Result<Vec<_>, Error>>()?;
1575
1576                let result_values = assume_init_vec(result_values);
1577                return ArrayD::from_shape_vec(IxDyn(&resolved.result_shape), result_values)
1578                    .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1579            }
1580
1581            let mut result_buf = vec![MaybeUninit::<u8>::uninit(); result_total_bytes];
1582            let flat = FlatBufferPtr {
1583                ptr: result_buf.as_mut_ptr() as *mut u8,
1584                len: result_buf.len(),
1585            };
1586
1587            overlapping
1588                .par_iter()
1589                .map(|entry| {
1590                    let chunk_data =
1591                        self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1592
1593                    unsafe {
1594                        flat.copy_unit_stride_chunk_overlap(
1595                            &chunk_data,
1596                            UnitStrideCopyLayout {
1597                                chunk_offsets: &entry.offsets,
1598                                chunk_shape: &chunk_shape,
1599                                dataset_shape: shape,
1600                                resolved,
1601                                chunk_strides: &chunk_strides,
1602                                result_strides: &result_strides,
1603                                elem_size,
1604                            },
1605                        )?;
1606                    }
1607
1608                    Ok(())
1609                })
1610                .collect::<std::result::Result<Vec<_>, Error>>()?;
1611
1612            let result_buf = assume_init_u8_vec(result_buf);
1613            return self.decode_buffer_with_shape::<T>(
1614                &result_buf,
1615                resolved.result_elements,
1616                &resolved.result_shape,
1617            );
1618        }
1619
1620        let mut result_buf = self.make_output_buffer(result_total_bytes);
1621
1622        let flat = FlatBufferPtr {
1623            ptr: result_buf.as_mut_ptr(),
1624            len: result_buf.len(),
1625        };
1626
1627        overlapping
1628            .par_iter()
1629            .map(|entry| {
1630                let chunk_data =
1631                    self.load_chunk_data(entry, index_address, &chunk_shape, elem_size)?;
1632
1633                if use_unit_stride_fast_path {
1634                    unsafe {
1635                        flat.copy_unit_stride_chunk_overlap(
1636                            &chunk_data,
1637                            UnitStrideCopyLayout {
1638                                chunk_offsets: &entry.offsets,
1639                                chunk_shape: &chunk_shape,
1640                                dataset_shape: shape,
1641                                resolved,
1642                                chunk_strides: &chunk_strides,
1643                                result_strides: &result_strides,
1644                                elem_size,
1645                            },
1646                        )?;
1647                    }
1648                    return Ok(());
1649                }
1650
1651                // For each dimension, compute which elements within this chunk fall
1652                // within the selection.
1653                let mut dim_indices: Vec<Vec<(usize, usize)>> = Vec::with_capacity(ndim);
1654                for d in 0..ndim {
1655                    let chunk_start = entry.offsets[d];
1656                    let chunk_end = (chunk_start + chunk_shape[d]).min(shape[d]);
1657                    let dim = &resolved.dims[d];
1658                    let sel_start = dim.start;
1659                    let sel_end = dim.end;
1660                    let sel_step = dim.step;
1661                    let mut indices = Vec::new();
1662
1663                    let first_sel = if sel_start >= chunk_start {
1664                        sel_start
1665                    } else {
1666                        let steps_to_skip = (chunk_start - sel_start).div_ceil(sel_step);
1667                        sel_start + steps_to_skip * sel_step
1668                    };
1669
1670                    let mut sel_idx = first_sel;
1671                    while sel_idx < sel_end && sel_idx < chunk_end {
1672                        let chunk_local =
1673                            checked_usize(sel_idx - chunk_start, "chunk-local index")?;
1674                        let result_dim_idx =
1675                            checked_usize((sel_idx - dim.start) / sel_step, "result index")?;
1676                        indices.push((chunk_local, result_dim_idx));
1677                        sel_idx += sel_step;
1678                    }
1679
1680                    dim_indices.push(indices);
1681                }
1682
1683                // SAFETY: each chunk writes to disjoint output positions because
1684                // chunks occupy non-overlapping regions of the dataset grid and
1685                // the selection maps each dataset coordinate to a unique result index.
1686                unsafe {
1687                    flat.copy_selected(
1688                        &chunk_data,
1689                        &dim_indices,
1690                        &chunk_strides,
1691                        &result_strides,
1692                        elem_size,
1693                        ndim,
1694                    );
1695                }
1696
1697                Ok(())
1698            })
1699            .collect::<std::result::Result<Vec<_>, Error>>()?;
1700
1701        self.decode_buffer_with_shape::<T>(
1702            &result_buf,
1703            resolved.result_elements,
1704            &resolved.result_shape,
1705        )
1706    }
1707
1708    fn read_contiguous_slice<T: H5Type>(
1709        &self,
1710        address: u64,
1711        size: u64,
1712        selection: &SliceInfo,
1713        resolved: &ResolvedSelection,
1714    ) -> Result<ArrayD<T>> {
1715        if resolved.result_elements == 0 {
1716            return self.make_fill_array_from_shape::<T>(0, &resolved.result_shape);
1717        }
1718
1719        if Cursor::is_undefined_offset(address, self.offset_size) || size == 0 {
1720            return self
1721                .make_fill_array_from_shape::<T>(resolved.result_elements, &resolved.result_shape);
1722        }
1723
1724        let shape = &self.dataspace.dims;
1725        let ndim = shape.len();
1726        let elem_size = dtype_element_size(&self.datatype);
1727
1728        // Check if this is a simple contiguous sub-range where we can compute
1729        // byte offsets directly (all Slice selections with step=1 and the
1730        // selection is contiguous in memory — i.e., all dimensions except the
1731        // outermost select the full range).
1732        let can_direct_extract = ndim > 0
1733            && selection.selections.iter().enumerate().all(|(d, sel)| {
1734                match sel {
1735                    SliceInfoElem::Slice { step, start, end } => {
1736                        if *step != 1 {
1737                            return false;
1738                        }
1739                        // Inner dimensions (d > 0) must select the full range
1740                        // for the data to be contiguous in memory.
1741                        if d > 0 {
1742                            *start == 0 && (*end == u64::MAX || *end >= shape[d])
1743                        } else {
1744                            true
1745                        }
1746                    }
1747                    SliceInfoElem::Index(_) => {
1748                        // Index on the outermost dim is fine (single row),
1749                        // but on inner dims it breaks contiguity.
1750                        d == 0
1751                    }
1752                }
1753            });
1754
1755        if can_direct_extract {
1756            // Compute the byte range to read from the mmap.
1757            let row_stride: u64 = shape[1..].iter().product::<u64>().max(1);
1758            let row_bytes = row_stride as usize * elem_size;
1759
1760            let (first_row, num_rows, result_shape) = match &selection.selections[0] {
1761                SliceInfoElem::Index(idx) => {
1762                    let mut rs: Vec<usize> = shape[1..].iter().map(|&d| d as usize).collect();
1763                    if rs.is_empty() {
1764                        rs = vec![];
1765                    }
1766                    (*idx, 1u64, rs)
1767                }
1768                SliceInfoElem::Slice { start, end, .. } => {
1769                    let actual_end = if *end == u64::MAX {
1770                        shape[0]
1771                    } else {
1772                        (*end).min(shape[0])
1773                    };
1774                    let count = actual_end.saturating_sub(*start);
1775                    let mut rs = vec![checked_usize(count, "contiguous slice row count")?];
1776                    for &dim in &shape[1..] {
1777                        rs.push(checked_usize(dim, "dataset dimension")?);
1778                    }
1779                    (*start, count, rs)
1780                }
1781            };
1782
1783            let byte_offset = checked_usize(address, "contiguous data address")?
1784                + checked_mul_usize(
1785                    checked_usize(first_row, "slice row offset")?,
1786                    row_bytes,
1787                    "contiguous byte offset",
1788                )?;
1789            let total_bytes = checked_mul_usize(
1790                checked_usize(num_rows, "contiguous slice row count")?,
1791                row_bytes,
1792                "contiguous slice size in bytes",
1793            )?;
1794
1795            if byte_offset + total_bytes > self.file_data.len() {
1796                return Err(Error::OffsetOutOfBounds(address));
1797            }
1798
1799            let raw = &self.file_data[byte_offset..byte_offset + total_bytes];
1800            let n = (total_bytes) / elem_size;
1801
1802            let elements = if let Some(decoded) = T::decode_vec(raw, &self.datatype, n) {
1803                decoded?
1804            } else {
1805                let mut elements = Vec::with_capacity(n);
1806                for i in 0..n {
1807                    let start = i * elem_size;
1808                    elements.push(T::from_bytes(
1809                        &raw[start..start + elem_size],
1810                        &self.datatype,
1811                    )?);
1812                }
1813                elements
1814            };
1815
1816            return ArrayD::from_shape_vec(IxDyn(&result_shape), elements)
1817                .map_err(|e| Error::InvalidData(format!("contiguous slice shape error: {e}")));
1818        }
1819
1820        // Fallback: read full data then slice.
1821        let full = self.read_contiguous::<T>(address, size)?;
1822        slice_array(&full, selection, &self.dataspace.dims)
1823    }
1824
1825    fn read_compact_slice<T: H5Type>(
1826        &self,
1827        data: &[u8],
1828        selection: &SliceInfo,
1829    ) -> Result<ArrayD<T>> {
1830        let full = self.read_compact::<T>(data)?;
1831        slice_array(&full, selection, &self.dataspace.dims)
1832    }
1833
1834    fn decode_buffer_with_shape<T: H5Type>(
1835        &self,
1836        raw: &[u8],
1837        n: usize,
1838        shape: &[usize],
1839    ) -> Result<ArrayD<T>> {
1840        let elem_size = dtype_element_size(&self.datatype);
1841
1842        if let Some(elements) = T::decode_vec(raw, &self.datatype, n) {
1843            let elements = elements?;
1844            return ArrayD::from_shape_vec(IxDyn(shape), elements)
1845                .map_err(|e| Error::InvalidData(format!("array shape error: {e}")));
1846        }
1847
1848        let mut elements = Vec::with_capacity(n);
1849        for i in 0..n {
1850            let start = checked_mul_usize(i, elem_size, "decoded element byte offset")?;
1851            let end = checked_mul_usize(i + 1, elem_size, "decoded element end offset")?;
1852            if end > raw.len() {
1853                // Pad with fill value or zeros if data is short.
1854                let padded = if end <= raw.len().saturating_add(elem_size) {
1855                    let mut buf = vec![0u8; elem_size];
1856                    let available = raw.len().saturating_sub(start);
1857                    if available > 0 {
1858                        buf[..available].copy_from_slice(&raw[start..start + available]);
1859                    }
1860                    T::from_bytes(&buf, &self.datatype)?
1861                } else {
1862                    T::from_bytes(&vec![0u8; elem_size], &self.datatype)?
1863                };
1864                elements.push(padded);
1865            } else {
1866                elements.push(T::from_bytes(&raw[start..end], &self.datatype)?);
1867            }
1868        }
1869
1870        ArrayD::from_shape_vec(IxDyn(shape), elements)
1871            .map_err(|e| Error::InvalidData(format!("array shape error: {e}")))
1872    }
1873
1874    fn decode_raw_data<T: H5Type>(&self, raw: &[u8]) -> Result<ArrayD<T>> {
1875        let n = checked_usize(self.num_elements(), "dataset element count")?;
1876        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
1877        for &dim in &self.dataspace.dims {
1878            shape.push(checked_usize(dim, "dataset dimension")?);
1879        }
1880        self.decode_buffer_with_shape::<T>(raw, n, &shape)
1881    }
1882
1883    fn make_fill_array<T: H5Type>(&self) -> Result<ArrayD<T>> {
1884        let n = checked_usize(self.num_elements(), "dataset element count")?;
1885        let mut shape = Vec::with_capacity(self.dataspace.dims.len());
1886        for &dim in &self.dataspace.dims {
1887            shape.push(checked_usize(dim, "dataset dimension")?);
1888        }
1889        self.make_fill_array_from_shape::<T>(n, &shape)
1890    }
1891
1892    fn make_fill_array_from_shape<T: H5Type>(
1893        &self,
1894        element_count: usize,
1895        shape: &[usize],
1896    ) -> Result<ArrayD<T>> {
1897        let elem_size = dtype_element_size(&self.datatype);
1898        let total_bytes = checked_mul_usize(element_count, elem_size, "fill result size in bytes")?;
1899        let fill = self.make_output_buffer(total_bytes);
1900        self.decode_buffer_with_shape::<T>(&fill, element_count, shape)
1901    }
1902
1903    fn make_output_buffer(&self, total_bytes: usize) -> Vec<u8> {
1904        if let Some(ref fv) = self.fill_value {
1905            if let Some(ref fill_bytes) = fv.value {
1906                let mut buf = vec![0u8; total_bytes];
1907                if !fill_bytes.is_empty() {
1908                    for chunk in buf.chunks_exact_mut(fill_bytes.len()) {
1909                        chunk.copy_from_slice(fill_bytes);
1910                    }
1911                }
1912                buf
1913            } else {
1914                vec![0u8; total_bytes]
1915            }
1916        } else {
1917            vec![0u8; total_bytes]
1918        }
1919    }
1920}
1921
1922fn normalize_layout(layout: DataLayout, dataspace: &DataspaceMessage) -> DataLayout {
1923    match layout {
1924        DataLayout::Chunked {
1925            address,
1926            mut dims,
1927            mut element_size,
1928            chunk_indexing,
1929        } if dims.len() == dataspace.dims.len() + 1 => {
1930            if let Some(legacy_element_size) = dims.pop() {
1931                if element_size == 0 {
1932                    element_size = legacy_element_size;
1933                }
1934            }
1935            DataLayout::Chunked {
1936                address,
1937                dims,
1938                element_size,
1939                chunk_indexing,
1940            }
1941        }
1942        other => other,
1943    }
1944}
1945
/// Copy a chunk's data into the flat output buffer at the correct position.
///
/// Test-only convenience wrapper: derives both stride tables from the shapes
/// before delegating to the strided copy.
#[cfg(test)]
fn copy_chunk_to_flat(
    chunk_data: &[u8],
    flat: &mut [u8],
    chunk_offsets: &[u64],
    chunk_shape: &[u64],
    dataset_shape: &[u64],
    elem_size: usize,
) {
    let dataset_strides = row_major_strides(dataset_shape, "dataset stride")
        .expect("dataset strides should fit in usize");
    let chunk_strides = row_major_strides(chunk_shape, "chunk stride")
        .expect("chunk strides should fit in usize");
    let layout = ChunkCopyLayout {
        chunk_offsets,
        chunk_shape,
        dataset_shape,
        dataset_strides: &dataset_strides,
        chunk_strides: &chunk_strides,
        elem_size,
    };
    copy_chunk_to_flat_with_strides(chunk_data, flat, layout);
}
1973
1974fn copy_chunk_to_flat_with_strides(
1975    chunk_data: &[u8],
1976    flat: &mut [u8],
1977    layout: ChunkCopyLayout<'_>,
1978) {
1979    unsafe {
1980        copy_chunk_to_flat_with_strides_ptr(
1981            chunk_data,
1982            FlatBufferPtr {
1983                ptr: flat.as_mut_ptr(),
1984                len: flat.len(),
1985            },
1986            layout,
1987        );
1988    }
1989}
1990
/// Copy one chunk's decoded bytes into the correct region of a flat,
/// row-major dataset buffer, writing through a raw pointer.
///
/// The chunk extent is clamped to the dataset boundary (edge chunks), and the
/// copy proceeds one innermost-dimension row at a time with a single memcpy
/// per row. Rows that would overrun either buffer are skipped, not truncated.
///
/// # Safety
///
/// `flat.ptr` must be valid for writes of `flat.len` bytes for the duration
/// of the call; concurrent callers must target disjoint byte ranges.
#[inline(always)]
unsafe fn copy_chunk_to_flat_with_strides_ptr(
    chunk_data: &[u8],
    flat: FlatBufferPtr,
    layout: ChunkCopyLayout<'_>,
) {
    let ndim = layout.dataset_shape.len();

    // Scalar dataset: one element, clamped to both buffer lengths.
    if ndim == 0 {
        let bytes = layout.elem_size.min(chunk_data.len()).min(flat.len);
        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr, bytes);
        return;
    }

    // Total elements in this chunk (clamped to dataset boundaries)
    let mut actual_chunk_shape = Vec::with_capacity(ndim);
    for i in 0..ndim {
        let remaining = layout.dataset_shape[i] - layout.chunk_offsets[i];
        actual_chunk_shape.push(remaining.min(layout.chunk_shape[i]) as usize);
    }

    // A "row" is the contiguous run along the innermost dimension.
    let row_elems = *actual_chunk_shape.last().unwrap_or(&1);
    let row_bytes = row_elems * layout.elem_size;
    // Flat element offset of the chunk's origin within the dataset buffer.
    let dataset_origin: usize = layout
        .chunk_offsets
        .iter()
        .enumerate()
        .map(|(d, offset)| *offset as usize * layout.dataset_strides[d])
        .sum();

    if ndim == 1 {
        // Single row: one bounds-checked memcpy.
        let bytes = row_bytes.min(chunk_data.len());
        let dst_start = dataset_origin * layout.elem_size;
        let dst_end = dst_start + bytes;
        if dst_end <= flat.len {
            std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), flat.ptr.add(dst_start), bytes);
        }
        return;
    }

    // Walk every combination of the outer dimensions (odometer-style),
    // copying one innermost row per iteration.
    let outer_dims = &actual_chunk_shape[..ndim - 1];
    let total_rows: usize = outer_dims.iter().product();
    let mut outer_idx = vec![0usize; ndim - 1];

    for _ in 0..total_rows {
        // Flat element offsets of this row in the chunk and in the dataset.
        let mut chunk_row = 0usize;
        let mut dataset_row = dataset_origin;
        for (d, outer) in outer_idx.iter().copied().enumerate() {
            chunk_row += outer * layout.chunk_strides[d];
            dataset_row += outer * layout.dataset_strides[d];
        }

        let src_start = chunk_row * layout.elem_size;
        let dst_start = dataset_row * layout.elem_size;
        let src_end = src_start + row_bytes;
        let dst_end = dst_start + row_bytes;
        // Skip (rather than truncate) a row that would overrun either buffer.
        if src_end <= chunk_data.len() && dst_end <= flat.len {
            std::ptr::copy_nonoverlapping(
                chunk_data.as_ptr().add(src_start),
                flat.ptr.add(dst_start),
                row_bytes,
            );
        }

        // Advance the outer-dimension odometer (row-major order).
        let mut carry = true;
        for d in (0..outer_idx.len()).rev() {
            if carry {
                outer_idx[d] += 1;
                if outer_idx[d] < outer_dims[d] {
                    carry = false;
                } else {
                    outer_idx[d] = 0;
                }
            }
        }
    }
}
2068
2069fn checked_product_usize(values: &[usize], context: &str) -> Result<usize> {
2070    let mut product = 1usize;
2071    for &value in values {
2072        product = checked_mul_usize(product, value, context)?;
2073    }
2074    Ok(product)
2075}
2076
2077fn unit_stride_chunk_overlap_plan(
2078    chunk_offsets: &[u64],
2079    chunk_shape: &[u64],
2080    dataset_shape: &[u64],
2081    resolved: &ResolvedSelection,
2082) -> Result<(Vec<usize>, Vec<usize>, Vec<usize>)> {
2083    let ndim = dataset_shape.len();
2084    let mut overlap_counts = Vec::with_capacity(ndim);
2085    let mut chunk_local_start = Vec::with_capacity(ndim);
2086    let mut result_start = Vec::with_capacity(ndim);
2087
2088    for d in 0..ndim {
2089        let chunk_start = chunk_offsets[d];
2090        let chunk_end = (chunk_start + chunk_shape[d]).min(dataset_shape[d]);
2091        let dim = &resolved.dims[d];
2092        let overlap_start = chunk_start.max(dim.start);
2093        let overlap_end = chunk_end.min(dim.end);
2094        if overlap_start >= overlap_end {
2095            return Ok((Vec::new(), Vec::new(), Vec::new()));
2096        }
2097
2098        overlap_counts.push(checked_usize(
2099            overlap_end - overlap_start,
2100            "chunk overlap size",
2101        )?);
2102        chunk_local_start.push(checked_usize(
2103            overlap_start - chunk_start,
2104            "chunk overlap start",
2105        )?);
2106        result_start.push(checked_usize(
2107            overlap_start - dim.start,
2108            "slice result overlap start",
2109        )?);
2110    }
2111
2112    Ok((overlap_counts, chunk_local_start, result_start))
2113}
2114
2115#[inline(always)]
2116fn copy_unit_stride_chunk_overlap(
2117    chunk_data: &[u8],
2118    result_buf: &mut [u8],
2119    layout: UnitStrideCopyLayout<'_>,
2120) -> Result<()> {
2121    unsafe {
2122        copy_unit_stride_chunk_overlap_ptr(
2123            chunk_data,
2124            FlatBufferPtr {
2125                ptr: result_buf.as_mut_ptr(),
2126                len: result_buf.len(),
2127            },
2128            layout,
2129        )
2130    }
2131}
2132
/// Copy a unit-step rectangular overlap from a chunk into the result buffer.
///
/// This is the hot path for contiguous hyperslab reads over chunked datasets:
/// rather than copying one element at a time, it copies contiguous runs along
/// the innermost dimension with a single memcpy per output row.
///
/// # Safety
///
/// The caller must guarantee that `[result_ptr .. result_ptr + result_len)` is
/// valid for writes. Concurrent callers must write to disjoint byte ranges.
#[inline(always)]
unsafe fn copy_unit_stride_chunk_overlap_ptr(
    chunk_data: &[u8],
    result: FlatBufferPtr,
    layout: UnitStrideCopyLayout<'_>,
) -> Result<()> {
    let ndim = layout.dataset_shape.len();

    // Scalar dataset: one element, clamped to both buffer lengths.
    if ndim == 0 {
        let bytes = layout.elem_size.min(chunk_data.len()).min(result.len);
        std::ptr::copy_nonoverlapping(chunk_data.as_ptr(), result.ptr, bytes);
        return Ok(());
    }

    // Per-dimension overlap extent plus its start offset inside the chunk and
    // inside the result; empty vectors mean the chunk misses the selection.
    let (overlap_counts, chunk_local_start, result_start) = unit_stride_chunk_overlap_plan(
        layout.chunk_offsets,
        layout.chunk_shape,
        layout.dataset_shape,
        layout.resolved,
    )?;
    if overlap_counts.is_empty() {
        return Ok(());
    }

    // A "row" is the contiguous run along the innermost dimension.
    let row_elems = *overlap_counts.last().unwrap_or(&1);
    let row_bytes = checked_mul_usize(row_elems, layout.elem_size, "unit-stride slice row bytes")?;

    // Flat element offsets of the overlap's origin in chunk and result space.
    let mut chunk_origin = 0usize;
    let mut result_origin = 0usize;
    for d in 0..ndim {
        let chunk_term = checked_mul_usize(
            chunk_local_start[d],
            layout.chunk_strides[d],
            "chunk overlap origin",
        )?;
        let result_term = checked_mul_usize(
            result_start[d],
            layout.result_strides[d],
            "slice result origin",
        )?;
        chunk_origin = checked_add_usize(chunk_origin, chunk_term, "chunk overlap origin")?;
        result_origin = checked_add_usize(result_origin, result_term, "slice result origin")?;
    }

    if ndim == 1 {
        // Single row: one bounds-checked memcpy.
        let src_start = chunk_origin * layout.elem_size;
        let dst_start = result_origin * layout.elem_size;
        let src_end = src_start + row_bytes;
        let dst_end = dst_start + row_bytes;
        if src_end <= chunk_data.len() && dst_end <= result.len {
            std::ptr::copy_nonoverlapping(
                chunk_data.as_ptr().add(src_start),
                result.ptr.add(dst_start),
                row_bytes,
            );
        }
        return Ok(());
    }

    // Walk every combination of the outer dimensions (odometer-style),
    // copying one innermost row per iteration.
    let outer_counts = &overlap_counts[..ndim - 1];
    let total_rows = checked_product_usize(outer_counts, "unit-stride slice row count")?;
    let mut outer_idx = vec![0usize; ndim - 1];

    for _ in 0..total_rows {
        let mut chunk_row = chunk_origin;
        let mut result_row = result_origin;
        for (d, outer) in outer_idx.iter().copied().enumerate() {
            chunk_row += outer * layout.chunk_strides[d];
            result_row += outer * layout.result_strides[d];
        }

        let src_start = chunk_row * layout.elem_size;
        let dst_start = result_row * layout.elem_size;
        let src_end = src_start + row_bytes;
        let dst_end = dst_start + row_bytes;
        // Skip (rather than truncate) a row that would overrun either buffer.
        if src_end <= chunk_data.len() && dst_end <= result.len {
            std::ptr::copy_nonoverlapping(
                chunk_data.as_ptr().add(src_start),
                result.ptr.add(dst_start),
                row_bytes,
            );
        }

        // Advance the outer-dimension odometer (row-major order).
        let mut carry = true;
        for d in (0..outer_idx.len()).rev() {
            if carry {
                outer_idx[d] += 1;
                if outer_idx[d] < outer_counts[d] {
                    carry = false;
                } else {
                    outer_idx[d] = 0;
                }
            }
        }
    }

    Ok(())
}
2241
/// Copy selected elements from a chunk into the result buffer.
///
/// `dim_indices[d]` is a list of `(chunk_local_idx, result_dim_idx)` pairs for dimension `d`.
/// Elements whose source or destination would fall outside either buffer are
/// skipped silently.
#[allow(clippy::too_many_arguments)]
#[inline(always)]
fn copy_selected_elements(
    chunk_data: &[u8],
    result_buf: &mut [u8],
    dim_indices: &[Vec<(usize, usize)>],
    chunk_strides: &[usize],
    result_strides: &[usize],
    elem_size: usize,
    ndim: usize,
) {
    // Nothing selected in some dimension ⇒ the cartesian product is empty.
    if dim_indices.iter().any(|pairs| pairs.is_empty()) {
        return;
    }

    // Iterate the cartesian product of the per-dimension index lists with an
    // odometer instead of recursion.
    let total: usize = dim_indices.iter().map(|pairs| pairs.len()).product();
    let mut odometer = vec![0usize; ndim];

    for _ in 0..total {
        // Flat element offsets for the current index combination.
        let mut src_elem = 0usize;
        let mut dst_elem = 0usize;
        for (d, &pos) in odometer.iter().enumerate() {
            let (chunk_local, result_idx) = dim_indices[d][pos];
            src_elem += chunk_local * chunk_strides[d];
            dst_elem += result_idx * result_strides[d];
        }

        let src = src_elem * elem_size;
        let dst = dst_elem * elem_size;
        if src + elem_size <= chunk_data.len() && dst + elem_size <= result_buf.len() {
            result_buf[dst..dst + elem_size].copy_from_slice(&chunk_data[src..src + elem_size]);
        }

        // Advance the odometer in row-major order.
        for d in (0..ndim).rev() {
            odometer[d] += 1;
            if odometer[d] < dim_indices[d].len() {
                break;
            }
            odometer[d] = 0;
        }
    }
}
2297
/// Copy selected elements from a chunk into a raw output pointer.
///
/// This is the pointer-based variant of `copy_selected_elements`, suitable for
/// parallel use where multiple threads write to disjoint regions of the same buffer.
/// Elements whose source or destination would fall outside either buffer are
/// skipped silently.
///
/// # Safety
///
/// The caller must guarantee that no two concurrent calls write to the same
/// byte range within `[result_ptr .. result_ptr + result_len)`.
#[cfg(feature = "rayon")]
#[allow(clippy::too_many_arguments)]
#[inline(always)]
unsafe fn copy_selected_elements_ptr(
    chunk_data: &[u8],
    result_ptr: *mut u8,
    result_len: usize,
    dim_indices: &[Vec<(usize, usize)>],
    chunk_strides: &[usize],
    result_strides: &[usize],
    elem_size: usize,
    ndim: usize,
) {
    // Nothing selected in some dimension ⇒ the cartesian product is empty.
    if dim_indices.iter().any(|v| v.is_empty()) {
        return;
    }

    // Iterate the cartesian product of the per-dimension index lists with an
    // odometer instead of recursion.
    let total: usize = dim_indices.iter().map(|v| v.len()).product();
    let mut counters = vec![0usize; ndim];

    for _ in 0..total {
        // Flat element offsets for the current index combination.
        let mut chunk_flat = 0;
        let mut result_flat = 0;
        for d in 0..ndim {
            let (cl, ri) = dim_indices[d][counters[d]];
            chunk_flat += cl * chunk_strides[d];
            result_flat += ri * result_strides[d];
        }

        let src_start = chunk_flat * elem_size;
        let dst_start = result_flat * elem_size;
        let src_end = src_start + elem_size;
        let dst_end = dst_start + elem_size;

        // Copy one element, skipping anything out of range of either buffer.
        if src_end <= chunk_data.len() && dst_end <= result_len {
            std::ptr::copy_nonoverlapping(
                chunk_data.as_ptr().add(src_start),
                result_ptr.add(dst_start),
                elem_size,
            );
        }

        // Advance the counters in row-major order.
        let mut carry = true;
        for d in (0..ndim).rev() {
            if carry {
                counters[d] += 1;
                if counters[d] < dim_indices[d].len() {
                    carry = false;
                } else {
                    counters[d] = 0;
                }
            }
        }
    }
}
2362
2363/// Slice an ndarray according to a SliceInfo selection.
2364fn slice_array<T: H5Type + Clone>(
2365    array: &ArrayD<T>,
2366    selection: &SliceInfo,
2367    shape: &[u64],
2368) -> Result<ArrayD<T>> {
2369    // Build result shape
2370    let mut result_shape = Vec::new();
2371
2372    for (i, sel) in selection.selections.iter().enumerate() {
2373        let dim_size = shape[i];
2374        match sel {
2375            SliceInfoElem::Index(idx) => {
2376                if *idx >= dim_size {
2377                    return Err(Error::SliceOutOfBounds {
2378                        dim: i,
2379                        index: *idx,
2380                        size: dim_size,
2381                    });
2382                }
2383                // Don't add to result_shape — this dimension is collapsed
2384            }
2385            SliceInfoElem::Slice { start, end, step } => {
2386                let actual_end = if *end == u64::MAX {
2387                    dim_size as usize
2388                } else {
2389                    (*end as usize).min(dim_size as usize)
2390                };
2391                let actual_start = *start as usize;
2392                let actual_step = *step as usize;
2393                if actual_step == 0 {
2394                    return Err(Error::InvalidData("slice step cannot be 0".into()));
2395                }
2396                let n = (actual_end - actual_start).div_ceil(actual_step);
2397                result_shape.push(n);
2398            }
2399        }
2400    }
2401
2402    // Extract elements manually (ndarray's slicing API is complex with dynamic dims)
2403    let ndim = shape.len();
2404    let total: usize = result_shape.iter().product();
2405    let mut elements = Vec::with_capacity(total);
2406
2407    // Generate all indices in the result
2408    let mut result_idx = vec![0usize; result_shape.len()];
2409
2410    for _ in 0..total {
2411        // Map result index to source index
2412        let mut src_idx = Vec::with_capacity(ndim);
2413        let mut ri = 0;
2414        for sel in selection.selections.iter() {
2415            match sel {
2416                SliceInfoElem::Index(idx) => {
2417                    src_idx.push(*idx as usize);
2418                }
2419                SliceInfoElem::Slice { start, step, .. } => {
2420                    src_idx.push(*start as usize + result_idx[ri] * *step as usize);
2421                    ri += 1;
2422                }
2423            }
2424        }
2425
2426        elements.push(array[IxDyn(&src_idx)].clone());
2427
2428        // Increment result index
2429        if !result_shape.is_empty() {
2430            let mut carry = true;
2431            for d in (0..result_shape.len()).rev() {
2432                if carry {
2433                    result_idx[d] += 1;
2434                    if result_idx[d] < result_shape[d] {
2435                        carry = false;
2436                    } else {
2437                        result_idx[d] = 0;
2438                    }
2439                }
2440            }
2441        }
2442    }
2443
2444    ArrayD::from_shape_vec(IxDyn(&result_shape), elements)
2445        .map_err(|e| Error::InvalidData(format!("slice shape error: {e}")))
2446}
2447
#[cfg(test)]
mod tests {
    use super::*;

    /// `SliceInfo::all(n)` must yield exactly one full-range selection per dimension.
    #[test]
    fn test_slice_info_all() {
        let full = SliceInfo::all(3);
        assert_eq!(full.selections.len(), 3);
    }

    /// A 1-D chunk of four one-byte elements placed at offset 2 inside an
    /// 8-element dataset should land in flat positions 2..6, leaving the
    /// rest of the buffer untouched.
    #[test]
    fn test_copy_chunk_1d() {
        let src = vec![1u8, 2, 3, 4];
        let mut dst = vec![0u8; 8];
        let offsets = vec![2u64];
        let chunk_dims = vec![4u64];
        let dataset_dims = vec![8u64];

        copy_chunk_to_flat(&src, &mut dst, &offsets, &chunk_dims, &dataset_dims, 1);

        assert_eq!(dst, [0, 0, 1, 2, 3, 4, 0, 0]);
    }

    /// A 2x3 chunk anchored at (1, 1) inside a 4x4 dataset fills rows 1-2,
    /// columns 1-3 of the row-major flat buffer.
    #[test]
    fn test_copy_chunk_2d_rowwise() {
        let src = vec![1u8, 2, 3, 4, 5, 6];
        let mut dst = vec![0u8; 16];
        let offsets = vec![1u64, 1];
        let chunk_dims = vec![2u64, 3];
        let dataset_dims = vec![4u64, 4];

        copy_chunk_to_flat(&src, &mut dst, &offsets, &chunk_dims, &dataset_dims, 1);

        let expected: [u8; 16] = [
            0, 0, 0, 0, //
            0, 1, 2, 3, //
            0, 4, 5, 6, //
            0, 0, 0, 0,
        ];
        assert_eq!(dst, expected);
    }

    /// Unit-stride overlap copy: selecting rows 1..3 and columns 1..4 of a
    /// 4x4 chunk (values 1..=16, row-major) must produce the six covered
    /// elements in row-major order.
    #[test]
    fn test_copy_unit_stride_chunk_overlap_2d_partial() {
        let src: Vec<u8> = (1..=16).collect();
        let mut out = vec![0u8; 6];

        // Small builder so each step-1 dimension reads as one line.
        let dim = |start, end, count| ResolvedSelectionDim {
            start,
            end,
            step: 1,
            count,
        };
        let selection = ResolvedSelection {
            dims: vec![dim(1, 3, 2), dim(1, 4, 3)],
            result_shape: vec![2, 3],
            result_elements: 6,
        };

        let chunk_offsets = vec![0u64, 0];
        let chunk_shape = vec![4u64, 4];
        let dataset_shape = vec![4u64, 4];
        let chunk_strides = vec![4usize, 1];
        let result_strides = vec![3usize, 1];

        copy_unit_stride_chunk_overlap(
            &src,
            &mut out,
            UnitStrideCopyLayout {
                chunk_offsets: &chunk_offsets,
                chunk_shape: &chunk_shape,
                dataset_shape: &dataset_shape,
                resolved: &selection,
                chunk_strides: &chunk_strides,
                result_strides: &result_strides,
                elem_size: 1,
            },
        )
        .unwrap();

        assert_eq!(out, [6, 7, 8, 10, 11, 12]);
    }
}