zarrs/array/
array_async_readable.rs

1use std::{borrow::Cow, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use unsafe_cell_slice::UnsafeCellSlice;
5
6use crate::{
7    array_subset::ArraySubset,
8    config::MetadataRetrieveVersion,
9    node::{meta_key_v2_array, meta_key_v2_attributes, meta_key_v3, NodePath},
10    storage::{AsyncBytes, AsyncReadableStorageTraits, StorageError, StorageHandle},
11};
12
13use super::{
14    array_bytes::{copy_fill_value_into, merge_chunks_vlen},
15    codec::{
16        ArrayToBytesCodecTraits, AsyncArrayPartialDecoderTraits, AsyncStoragePartialDecoder,
17        CodecOptions,
18    },
19    concurrency::concurrency_chunks_and_codec,
20    element::ElementOwned,
21    Array, ArrayBytes, ArrayBytesFixedDisjointView, ArrayCreateError, ArrayError, ArrayMetadata,
22    ArrayMetadataV2, ArrayMetadataV3, ArraySize, DataTypeSize,
23};
24
25#[cfg(feature = "ndarray")]
26use super::elements_to_ndarray;
27
28impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Array<TStorage> {
29    /// Async variant of [`open`](Array::open).
30    #[allow(clippy::missing_errors_doc)]
31    pub async fn async_open(
32        storage: Arc<TStorage>,
33        path: &str,
34    ) -> Result<Array<TStorage>, ArrayCreateError> {
35        Self::async_open_opt(storage, path, &MetadataRetrieveVersion::Default).await
36    }
37
38    /// Async variant of [`open_opt`](Array::open_opt).
39    #[allow(clippy::missing_errors_doc)]
40    pub async fn async_open_opt(
41        storage: Arc<TStorage>,
42        path: &str,
43        version: &MetadataRetrieveVersion,
44    ) -> Result<Array<TStorage>, ArrayCreateError> {
45        let metadata = Self::async_open_metadata(storage.clone(), path, version).await?;
46        Self::validate_metadata(&metadata)?;
47        Self::new_with_metadata(storage, path, metadata)
48    }
49
50    async fn async_open_metadata(
51        storage: Arc<TStorage>,
52        path: &str,
53        version: &MetadataRetrieveVersion,
54    ) -> Result<ArrayMetadata, ArrayCreateError> {
55        let node_path = NodePath::new(path)?;
56
57        if let MetadataRetrieveVersion::Default | MetadataRetrieveVersion::V3 = version {
58            // Try V3
59            let key_v3 = meta_key_v3(&node_path);
60            if let Some(metadata) = storage.get(&key_v3).await? {
61                let metadata: ArrayMetadataV3 = serde_json::from_slice(&metadata)
62                    .map_err(|err| StorageError::InvalidMetadata(key_v3, err.to_string()))?;
63                return Ok(ArrayMetadata::V3(metadata));
64            }
65        }
66
67        if let MetadataRetrieveVersion::Default | MetadataRetrieveVersion::V2 = version {
68            // Try V2
69            let key_v2 = meta_key_v2_array(&node_path);
70            if let Some(metadata) = storage.get(&key_v2).await? {
71                let mut metadata: ArrayMetadataV2 = serde_json::from_slice(&metadata)
72                    .map_err(|err| StorageError::InvalidMetadata(key_v2, err.to_string()))?;
73
74                let attributes_key = meta_key_v2_attributes(&node_path);
75                let attributes = storage.get(&attributes_key).await?;
76                if let Some(attributes) = attributes {
77                    metadata.attributes = serde_json::from_slice(&attributes).map_err(|err| {
78                        StorageError::InvalidMetadata(attributes_key, err.to_string())
79                    })?;
80                }
81
82                return Ok(ArrayMetadata::V2(metadata));
83            }
84        }
85
86        Err(ArrayCreateError::MissingMetadata)
87    }
88
89    /// Async variant of [`retrieve_chunk_if_exists`](Array::retrieve_chunk_if_exists).
90    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
91    pub async fn async_retrieve_chunk_if_exists(
92        &self,
93        chunk_indices: &[u64],
94    ) -> Result<Option<ArrayBytes<'_>>, ArrayError> {
95        self.async_retrieve_chunk_if_exists_opt(chunk_indices, &CodecOptions::default())
96            .await
97    }
98
99    /// Async variant of [`retrieve_chunk_elements_if_exists`](Array::retrieve_chunk_elements_if_exists).
100    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
101    pub async fn async_retrieve_chunk_elements_if_exists<T: ElementOwned + Send + Sync>(
102        &self,
103        chunk_indices: &[u64],
104    ) -> Result<Option<Vec<T>>, ArrayError> {
105        self.async_retrieve_chunk_elements_if_exists_opt(chunk_indices, &CodecOptions::default())
106            .await
107    }
108
109    #[cfg(feature = "ndarray")]
110    /// Async variant of [`retrieve_chunk_ndarray_if_exists`](Array::retrieve_chunk_ndarray_if_exists).
111    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
112    pub async fn async_retrieve_chunk_ndarray_if_exists<T: ElementOwned + Send + Sync>(
113        &self,
114        chunk_indices: &[u64],
115    ) -> Result<Option<ndarray::ArrayD<T>>, ArrayError> {
116        self.async_retrieve_chunk_ndarray_if_exists_opt(chunk_indices, &CodecOptions::default())
117            .await
118    }
119
120    /// Retrieve the encoded bytes of a chunk.
121    ///
122    /// # Errors
123    /// Returns a [`StorageError`] if there is an underlying store error.
124    #[allow(clippy::missing_panics_doc)]
125    pub async fn async_retrieve_encoded_chunk(
126        &self,
127        chunk_indices: &[u64],
128    ) -> Result<Option<AsyncBytes>, StorageError> {
129        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
130        let storage_transformer = self
131            .storage_transformers()
132            .create_async_readable_transformer(storage_handle)
133            .await?;
134
135        storage_transformer
136            .get(&self.chunk_key(chunk_indices))
137            .await
138    }
139
140    /// Async variant of [`retrieve_chunk`](Array::retrieve_chunk).
141    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
142    pub async fn async_retrieve_chunk(
143        &self,
144        chunk_indices: &[u64],
145    ) -> Result<ArrayBytes<'_>, ArrayError> {
146        self.async_retrieve_chunk_opt(chunk_indices, &CodecOptions::default())
147            .await
148    }
149
150    /// Async variant of [`retrieve_chunk_elements`](Array::retrieve_chunk_elements).
151    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
152    pub async fn async_retrieve_chunk_elements<T: ElementOwned + Send + Sync>(
153        &self,
154        chunk_indices: &[u64],
155    ) -> Result<Vec<T>, ArrayError> {
156        self.async_retrieve_chunk_elements_opt(chunk_indices, &CodecOptions::default())
157            .await
158    }
159
160    #[cfg(feature = "ndarray")]
161    /// Async variant of [`retrieve_chunk_ndarray`](Array::retrieve_chunk_ndarray).
162    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
163    pub async fn async_retrieve_chunk_ndarray<T: ElementOwned + Send + Sync>(
164        &self,
165        chunk_indices: &[u64],
166    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
167        self.async_retrieve_chunk_ndarray_opt(chunk_indices, &CodecOptions::default())
168            .await
169    }
170
171    /// Async variant of [`retrieve_chunks`](Array::retrieve_chunks).
172    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
173    pub async fn async_retrieve_chunks(
174        &self,
175        chunks: &ArraySubset,
176    ) -> Result<ArrayBytes<'_>, ArrayError> {
177        self.async_retrieve_chunks_opt(chunks, &CodecOptions::default())
178            .await
179    }
180
181    /// Async variant of [`retrieve_chunks_elements`](Array::retrieve_chunks_elements).
182    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
183    pub async fn async_retrieve_chunks_elements<T: ElementOwned + Send + Sync>(
184        &self,
185        chunks: &ArraySubset,
186    ) -> Result<Vec<T>, ArrayError> {
187        self.async_retrieve_chunks_elements_opt(chunks, &CodecOptions::default())
188            .await
189    }
190
191    #[cfg(feature = "ndarray")]
192    /// Async variant of [`retrieve_chunks_ndarray`](Array::retrieve_chunks_ndarray).
193    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
194    pub async fn async_retrieve_chunks_ndarray<T: ElementOwned + Send + Sync>(
195        &self,
196        chunks: &ArraySubset,
197    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
198        self.async_retrieve_chunks_ndarray_opt(chunks, &CodecOptions::default())
199            .await
200    }
201
202    /// Async variant of [`retrieve_chunk_subset`](Array::retrieve_chunk_subset).
203    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
204    pub async fn async_retrieve_chunk_subset(
205        &self,
206        chunk_indices: &[u64],
207        chunk_subset: &ArraySubset,
208    ) -> Result<ArrayBytes<'_>, ArrayError> {
209        self.async_retrieve_chunk_subset_opt(chunk_indices, chunk_subset, &CodecOptions::default())
210            .await
211    }
212
213    /// Async variant of [`retrieve_chunk_subset_elements`](Array::retrieve_chunk_subset_elements).
214    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
215    pub async fn async_retrieve_chunk_subset_elements<T: ElementOwned + Send + Sync>(
216        &self,
217        chunk_indices: &[u64],
218        chunk_subset: &ArraySubset,
219    ) -> Result<Vec<T>, ArrayError> {
220        self.async_retrieve_chunk_subset_elements_opt(
221            chunk_indices,
222            chunk_subset,
223            &CodecOptions::default(),
224        )
225        .await
226    }
227
228    #[cfg(feature = "ndarray")]
229    /// Async variant of [`retrieve_chunk_subset_ndarray`](Array::retrieve_chunk_subset_ndarray).
230    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
231    pub async fn async_retrieve_chunk_subset_ndarray<T: ElementOwned + Send + Sync>(
232        &self,
233        chunk_indices: &[u64],
234        chunk_subset: &ArraySubset,
235    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
236        self.async_retrieve_chunk_subset_ndarray_opt(
237            chunk_indices,
238            chunk_subset,
239            &CodecOptions::default(),
240        )
241        .await
242    }
243
244    /// Async variant of [`retrieve_array_subset`](Array::retrieve_array_subset).
245    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
246    pub async fn async_retrieve_array_subset(
247        &self,
248        array_subset: &ArraySubset,
249    ) -> Result<ArrayBytes<'_>, ArrayError> {
250        self.async_retrieve_array_subset_opt(array_subset, &CodecOptions::default())
251            .await
252    }
253
254    /// Async variant of [`retrieve_array_subset_elements`](Array::retrieve_array_subset_elements).
255    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
256    pub async fn async_retrieve_array_subset_elements<T: ElementOwned + Send + Sync>(
257        &self,
258        array_subset: &ArraySubset,
259    ) -> Result<Vec<T>, ArrayError> {
260        self.async_retrieve_array_subset_elements_opt(array_subset, &CodecOptions::default())
261            .await
262    }
263
264    #[cfg(feature = "ndarray")]
265    /// Async variant of [`retrieve_array_subset_ndarray`](Array::retrieve_array_subset_ndarray).
266    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
267    pub async fn async_retrieve_array_subset_ndarray<T: ElementOwned + Send + Sync>(
268        &self,
269        array_subset: &ArraySubset,
270    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
271        self.async_retrieve_array_subset_ndarray_opt(array_subset, &CodecOptions::default())
272            .await
273    }
274
275    /// Async variant of [`partial_decoder`](Array::partial_decoder).
276    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
277    pub async fn async_partial_decoder(
278        &self,
279        chunk_indices: &[u64],
280    ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, ArrayError> {
281        self.async_partial_decoder_opt(chunk_indices, &CodecOptions::default())
282            .await
283    }
284
285    /////////////////////////////////////////////////////////////////////////////
286    // Advanced methods
287    /////////////////////////////////////////////////////////////////////////////
288
289    /// Async variant of [`retrieve_chunk_if_exists_opt`](Array::retrieve_chunk_if_exists_opt).
290    #[allow(clippy::missing_errors_doc)]
291    pub async fn async_retrieve_chunk_if_exists_opt(
292        &self,
293        chunk_indices: &[u64],
294        options: &CodecOptions,
295    ) -> Result<Option<ArrayBytes<'_>>, ArrayError> {
296        if chunk_indices.len() != self.dimensionality() {
297            return Err(ArrayError::InvalidChunkGridIndicesError(
298                chunk_indices.to_vec(),
299            ));
300        }
301        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
302        let storage_transformer = self
303            .storage_transformers()
304            .create_async_readable_transformer(storage_handle)
305            .await?;
306        let chunk_encoded = storage_transformer
307            .get(&self.chunk_key(chunk_indices))
308            .await
309            .map_err(ArrayError::StorageError)?;
310        if let Some(chunk_encoded) = chunk_encoded {
311            let chunk_encoded: Vec<u8> = chunk_encoded.into();
312            let chunk_representation = self.chunk_array_representation(chunk_indices)?;
313            let bytes = self
314                .codecs()
315                .decode(Cow::Owned(chunk_encoded), &chunk_representation, options)
316                .map_err(ArrayError::CodecError)?;
317            bytes.validate(
318                chunk_representation.num_elements(),
319                chunk_representation.data_type().size(),
320            )?;
321            Ok(Some(bytes))
322        } else {
323            Ok(None)
324        }
325    }
326
327    /// Async variant of [`retrieve_chunk_opt`](Array::retrieve_chunk_opt).
328    #[allow(clippy::missing_errors_doc)]
329    pub async fn async_retrieve_chunk_opt(
330        &self,
331        chunk_indices: &[u64],
332        options: &CodecOptions,
333    ) -> Result<ArrayBytes<'_>, ArrayError> {
334        let chunk = self
335            .async_retrieve_chunk_if_exists_opt(chunk_indices, options)
336            .await?;
337        if let Some(chunk) = chunk {
338            Ok(chunk)
339        } else {
340            let chunk_shape = self.chunk_shape(chunk_indices)?;
341            let array_size =
342                ArraySize::new(self.data_type().size(), chunk_shape.num_elements_u64());
343            Ok(ArrayBytes::new_fill_value(array_size, self.fill_value()))
344        }
345    }
346
347    /// Async variant of [`retrieve_chunk_into`](Array::retrieve_chunk_into).
348    async fn async_retrieve_chunk_into(
349        &self,
350        chunk_indices: &[u64],
351        output_view: &mut ArrayBytesFixedDisjointView<'_>,
352        options: &CodecOptions,
353    ) -> Result<(), ArrayError> {
354        if chunk_indices.len() != self.dimensionality() {
355            return Err(ArrayError::InvalidChunkGridIndicesError(
356                chunk_indices.to_vec(),
357            ));
358        }
359        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
360        let storage_transformer = self
361            .storage_transformers()
362            .create_async_readable_transformer(storage_handle)
363            .await?;
364        let chunk_encoded = storage_transformer
365            .get(&self.chunk_key(chunk_indices))
366            .await
367            .map_err(ArrayError::StorageError)?;
368        if let Some(chunk_encoded) = chunk_encoded {
369            let chunk_encoded: Vec<u8> = chunk_encoded.into();
370            let chunk_representation = self.chunk_array_representation(chunk_indices)?;
371            self.codecs()
372                .decode_into(
373                    Cow::Owned(chunk_encoded),
374                    &chunk_representation,
375                    output_view,
376                    options,
377                )
378                .map_err(ArrayError::CodecError)
379        } else {
380            copy_fill_value_into(self.data_type(), self.fill_value(), output_view)
381                .map_err(ArrayError::CodecError)
382        }
383    }
384
385    /// Async variant of [`retrieve_chunk_elements_if_exists_opt`](Array::retrieve_chunk_elements_if_exists_opt).
386    #[allow(clippy::missing_errors_doc)]
387    pub async fn async_retrieve_chunk_elements_if_exists_opt<T: ElementOwned + Send + Sync>(
388        &self,
389        chunk_indices: &[u64],
390        options: &CodecOptions,
391    ) -> Result<Option<Vec<T>>, ArrayError> {
392        if let Some(bytes) = self
393            .async_retrieve_chunk_if_exists_opt(chunk_indices, options)
394            .await?
395        {
396            let elements = T::from_array_bytes(self.data_type(), bytes)?;
397            Ok(Some(elements))
398        } else {
399            Ok(None)
400        }
401    }
402
403    /// Async variant of [`retrieve_chunk_elements_opt`](Array::retrieve_chunk_elements_opt).
404    #[allow(clippy::missing_errors_doc)]
405    pub async fn async_retrieve_chunk_elements_opt<T: ElementOwned + Send + Sync>(
406        &self,
407        chunk_indices: &[u64],
408        options: &CodecOptions,
409    ) -> Result<Vec<T>, ArrayError> {
410        let bytes = self
411            .async_retrieve_chunk_opt(chunk_indices, options)
412            .await?;
413        let elements = T::from_array_bytes(self.data_type(), bytes)?;
414        Ok(elements)
415    }
416
417    #[cfg(feature = "ndarray")]
418    /// Async variant of [`retrieve_chunk_ndarray_if_exists_opt`](Array::retrieve_chunk_ndarray_if_exists_opt).
419    #[allow(clippy::missing_errors_doc)]
420    pub async fn async_retrieve_chunk_ndarray_if_exists_opt<T: ElementOwned + Send + Sync>(
421        &self,
422        chunk_indices: &[u64],
423        options: &CodecOptions,
424    ) -> Result<Option<ndarray::ArrayD<T>>, ArrayError> {
425        // validate_element_size::<T>(self.data_type())?; in // async_retrieve_chunk_elements_if_exists
426        let shape = self
427            .chunk_grid()
428            .chunk_shape_u64(chunk_indices, self.shape())?
429            .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec()))?;
430        let elements = self
431            .async_retrieve_chunk_elements_if_exists_opt(chunk_indices, options)
432            .await?;
433        if let Some(elements) = elements {
434            Ok(Some(elements_to_ndarray(&shape, elements)?))
435        } else {
436            Ok(None)
437        }
438    }
439
440    #[cfg(feature = "ndarray")]
441    /// Async variant of [`retrieve_chunk_ndarray_opt`](Array::retrieve_chunk_ndarray_opt).
442    #[allow(clippy::missing_errors_doc)]
443    pub async fn async_retrieve_chunk_ndarray_opt<T: ElementOwned + Send + Sync>(
444        &self,
445        chunk_indices: &[u64],
446        options: &CodecOptions,
447    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
448        // validate_element_size::<T>(self.data_type())?; // in async_retrieve_chunk_elements
449        let shape = self
450            .chunk_grid()
451            .chunk_shape_u64(chunk_indices, self.shape())?
452            .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec()))?;
453        let elements = self
454            .async_retrieve_chunk_elements_opt(chunk_indices, options)
455            .await?;
456        elements_to_ndarray(&shape, elements)
457    }
458
459    /// Retrieve the encoded bytes of the chunks in `chunks`.
460    ///
461    /// The chunks are in order of the chunk indices returned by `chunks.indices().into_iter()`.
462    ///
463    /// # Errors
464    /// Returns a [`StorageError`] if there is an underlying store error.
465    #[allow(clippy::missing_panics_doc)]
466    pub async fn async_retrieve_encoded_chunks(
467        &self,
468        chunks: &ArraySubset,
469        options: &CodecOptions,
470    ) -> Result<Vec<Option<AsyncBytes>>, StorageError> {
471        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
472        let storage_transformer = self
473            .storage_transformers()
474            .create_async_readable_transformer(storage_handle)
475            .await?;
476
477        let retrieve_encoded_chunk = |chunk_indices: Vec<u64>| {
478            let storage_transformer = storage_transformer.clone();
479            async move {
480                storage_transformer
481                    .get(&self.chunk_key(&chunk_indices))
482                    .await
483            }
484        };
485
486        let indices = chunks.indices();
487        let futures = indices.into_iter().map(retrieve_encoded_chunk);
488        futures::stream::iter(futures)
489            .buffered(options.concurrent_target())
490            .try_collect()
491            .await
492    }
493
494    /// Async variant of [`retrieve_chunks_opt`](Array::retrieve_chunks_opt).
495    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
496    pub async fn async_retrieve_chunks_opt(
497        &self,
498        chunks: &ArraySubset,
499        options: &CodecOptions,
500    ) -> Result<ArrayBytes<'_>, ArrayError> {
501        if chunks.dimensionality() != self.dimensionality() {
502            return Err(ArrayError::InvalidArraySubset(
503                chunks.clone(),
504                self.shape().to_vec(),
505            ));
506        }
507
508        let array_subset = self.chunks_subset(chunks)?;
509        self.async_retrieve_array_subset_opt(&array_subset, options)
510            .await
511    }
512
513    /// Async variant of [`retrieve_chunks_elements_opt`](Array::retrieve_chunks_elements_opt).
514    #[allow(clippy::missing_errors_doc)]
515    pub async fn async_retrieve_chunks_elements_opt<T: ElementOwned + Send + Sync>(
516        &self,
517        chunks: &ArraySubset,
518        options: &CodecOptions,
519    ) -> Result<Vec<T>, ArrayError> {
520        let bytes = self.async_retrieve_chunks_opt(chunks, options).await?;
521        let elements = T::from_array_bytes(self.data_type(), bytes)?;
522        Ok(elements)
523    }
524
525    #[cfg(feature = "ndarray")]
526    /// Async variant of [`retrieve_chunks_ndarray_opt`](Array::retrieve_chunks_ndarray_opt).
527    #[allow(clippy::missing_errors_doc)]
528    pub async fn async_retrieve_chunks_ndarray_opt<T: ElementOwned + Send + Sync>(
529        &self,
530        chunks: &ArraySubset,
531        options: &CodecOptions,
532    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
533        let array_subset = self.chunks_subset(chunks)?;
534        let elements = self
535            .async_retrieve_chunks_elements_opt(chunks, options)
536            .await?;
537        elements_to_ndarray(array_subset.shape(), elements)
538    }
539
540    /// Async variant of [`retrieve_array_subset_opt`](Array::retrieve_array_subset_opt).
541    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
542    #[allow(clippy::too_many_lines)]
543    pub async fn async_retrieve_array_subset_opt(
544        &self,
545        array_subset: &ArraySubset,
546        options: &CodecOptions,
547    ) -> Result<ArrayBytes<'_>, ArrayError> {
548        if array_subset.dimensionality() != self.dimensionality() {
549            return Err(ArrayError::InvalidArraySubset(
550                array_subset.clone(),
551                self.shape().to_vec(),
552            ));
553        }
554
555        // Find the chunks intersecting this array subset
556        let chunks = self.chunks_in_array_subset(array_subset)?;
557        let Some(chunks) = chunks else {
558            return Err(ArrayError::InvalidArraySubset(
559                array_subset.clone(),
560                self.shape().to_vec(),
561            ));
562        };
563
564        // Retrieve chunk bytes
565        let num_chunks = chunks.num_elements_usize();
566        match num_chunks {
567            0 => {
568                let array_size =
569                    ArraySize::new(self.data_type().size(), array_subset.num_elements());
570                Ok(ArrayBytes::new_fill_value(array_size, self.fill_value()))
571            }
572            1 => {
573                let chunk_indices = chunks.start();
574                let chunk_subset = self.chunk_subset(chunk_indices)?;
575                if &chunk_subset == array_subset {
576                    // Single chunk fast path if the array subset domain matches the chunk domain
577                    self.async_retrieve_chunk_opt(chunk_indices, options).await
578                } else {
579                    let array_subset_in_chunk_subset =
580                        array_subset.relative_to(chunk_subset.start())?;
581                    self.async_retrieve_chunk_subset_opt(
582                        chunk_indices,
583                        &array_subset_in_chunk_subset,
584                        options,
585                    )
586                    .await
587                }
588            }
589            _ => {
590                // Calculate chunk/codec concurrency
591                let chunk_representation =
592                    self.chunk_array_representation(&vec![0; self.dimensionality()])?;
593                let codec_concurrency =
594                    self.recommended_codec_concurrency(&chunk_representation)?;
595                let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
596                    options.concurrent_target(),
597                    num_chunks,
598                    options,
599                    &codec_concurrency,
600                );
601
602                match chunk_representation.data_type().size() {
603                    DataTypeSize::Variable => {
604                        let retrieve_chunk = |chunk_indices: Vec<u64>| {
605                            let options = options.clone();
606                            async move {
607                                let chunk_subset = self.chunk_subset(&chunk_indices)?;
608                                let chunk_subset_overlap = chunk_subset.overlap(array_subset)?;
609                                Ok::<_, ArrayError>((
610                                    self.async_retrieve_chunk_subset_opt(
611                                        &chunk_indices,
612                                        &chunk_subset_overlap.relative_to(chunk_subset.start())?,
613                                        &options,
614                                    )
615                                    .await?,
616                                    chunk_subset_overlap.relative_to(array_subset.start())?,
617                                ))
618                            }
619                        };
620
621                        // TODO: chunk_concurrent_limit
622                        let chunk_bytes_and_subsets = futures::future::try_join_all(
623                            chunks.indices().iter().map(retrieve_chunk),
624                        )
625                        .await?;
626
627                        Ok(merge_chunks_vlen(
628                            chunk_bytes_and_subsets,
629                            array_subset.shape(),
630                        )?)
631                    }
632                    DataTypeSize::Fixed(data_type_size) => {
633                        let size_output =
634                            usize::try_from(array_subset.num_elements() * data_type_size as u64)
635                                .unwrap();
636                        if size_output == 0 {
637                            return Ok(ArrayBytes::new_flen(vec![]));
638                        }
639                        let mut output = Vec::with_capacity(size_output);
640                        {
641                            let output =
642                                UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut output);
643                            let retrieve_chunk = |chunk_indices: Vec<u64>| {
644                                let options = options.clone();
645                                async move {
646                                    let chunk_subset = self.chunk_subset(&chunk_indices)?;
647                                    let chunk_subset_overlap =
648                                        chunk_subset.overlap(array_subset)?;
649
650                                    let mut output_view = unsafe {
651                                        // SAFETY: chunks represent disjoint array subsets
652                                        ArrayBytesFixedDisjointView::new(
653                                            output,
654                                            data_type_size,
655                                            array_subset.shape(),
656                                            chunk_subset_overlap
657                                                .relative_to(array_subset.start())
658                                                .unwrap(),
659                                        )
660                                    }?;
661                                    self.async_retrieve_chunk_subset_into(
662                                        &chunk_indices,
663                                        &chunk_subset_overlap.relative_to(chunk_subset.start())?,
664                                        &mut output_view,
665                                        &options,
666                                    )
667                                    .await?;
668                                    // let chunk_subset_bytes = self
669                                    //     .async_retrieve_chunk_subset_opt(
670                                    //         &chunk_indices,
671                                    //         &chunk_subset_overlap
672                                    //             .relative_to(chunk_subset.start())?,
673                                    //         &options,
674                                    //     )
675                                    //     .await?;
676                                    // let chunk_subset_bytes = chunk_subset_bytes.into_fixed()?;
677                                    // let output = unsafe { output.as_mut_slice() };
678                                    // update_bytes_flen(
679                                    //     output,
680                                    //     array_subset.shape(),
681                                    //     &chunk_subset_bytes,
682                                    //     &chunk_subset_overlap.relative_to(array_subset.start())?,
683                                    //     data_type_size,
684                                    // );
685                                    Ok::<_, ArrayError>(())
686                                }
687                            };
688
689                            futures::stream::iter(&chunks.indices())
690                                .map(Ok)
691                                .try_for_each_concurrent(
692                                    Some(chunk_concurrent_limit),
693                                    retrieve_chunk,
694                                )
695                                .await?;
696                        }
697                        unsafe { output.set_len(size_output) };
698                        Ok(ArrayBytes::from(output))
699                    }
700                }
701            }
702        }
703    }
704
705    /// Async variant of [`retrieve_array_subset_elements_opt`](Array::retrieve_array_subset_elements_opt).
706    #[allow(clippy::missing_errors_doc)]
707    pub async fn async_retrieve_array_subset_elements_opt<T: ElementOwned + Send + Sync>(
708        &self,
709        array_subset: &ArraySubset,
710        options: &CodecOptions,
711    ) -> Result<Vec<T>, ArrayError> {
712        let bytes = self
713            .async_retrieve_array_subset_opt(array_subset, options)
714            .await?;
715        let elements = T::from_array_bytes(self.data_type(), bytes)?;
716        Ok(elements)
717    }
718
719    #[cfg(feature = "ndarray")]
720    /// Async variant of [`retrieve_array_subset_ndarray_opt`](Array::retrieve_array_subset_ndarray_opt).
721    #[allow(clippy::missing_errors_doc)]
722    pub async fn async_retrieve_array_subset_ndarray_opt<T: ElementOwned + Send + Sync>(
723        &self,
724        array_subset: &ArraySubset,
725        options: &CodecOptions,
726    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
727        let elements = self
728            .async_retrieve_array_subset_elements_opt(array_subset, options)
729            .await?;
730        elements_to_ndarray(array_subset.shape(), elements)
731    }
732
733    /// Async variant of [`retrieve_chunk_subset_opt`](Array::retrieve_chunk_subset_opt).
734    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
735    pub async fn async_retrieve_chunk_subset_opt(
736        &self,
737        chunk_indices: &[u64],
738        chunk_subset: &ArraySubset,
739        options: &CodecOptions,
740    ) -> Result<ArrayBytes<'_>, ArrayError> {
741        let chunk_representation = self.chunk_array_representation(chunk_indices)?;
742        if !chunk_subset.inbounds_shape(&chunk_representation.shape_u64()) {
743            return Err(ArrayError::InvalidArraySubset(
744                chunk_subset.clone(),
745                self.shape().to_vec(),
746            ));
747        }
748
749        let bytes = if chunk_subset.start().iter().all(|&o| o == 0)
750            && chunk_subset.shape() == chunk_representation.shape_u64()
751        {
752            // Fast path if `chunk_subset` encompasses the whole chunk
753            self.async_retrieve_chunk_opt(chunk_indices, options)
754                .await?
755        } else {
756            let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
757            let storage_transformer = self
758                .storage_transformers()
759                .create_async_readable_transformer(storage_handle)
760                .await?;
761            let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
762                storage_transformer,
763                self.chunk_key(chunk_indices),
764            ));
765            self.codecs
766                .clone()
767                .async_partial_decoder(input_handle, &chunk_representation, options)
768                .await?
769                .partial_decode(&[chunk_subset.clone()], options)
770                .await?
771                .remove(0)
772                .into_owned()
773        };
774        bytes.validate(chunk_subset.num_elements(), self.data_type().size())?;
775        Ok(bytes)
776    }
777
778    async fn async_retrieve_chunk_subset_into(
779        &self,
780        chunk_indices: &[u64],
781        chunk_subset: &ArraySubset,
782        output_view: &mut ArrayBytesFixedDisjointView<'_>,
783        options: &CodecOptions,
784    ) -> Result<(), ArrayError> {
785        let chunk_representation = self.chunk_array_representation(chunk_indices)?;
786        if !chunk_subset.inbounds_shape(&chunk_representation.shape_u64()) {
787            return Err(ArrayError::InvalidArraySubset(
788                chunk_subset.clone(),
789                self.shape().to_vec(),
790            ));
791        }
792
793        if chunk_subset.start().iter().all(|&o| o == 0)
794            && chunk_subset.shape() == chunk_representation.shape_u64()
795        {
796            // Fast path if `chunk_subset` encompasses the whole chunk
797            self.async_retrieve_chunk_into(chunk_indices, output_view, options)
798                .await
799        } else {
800            let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
801            let storage_transformer = self
802                .storage_transformers()
803                .create_async_readable_transformer(storage_handle)
804                .await?;
805            let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
806                storage_transformer,
807                self.chunk_key(chunk_indices),
808            ));
809
810            self.codecs
811                .clone()
812                .async_partial_decoder(input_handle, &chunk_representation, options)
813                .await?
814                .partial_decode_into(chunk_subset, output_view, options)
815                .await?;
816            Ok(())
817        }
818    }
819
820    /// Async variant of [`retrieve_chunk_subset_elements_opt`](Array::retrieve_chunk_subset_elements_opt).
821    #[allow(clippy::missing_errors_doc)]
822    pub async fn async_retrieve_chunk_subset_elements_opt<T: ElementOwned + Send + Sync>(
823        &self,
824        chunk_indices: &[u64],
825        chunk_subset: &ArraySubset,
826        options: &CodecOptions,
827    ) -> Result<Vec<T>, ArrayError> {
828        let bytes = self
829            .async_retrieve_chunk_subset_opt(chunk_indices, chunk_subset, options)
830            .await?;
831        let elements = T::from_array_bytes(self.data_type(), bytes)?;
832        Ok(elements)
833    }
834
835    #[cfg(feature = "ndarray")]
836    /// Async variant of [`retrieve_chunk_subset_ndarray_opt`](Array::retrieve_chunk_subset_ndarray_opt).
837    #[allow(clippy::missing_errors_doc)]
838    pub async fn async_retrieve_chunk_subset_ndarray_opt<T: ElementOwned + Send + Sync>(
839        &self,
840        chunk_indices: &[u64],
841        chunk_subset: &ArraySubset,
842        options: &CodecOptions,
843    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
844        let elements = self
845            .async_retrieve_chunk_subset_elements_opt(chunk_indices, chunk_subset, options)
846            .await?;
847        elements_to_ndarray(chunk_subset.shape(), elements)
848    }
849
850    /// Async variant of [`partial_decoder_opt`](Array::partial_decoder_opt).
851    #[allow(clippy::missing_errors_doc)]
852    pub async fn async_partial_decoder_opt(
853        &self,
854        chunk_indices: &[u64],
855        options: &CodecOptions,
856    ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, ArrayError> {
857        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
858        let storage_transformer = self
859            .storage_transformers()
860            .create_async_readable_transformer(storage_handle)
861            .await?;
862        let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
863            storage_transformer,
864            self.chunk_key(chunk_indices),
865        ));
866        let chunk_representation = self.chunk_array_representation(chunk_indices)?;
867        Ok(self
868            .codecs
869            .clone()
870            .async_partial_decoder(input_handle, &chunk_representation, options)
871            .await?)
872    }
873}