// zarrs/array/array_async_sharded_readable_ext.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::{StreamExt, TryStreamExt};
5use unsafe_cell_slice::UnsafeCellSlice;
6
7use super::codec::ShardingCodec;
8use super::codec::array_to_bytes::sharding::AsyncShardingPartialDecoder;
9use super::concurrency::concurrency_chunks_and_codec;
10use super::element::ElementOwned;
11use super::from_array_bytes::FromArrayBytes;
12use super::{
13    Array, ArrayBytes, ArrayBytesFixedDisjointView, ArrayError, ArrayIndicesTinyVec,
14    ArrayShardedExt, ChunkGrid, DataTypeSize,
15};
16use crate::array::array_bytes_internal::merge_chunks_vlen;
17use crate::array::{ArraySubset, ArraySubsetTraits};
18use zarrs_codec::{
19    ArrayBytesDecodeIntoTarget, AsyncArrayPartialDecoderTraits, AsyncStoragePartialDecoder,
20    CodecError, CodecOptions,
21};
22use zarrs_metadata::ConfigurationSerialize;
23use zarrs_metadata_ext::codec::sharding::ShardingCodecConfiguration;
24use zarrs_storage::byte_range::ByteRange;
25use zarrs_storage::{AsyncReadableStorageTraits, MaybeSend, MaybeSync, StorageHandle};
26
// TODO: Remove with trait upcasting
/// A partial decoder that is either a concrete [`AsyncShardingPartialDecoder`]
/// (retaining access to sharding-specific methods) or any other partial decoder
/// behind a trait object.
#[derive(Clone)]
enum MaybeShardingPartialDecoder {
    /// A concrete sharding partial decoder (used when the array is exclusively sharded).
    Sharding(Arc<AsyncShardingPartialDecoder>),
    /// Any other partial decoder.
    Other(Arc<dyn AsyncArrayPartialDecoderTraits>),
}
33
34impl MaybeShardingPartialDecoder {
35    async fn partial_decode<'a>(
36        &'a self,
37        indexer: &dyn crate::array::Indexer,
38        options: &CodecOptions,
39    ) -> Result<ArrayBytes<'a>, CodecError> {
40        match self {
41            Self::Sharding(partial_decoder) => {
42                partial_decoder.partial_decode(indexer, options).await
43            }
44            Self::Other(partial_decoder) => partial_decoder.partial_decode(indexer, options).await,
45        }
46    }
47
48    async fn partial_decode_into(
49        &self,
50        indexer: &dyn crate::array::Indexer,
51        output_target: ArrayBytesDecodeIntoTarget<'_>,
52        options: &CodecOptions,
53    ) -> Result<(), CodecError> {
54        match self {
55            Self::Sharding(partial_decoder) => {
56                partial_decoder
57                    .partial_decode_into(indexer, output_target, options)
58                    .await
59            }
60            Self::Other(partial_decoder) => {
61                partial_decoder
62                    .partial_decode_into(indexer, output_target, options)
63                    .await
64            }
65        }
66    }
67}
68
/// Maps shard indices to their cached partial decoder.
type PartialDecoderHashMap = HashMap<Vec<u64>, MaybeShardingPartialDecoder>;

/// A cache used for methods in the [`AsyncArrayShardedReadableExt`] trait.
pub struct AsyncArrayShardedReadableExtCache {
    // Cached result of `ArrayShardedExt::is_sharded`.
    array_is_sharded: bool,
    // Cached result of `ArrayShardedExt::is_exclusively_sharded`.
    array_is_exclusively_sharded: bool,
    // Cached subchunk grid of the array.
    subchunk_grid: ChunkGrid,
    // Per-shard partial decoders, created lazily under an async mutex.
    cache: Arc<async_lock::Mutex<PartialDecoderHashMap>>,
}
78
impl AsyncArrayShardedReadableExtCache {
    /// Create a new cache for an array.
    #[must_use]
    pub fn new<TStorage: ?Sized + AsyncReadableStorageTraits>(array: &Array<TStorage>) -> Self {
        let subchunk_grid = array.subchunk_grid();
        Self {
            array_is_sharded: array.is_sharded(),
            array_is_exclusively_sharded: array.is_exclusively_sharded(),
            subchunk_grid,
            cache: Arc::new(async_lock::Mutex::new(HashMap::default())),
        }
    }

    /// Returns true if the array is sharded.
    ///
    /// This is cheaper than calling [`ArrayShardedExt::is_sharded`] repeatedly.
    #[must_use]
    pub fn array_is_sharded(&self) -> bool {
        self.array_is_sharded
    }

    /// Returns true if the array is exclusively sharded (no array-to-array or bytes-to-bytes codecs).
    ///
    /// This is cheaper than calling [`ArrayShardedExt::is_exclusively_sharded`] repeatedly.
    #[must_use]
    pub fn array_is_exclusively_sharded(&self) -> bool {
        self.array_is_exclusively_sharded
    }

    /// Returns the cached subchunk grid of the array.
    fn subchunk_grid(&self) -> &ChunkGrid {
        &self.subchunk_grid
    }

    /// Return the number of shard indexes cached.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub async fn len(&self) -> usize {
        self.cache.lock().await.len()
    }

    /// Returns true if the cache contains no cached shard indexes.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub async fn is_empty(&self) -> bool {
        self.cache.lock().await.is_empty()
    }

    /// Clear the cache.
    #[allow(clippy::missing_panics_doc)]
    pub async fn clear(&self) {
        self.cache.lock().await.clear();
    }

    /// Return the partial decoder for the shard at `shard_indices`, creating
    /// and caching it on first access.
    ///
    /// For exclusively sharded arrays a concrete [`AsyncShardingPartialDecoder`]
    /// is built directly so that sharding-specific methods (e.g. byte ranges of
    /// subchunks) remain accessible; otherwise a generic partial decoder is
    /// created via [`Array::async_partial_decoder`].
    async fn retrieve<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
        &self,
        array: &Array<TStorage>,
        shard_indices: &[u64],
    ) -> Result<MaybeShardingPartialDecoder, ArrayError> {
        // The lock is held across lookup and insertion so concurrent callers do
        // not construct duplicate partial decoders for the same shard.
        let mut cache = self.cache.lock().await;
        if let Some(partial_decoder) = cache.get(shard_indices) {
            Ok(partial_decoder.clone())
        } else if self.array_is_exclusively_sharded() {
            // Create the sharding partial decoder directly, without a codec chain
            let storage_handle = Arc::new(StorageHandle::new(array.storage.clone()));
            let storage_transformer = array
                .storage_transformers()
                .create_async_readable_transformer(storage_handle)
                .await?;
            let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
                storage_transformer,
                array.chunk_key(shard_indices),
            ));

            // --- Workaround for lack of trait upcasting ---
            // Reconstruct the sharding codec from the array metadata. The
            // `expect`s hold because the array already instantiated this codec
            // from the same metadata.
            let chunk_shape = array.chunk_shape(shard_indices)?;
            let sharding_codec_configuration = array
                .codecs()
                .array_to_bytes_codec()
                .configuration_v3(array.metadata_options.codec_metadata_options())
                .expect("valid sharding metadata");
            let sharding_codec_configuration =
                ShardingCodecConfiguration::try_from_configuration(sharding_codec_configuration)
                    .expect("valid sharding configuration");
            let sharding_codec = Arc::new(
                ShardingCodec::new_with_configuration(&sharding_codec_configuration).expect(
                    "supported sharding codec configuration, already instantiated in array",
                ),
            );
            let partial_decoder = MaybeShardingPartialDecoder::Sharding(Arc::new(
                AsyncShardingPartialDecoder::new(
                    input_handle,
                    array.data_type().clone(),
                    array.fill_value().clone(),
                    chunk_shape.clone(),
                    sharding_codec.subchunk_shape.clone(),
                    sharding_codec.inner_codecs.clone(),
                    &sharding_codec.index_codecs,
                    sharding_codec.index_location,
                    &array.codec_options,
                    sharding_codec.options.clone(),
                )
                .await?,
            ));
            // // TODO: Trait upcasting
            // let partial_decoder = array
            //     .codecs()
            //     .array_to_bytes_codec()
            //     .clone()
            //     .partial_decoder(
            //         input_handle,
            //         &chunk_representation,
            //         &self.codec_options,
            //     )?;
            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
            Ok(partial_decoder)
        } else {
            // Not exclusively sharded: fall back to the generic partial decoder.
            let partial_decoder = MaybeShardingPartialDecoder::Other(
                array.async_partial_decoder(shard_indices).await?,
            );
            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
            Ok(partial_decoder)
        }
    }
}
203
/// An [`Array`] extension trait to efficiently read data (e.g. subchunks) from arrays using the `sharding_indexed` codec.
///
/// Sharding indexes are cached in a [`AsyncArrayShardedReadableExtCache`] enabling faster retrieval.
// TODO: Add default methods? Or change to options: Option<&CodecOptions>? Should really do this for array (breaking)...
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncArrayShardedReadableExt<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>:
    private::Sealed
{
    /// Retrieve the byte range of an encoded subchunk.
    ///
    /// # Errors
    /// Returns an [`ArrayError`] on failure, such as if decoding the shard index fails.
    async fn async_subchunk_byte_range(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
    ) -> Result<Option<ByteRange>, ArrayError>;

    /// Retrieve the encoded bytes of a subchunk.
    ///
    /// See [`Array::retrieve_encoded_chunk`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_encoded_subchunk(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
    ) -> Result<Option<Vec<u8>>, ArrayError>;

    // TODO: retrieve_encoded_subchunks

    /// Read and decode the subchunk at `subchunk_indices` into its bytes.
    ///
    /// See [`Array::retrieve_chunk_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunk_opt::<Vec<T>>() instead"
    )]
    /// Read and decode the subchunk at `subchunk_indices` into a vector of its elements.
    ///
    /// See [`Array::retrieve_chunk_elements_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunk_opt::<ndarray::ArrayD<T>>() instead"
    )]
    /// Read and decode the subchunk at `subchunk_indices` into an [`ndarray::ArrayD`].
    ///
    /// See [`Array::retrieve_chunk_ndarray_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;

    /// Read and decode the chunks at `chunks`.
    ///
    /// See [`Array::retrieve_chunks_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunks_opt::<Vec<T>>() instead"
    )]
    /// Read and decode the subchunks at `subchunks` into a vector of their elements.
    ///
    /// See [`Array::retrieve_chunks_elements_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunks_opt::<ndarray::ArrayD<T>>() instead"
    )]
    /// Read and decode the subchunks at `subchunks` into an [`ndarray::ArrayD`].
    ///
    /// See [`Array::retrieve_chunks_ndarray_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;

    /// Read and decode the `array_subset` of array.
    ///
    /// See [`Array::retrieve_array_subset_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_sharded_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_array_subset_sharded_opt::<Vec<T>>() instead"
    )]
    /// Read and decode the `array_subset` of array into a vector of its elements.
    ///
    /// See [`Array::retrieve_array_subset_elements_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_elements_sharded_opt<
        T: ElementOwned + MaybeSend + MaybeSync,
    >(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_array_subset_sharded_opt::<ndarray::ArrayD<T>>() instead"
    )]
    /// Read and decode the `array_subset` of array into an [`ndarray::ArrayD`].
    ///
    /// See [`Array::retrieve_array_subset_ndarray_opt`].
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_ndarray_sharded_opt<
        T: ElementOwned + MaybeSend + MaybeSync,
    >(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;
}
365
366fn subchunk_shard_index_and_subset<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
367    array: &Array<TStorage>,
368    cache: &AsyncArrayShardedReadableExtCache,
369    subchunk_indices: &[u64],
370) -> Result<(Vec<u64>, ArraySubset), ArrayError> {
371    // TODO: Can this logic be simplified?
372    let array_subset = cache
373        .subchunk_grid()
374        .subset(subchunk_indices)?
375        .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(subchunk_indices.to_vec()))?;
376    let shards = array
377        .chunks_in_array_subset(&array_subset)?
378        .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(subchunk_indices.to_vec()))?;
379    if shards.num_elements() != 1 {
380        // This should not happen, but it is checked just in case.
381        return Err(ArrayError::InvalidChunkGridIndicesError(
382            subchunk_indices.to_vec(),
383        ));
384    }
385    let shard_indices = shards.start();
386    let shard_origin = array.chunk_origin(shard_indices)?;
387    let shard_subset = array_subset.relative_to(&shard_origin)?;
388    Ok((shard_indices.to_vec(), shard_subset))
389}
390
391fn subchunk_shard_index_and_chunk_index<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
392    array: &Array<TStorage>,
393    cache: &AsyncArrayShardedReadableExtCache,
394    subchunk_indices: &[u64],
395) -> Result<(Vec<u64>, Vec<u64>), ArrayError> {
396    // TODO: Simplify this?
397    let (shard_indices, shard_subset) =
398        subchunk_shard_index_and_subset(array, cache, subchunk_indices)?;
399    let effective_subchunk_shape = array.effective_subchunk_shape().expect("array is sharded");
400    let chunk_indices: Vec<u64> = shard_subset
401        .start()
402        .iter()
403        .zip(effective_subchunk_shape.as_slice())
404        .map(|(o, s)| o / s.get())
405        .collect();
406    Ok((shard_indices, chunk_indices))
407}
408
409#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
410#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
411impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> AsyncArrayShardedReadableExt<TStorage>
412    for Array<TStorage>
413{
414    async fn async_subchunk_byte_range(
415        &self,
416        cache: &AsyncArrayShardedReadableExtCache,
417        subchunk_indices: &[u64],
418    ) -> Result<Option<ByteRange>, ArrayError> {
419        if cache.array_is_exclusively_sharded() {
420            let (shard_indices, chunk_indices) =
421                subchunk_shard_index_and_chunk_index(self, cache, subchunk_indices)?;
422            let partial_decoder = cache.retrieve(self, &shard_indices).await?;
423            let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
424                unreachable!("exlusively sharded")
425            };
426            // TODO: trait upcasting
427            // let partial_decoder: Arc<dyn Any + MaybeSend + MaybeSync> = partial_decoder.clone();
428            // let partial_decoder = partial_decoder
429            //     .downcast::<AsyncShardingPartialDecoder>()
430            //     .expect("array is exclusively sharded");
431
432            Ok(partial_decoder.subchunk_byte_range(&chunk_indices)?)
433        } else {
434            Err(ArrayError::UnsupportedMethod(
435                "the array is not exclusively sharded".to_string(),
436            ))
437        }
438    }
439
440    async fn async_retrieve_encoded_subchunk(
441        &self,
442        cache: &AsyncArrayShardedReadableExtCache,
443        subchunk_indices: &[u64],
444    ) -> Result<Option<Vec<u8>>, ArrayError> {
445        if cache.array_is_exclusively_sharded() {
446            let (shard_indices, chunk_indices) =
447                subchunk_shard_index_and_chunk_index(self, cache, subchunk_indices)?;
448            let partial_decoder = cache.retrieve(self, &shard_indices).await?;
449            let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
450                unreachable!("exlusively sharded")
451            };
452            // TODO: trait upcasting
453            // let partial_decoder: Arc<dyn Any + MaybeSend + MaybeSync> = partial_decoder.clone();
454            // let partial_decoder = partial_decoder
455            //     .downcast::<AsyncShardingPartialDecoder>()
456            //     .expect("array is exclusively sharded");
457
458            Ok(partial_decoder
459                .retrieve_subchunk_encoded(&chunk_indices)
460                .await?
461                .map(Vec::from))
462        } else {
463            Err(ArrayError::UnsupportedMethod(
464                "the array is not exclusively sharded".to_string(),
465            ))
466        }
467    }
468
469    async fn async_retrieve_subchunk_opt<T: FromArrayBytes + MaybeSend>(
470        &self,
471        cache: &AsyncArrayShardedReadableExtCache,
472        subchunk_indices: &[u64],
473        options: &CodecOptions,
474    ) -> Result<T, ArrayError> {
475        if cache.array_is_sharded() {
476            let (shard_indices, shard_subset) =
477                subchunk_shard_index_and_subset(self, cache, subchunk_indices)?;
478            let partial_decoder = cache.retrieve(self, &shard_indices).await?;
479            let bytes = partial_decoder
480                .partial_decode(&shard_subset, options)
481                .await?
482                .into_owned();
483            T::from_array_bytes(bytes, shard_subset.shape(), self.data_type())
484        } else {
485            self.async_retrieve_chunk_opt(subchunk_indices, options)
486                .await
487        }
488    }
489
490    async fn async_retrieve_subchunk_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
491        &self,
492        cache: &AsyncArrayShardedReadableExtCache,
493        subchunk_indices: &[u64],
494        options: &CodecOptions,
495    ) -> Result<Vec<T>, ArrayError> {
496        self.async_retrieve_subchunk_opt(cache, subchunk_indices, options)
497            .await
498    }
499
500    #[cfg(feature = "ndarray")]
501    async fn async_retrieve_subchunk_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
502        &self,
503        cache: &AsyncArrayShardedReadableExtCache,
504        subchunk_indices: &[u64],
505        options: &CodecOptions,
506    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
507        self.async_retrieve_subchunk_opt(cache, subchunk_indices, options)
508            .await
509    }
510
511    async fn async_retrieve_subchunks_opt<T: FromArrayBytes + MaybeSend>(
512        &self,
513        cache: &AsyncArrayShardedReadableExtCache,
514        subchunks: &dyn ArraySubsetTraits,
515        options: &CodecOptions,
516    ) -> Result<T, ArrayError> {
517        if cache.array_is_sharded() {
518            let subchunk_grid = cache.subchunk_grid();
519            let array_subset = subchunk_grid.chunks_subset(subchunks)?.ok_or_else(|| {
520                ArrayError::InvalidArraySubset(
521                    subchunks.to_array_subset(),
522                    subchunk_grid.grid_shape().to_vec(),
523                )
524            })?;
525            self.async_retrieve_array_subset_sharded_opt(cache, &array_subset, options)
526                .await
527        } else {
528            self.async_retrieve_chunks_opt(subchunks, options).await
529        }
530    }
531
532    async fn async_retrieve_subchunks_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
533        &self,
534        cache: &AsyncArrayShardedReadableExtCache,
535        subchunks: &dyn ArraySubsetTraits,
536        options: &CodecOptions,
537    ) -> Result<Vec<T>, ArrayError> {
538        self.async_retrieve_subchunks_opt(cache, subchunks, options)
539            .await
540    }
541
542    #[cfg(feature = "ndarray")]
543    async fn async_retrieve_subchunks_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
544        &self,
545        cache: &AsyncArrayShardedReadableExtCache,
546        subchunks: &dyn ArraySubsetTraits,
547        options: &CodecOptions,
548    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
549        self.async_retrieve_subchunks_opt(cache, subchunks, options)
550            .await
551    }
552
553    #[allow(clippy::too_many_lines)]
554    async fn async_retrieve_array_subset_sharded_opt<T: FromArrayBytes + MaybeSend>(
555        &self,
556        cache: &AsyncArrayShardedReadableExtCache,
557        array_subset: &dyn ArraySubsetTraits,
558        options: &CodecOptions,
559    ) -> Result<T, ArrayError> {
560        if cache.array_is_sharded() {
561            // Find the shards intersecting this array subset
562            let shards = self.chunks_in_array_subset(array_subset)?;
563            let Some(shards) = shards else {
564                return Err(ArrayError::InvalidArraySubset(
565                    array_subset.to_array_subset(),
566                    self.shape().to_vec(),
567                ));
568            };
569
570            // Retrieve chunk bytes
571            let num_shards = shards.num_elements_usize();
572            let array_subset_start = array_subset.start();
573            let array_subset_shape = array_subset.shape();
574            let bytes = if num_shards == 0 {
575                ArrayBytes::new_fill_value(
576                    self.data_type(),
577                    array_subset.num_elements(),
578                    self.fill_value(),
579                )
580                .map_err(CodecError::from)
581                .map_err(ArrayError::from)?
582            } else {
583                // Calculate chunk/codec concurrency
584                let chunk_shape = self.chunk_shape(&vec![0; self.dimensionality()])?;
585                let codec_concurrency =
586                    self.recommended_codec_concurrency(&chunk_shape, self.data_type())?;
587                let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
588                    options.concurrent_target(),
589                    num_shards,
590                    options,
591                    &codec_concurrency,
592                );
593                let options = Arc::new(options);
594
595                match self.data_type().size() {
596                    DataTypeSize::Variable => {
597                        let retrieve_subchunk = |shard_indices: ArrayIndicesTinyVec| {
598                            let options = options.clone();
599                            let array_subset_start = &array_subset_start;
600                            async move {
601                                let shard_subset = self.chunk_subset(&shard_indices)?;
602                                let shard_subset_overlap = shard_subset.overlap(array_subset)?;
603                                let bytes = cache
604                                    .retrieve(self, &shard_indices)
605                                    .await?
606                                    .partial_decode(
607                                        &shard_subset_overlap.relative_to(shard_subset.start())?,
608                                        &options,
609                                    )
610                                    .await?
611                                    .into_owned()
612                                    .into_variable()?;
613                                Ok::<_, ArrayError>((
614                                    bytes,
615                                    shard_subset_overlap.relative_to(array_subset_start)?,
616                                ))
617                            }
618                        };
619
620                        let indices = shards.indices();
621                        let futures = indices.into_iter().map(retrieve_subchunk);
622                        let chunk_bytes_and_subsets = futures::stream::iter(futures)
623                            .buffered(chunk_concurrent_limit)
624                            .try_collect()
625                            .await?;
626                        ArrayBytes::Variable(merge_chunks_vlen(
627                            chunk_bytes_and_subsets,
628                            &array_subset_shape,
629                        ))
630                    }
631                    DataTypeSize::Fixed(data_type_size) => {
632                        let size_output = array_subset.num_elements_usize() * data_type_size;
633                        if size_output == 0 {
634                            ArrayBytes::new_flen(vec![])
635                        } else {
636                            let mut output = Vec::with_capacity(size_output);
637                            {
638                                let output =
639                                    UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut output);
640                                let retrieve_shard_into_slice =
641                                    |shard_indices: ArrayIndicesTinyVec| {
642                                        let options = options.clone();
643                                        let array_subset_start = &array_subset_start;
644                                        let array_subset_shape = &array_subset_shape;
645                                        async move {
646                                            let shard_subset = self.chunk_subset(&shard_indices)?;
647                                            let shard_subset_overlap =
648                                                shard_subset.overlap(array_subset)?;
649                                            let mut output_view = unsafe {
650                                                // SAFETY: chunks represent disjoint array subsets
651                                                ArrayBytesFixedDisjointView::new(
652                                                    output,
653                                                    data_type_size,
654                                                    array_subset_shape.as_ref(),
655                                                    shard_subset_overlap
656                                                        .relative_to(array_subset_start)?,
657                                                )?
658                                            };
659                                            cache
660                                                .retrieve(self, &shard_indices)
661                                                .await?
662                                                .partial_decode_into(
663                                                    &shard_subset_overlap
664                                                        .relative_to(shard_subset.start())?,
665                                                    (&mut output_view).into(),
666                                                    &options,
667                                                )
668                                                .await?;
669                                            Ok::<_, ArrayError>(())
670                                        }
671                                    };
672
673                                futures::stream::iter(&shards.indices())
674                                    .map(Ok)
675                                    .try_for_each_concurrent(
676                                        Some(chunk_concurrent_limit),
677                                        retrieve_shard_into_slice,
678                                    )
679                                    .await?;
680                            }
681                            unsafe { output.set_len(size_output) };
682                            ArrayBytes::from(output)
683                        }
684                    }
685                }
686            };
687            T::from_array_bytes(bytes, &array_subset_shape, self.data_type())
688        } else {
689            self.async_retrieve_array_subset_opt(array_subset, options)
690                .await
691        }
692    }
693
694    async fn async_retrieve_array_subset_elements_sharded_opt<
695        T: ElementOwned + MaybeSend + MaybeSync,
696    >(
697        &self,
698        cache: &AsyncArrayShardedReadableExtCache,
699        array_subset: &dyn ArraySubsetTraits,
700        options: &CodecOptions,
701    ) -> Result<Vec<T>, ArrayError> {
702        self.async_retrieve_array_subset_sharded_opt(cache, array_subset, options)
703            .await
704    }
705
706    #[cfg(feature = "ndarray")]
707    async fn async_retrieve_array_subset_ndarray_sharded_opt<
708        T: ElementOwned + MaybeSend + MaybeSync,
709    >(
710        &self,
711        cache: &AsyncArrayShardedReadableExtCache,
712        array_subset: &dyn ArraySubsetTraits,
713        options: &CodecOptions,
714    ) -> Result<ndarray::ArrayD<T>, ArrayError> {
715        self.async_retrieve_array_subset_sharded_opt(cache, array_subset, options)
716            .await
717    }
718}
719
// Sealed-trait pattern: `AsyncArrayShardedReadableExt` has `private::Sealed` as
// a supertrait, so it can only be implemented for types granted `Sealed` here.
mod private {
    use super::{Array, AsyncReadableStorageTraits};

    /// Prevents downstream implementations of [`super::AsyncArrayShardedReadableExt`].
    pub trait Sealed {}

    impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Sealed for Array<TStorage> {}
}
727
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::sync::Arc;

    use super::*;
    use crate::array::codec::TransposeCodec;
    use crate::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
    use crate::array::{ArrayBuilder, ArraySubset, data_type};
    use zarrs_metadata_ext::codec::transpose::TransposeOrder;
    use zarrs_storage::storage_adapter::performance_metrics::PerformanceMetricsStorageAdapter;

    /// Exercises the async sharded-ext API on an 8x8 `uint16` array with 4x4
    /// chunks, both with (`sharded == true`, 2x2 subchunks) and without
    /// sharding. Compares every sharded-ext retrieval against the plain
    /// `async_retrieve_array_subset` result and checks the shard-index
    /// cache population.
    async fn array_sharded_ext_impl(sharded: bool) -> Result<(), Box<dyn std::error::Error>> {
        // In-memory object store wrapped for the async zarrs storage API.
        let store = object_store::memory::InMemory::new();
        let store = Arc::new(zarrs_object_store::AsyncObjectStore::new(store));
        let array_path = "/array";
        let mut builder = ArrayBuilder::new(
            vec![8, 8], // array shape
            vec![4, 4], // regular chunk shape
            data_type::uint16(),
            0u16,
        );
        builder.bytes_to_bytes_codecs(vec![
            #[cfg(feature = "gzip")]
            Arc::new(crate::array::codec::GzipCodec::new(5)?),
        ]);
        if sharded {
            // Turns each 4x4 chunk into a shard of 2x2 subchunks.
            builder.subchunk_shape(vec![2, 2]);
        }
        let array = builder.build(store, array_path)?;

        // Fill the whole array with 0..64 so expected subsets are predictable.
        let data: Vec<u16> = (0..array.shape().iter().product())
            .map(|i| i as u16)
            .collect();

        array
            .async_store_array_subset(&array.subset_all(), &data)
            .await?;

        let cache = AsyncArrayShardedReadableExtCache::new(&array);
        assert_eq!(array.is_sharded(), sharded);
        let subchunk_grid = array.subchunk_grid();
        if sharded {
            assert_eq!(
                array.subchunk_shape(),
                Some(vec![NonZeroU64::new(2).unwrap(); 2])
            );
            // 8x8 array / 2x2 subchunks => 4x4 subchunk grid.
            assert_eq!(subchunk_grid.grid_shape(), &[4, 4]);

            // Single-subchunk retrieval must match the plain subset read.
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&[4..6, 6..8])
                .await?;
            let test = array
                .async_retrieve_subchunk_opt::<Vec<u16>>(&cache, &[2, 3], &CodecOptions::default())
                .await?;
            assert_eq!(compare, test);
            // One shard touched => one cache entry.
            assert_eq!(cache.len().await, 1);

            #[cfg(feature = "ndarray")]
            {
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&[4..6, 6..8])
                    .await?;
                let test = array
                    .async_retrieve_subchunk_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &[2, 3],
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
            }

            cache.clear().await;
            assert_eq!(cache.len().await, 0);

            // Subset straddling all four shards of the 2x2 chunk grid.
            let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_array_subset_sharded_opt::<Vec<u16>>(
                    &cache,
                    &subset,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            // All four shards touched => four cache entries.
            assert_eq!(cache.len().await, 4);

            #[cfg(feature = "ndarray")]
            {
                let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&subset)
                    .await?;
                let test = array
                    .async_retrieve_array_subset_sharded_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &subset,
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
            }

            // Retrieval by subchunk indices: subchunks [1..3, 1..3] of the
            // subchunk grid cover array subset [2..6, 2..6].
            let subset = ArraySubset::new_with_ranges(&[2..6, 2..6]);
            let subchunks = ArraySubset::new_with_ranges(&[1..3, 1..3]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_subchunks_opt::<Vec<u16>>(
                    &cache,
                    &subchunks,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            assert_eq!(cache.len().await, 4);

            #[cfg(feature = "ndarray")]
            {
                let subset = ArraySubset::new_with_ranges(&[2..6, 2..6]);
                let subchunks = ArraySubset::new_with_ranges(&[1..3, 1..3]);
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&subset)
                    .await?;
                let test = array
                    .async_retrieve_subchunks_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &subchunks,
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
                assert_eq!(cache.len().await, 4);
            }

            // The reported byte range of a subchunk must agree with the
            // length of the encoded bytes actually retrieved for it.
            let encoded_subchunk = array
                .async_retrieve_encoded_subchunk(&cache, &[0, 0])
                .await?
                .unwrap();
            assert_eq!(
                array
                    .async_subchunk_byte_range(&cache, &[0, 0])
                    .await?
                    .unwrap()
                    .length(u64::MAX),
                encoded_subchunk.len() as u64
            );
            // assert_eq!(
            //     u16::from_array_bytes(array.data_type(), encoded_subchunk.into())?,
            //     array.async_retrieve_chunk_elements::<u16>(&[0, 0])?
            // );
        } else {
            // Unsharded: the "subchunk" grid degenerates to the chunk grid
            // (8x8 array / 4x4 chunks => 2x2).
            assert_eq!(array.subchunk_shape(), None);
            assert_eq!(subchunk_grid.grid_shape(), &[2, 2]);

            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&[4..8, 4..8])
                .await?;
            let test = array
                .async_retrieve_subchunk_opt::<Vec<u16>>(&cache, &[1, 1], &CodecOptions::default())
                .await?;
            assert_eq!(compare, test);

            let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_array_subset_sharded_opt::<Vec<u16>>(
                    &cache,
                    &subset,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            // No shard indexes exist to cache on an unsharded array.
            assert!(cache.is_empty().await);

            // Shard-specific accessors must fail on an unsharded array.
            assert!(
                array
                    .async_retrieve_encoded_subchunk(&cache, &[0, 0])
                    .await
                    .is_err()
            );
            assert!(
                array
                    .async_subchunk_byte_range(&cache, &[0, 0])
                    .await
                    .is_err()
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn async_array_sharded_ext_sharded() -> Result<(), Box<dyn std::error::Error>> {
        array_sharded_ext_impl(true).await
    }

    #[tokio::test]
    async fn async_array_sharded_ext_unsharded() -> Result<(), Box<dyn std::error::Error>> {
        array_sharded_ext_impl(false).await
    }

    /// Exercises sharding combined with a transpose array-to-array codec.
    /// With `valid_subchunk_shape == false` the subchunk shape does not
    /// evenly divide the (transposed) shard shape and the call is expected
    /// to fail (see the companion test below for the exact error message).
    async fn array_sharded_ext_impl_transpose(
        valid_subchunk_shape: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let store = object_store::memory::InMemory::new();
        let store = Arc::new(zarrs_object_store::AsyncObjectStore::new(store));
        // Wrapped in a metrics adapter so the test can count storage reads.
        let store = Arc::new(PerformanceMetricsStorageAdapter::new(store));

        let array_path = "/array";
        let mut builder = ArrayBuilder::new(
            vec![16, 16, 9], // array shape
            vec![8, 4, 3],   // regular chunk shape
            data_type::uint32(),
            0u32,
        );
        builder.array_to_array_codecs(vec![Arc::new(TransposeCodec::new(TransposeOrder::new(
            &[1, 0, 2],
        )?))]);
        builder.array_to_bytes_codec(Arc::new(
            ShardingCodecBuilder::new(
                vec![
                    NonZeroU64::new(1).unwrap(),
                    // The middle dimension decides validity: 2 divides the
                    // transposed shard extent, 3 does not.
                    if valid_subchunk_shape {
                        NonZeroU64::new(2).unwrap()
                    } else {
                        NonZeroU64::new(3).unwrap()
                    },
                    NonZeroU64::new(3).unwrap(),
                ],
                &data_type::uint32(),
            )
            .bytes_to_bytes_codecs(vec![
                #[cfg(feature = "gzip")]
                Arc::new(crate::array::codec::GzipCodec::new(5)?),
            ])
            .build(),
        ));
        let array = builder.build(store.clone(), array_path)?;

        let subchunk_grid = array.subchunk_grid();
        if valid_subchunk_shape {
            //  Config:
            //  16 x 16 x 9 Array shape
            //   8 x  4 x 3 Chunk (shard) shape
            //   1 x  2 x 3 Subchunk shape
            //      [1,0,2] Transpose order
            //  Calculations:
            //   2 x  4 x 3 Number of shards (chunk grid shape)
            //   4 x  8 x 3 Transposed shard shape
            //   4 x  4 x 1 Subchunks per (transposed) shard
            //   8 x 16 x 3 Subchunk grid shape
            //   2 x  1 x 3 Effective subchunk shape (read granularity)

            assert_eq!(array.chunk_grid_shape(), &[2, 4, 3]);
            assert_eq!(
                array.subchunk_shape(),
                Some(vec![
                    NonZeroU64::new(1).unwrap(),
                    NonZeroU64::new(2).unwrap(),
                    NonZeroU64::new(3).unwrap()
                ])
            );
            assert_eq!(
                array.effective_subchunk_shape(),
                Some(vec![
                    NonZeroU64::new(2).unwrap(),
                    NonZeroU64::new(1).unwrap(),
                    NonZeroU64::new(3).unwrap()
                ])
            ); // NOTE: transposed
            assert_eq!(subchunk_grid.grid_shape(), &[8, 16, 3]);
        } else {
            // skip above tests if the subchunk shape is invalid, below calls fail with
            // CodecError(Other("invalid subchunk shape [1, 3, 3], it must evenly divide [4, 8, 3]"))
        }

        // Fill the array with 0..(16*16*9) for predictable expected values.
        let data: Vec<u32> = (0..array.shape().iter().product())
            .map(|i| i as u32)
            .collect();
        array
            .async_store_array_subset(&array.subset_all(), &data)
            .await?;

        // Retrieving a subchunk should be exactly 2 reads: index + chunk
        let subchunk_subset = subchunk_grid.subset(&[0, 0, 0])?.unwrap();
        let subchunk_data = array
            .async_retrieve_array_subset::<Vec<u32>>(&subchunk_subset)
            .await?;
        assert_eq!(subchunk_data, &[0, 1, 2, 144, 145, 146]);
        assert_eq!(store.reads(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn async_array_sharded_ext_impl_transpose_valid_subchunk_shape() {
        assert!(array_sharded_ext_impl_transpose(true).await.is_ok());
    }

    #[tokio::test]
    async fn async_array_sharded_ext_impl_transpose_invalid_subchunk_shape() {
        // Pins the exact error text for an invalid subchunk shape.
        assert_eq!(
            array_sharded_ext_impl_transpose(false)
                .await
                .unwrap_err()
                .to_string(),
            "invalid subchunk shape [1, 3, 3], it must evenly divide shard shape [4, 8, 3]"
        );
    }
}