1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::{StreamExt, TryStreamExt};
5use unsafe_cell_slice::UnsafeCellSlice;
6
7use super::codec::ShardingCodec;
8use super::codec::array_to_bytes::sharding::AsyncShardingPartialDecoder;
9use super::concurrency::concurrency_chunks_and_codec;
10use super::element::ElementOwned;
11use super::from_array_bytes::FromArrayBytes;
12use super::{
13 Array, ArrayBytes, ArrayBytesFixedDisjointView, ArrayError, ArrayIndicesTinyVec,
14 ArrayShardedExt, ChunkGrid, DataTypeSize,
15};
16use crate::array::array_bytes_internal::merge_chunks_vlen;
17use crate::array::{ArraySubset, ArraySubsetTraits};
18use zarrs_codec::{
19 ArrayBytesDecodeIntoTarget, AsyncArrayPartialDecoderTraits, AsyncStoragePartialDecoder,
20 CodecError, CodecOptions,
21};
22use zarrs_metadata::ConfigurationSerialize;
23use zarrs_metadata_ext::codec::sharding::ShardingCodecConfiguration;
24use zarrs_storage::byte_range::ByteRange;
25use zarrs_storage::{AsyncReadableStorageTraits, MaybeSend, MaybeSync, StorageHandle};
26
/// A partial decoder for a shard that is either the specialised sharding
/// partial decoder (supporting inner-chunk queries) or a generic partial
/// decoder for any other array-to-bytes codec.
#[derive(Clone)]
enum MaybeShardingPartialDecoder {
    /// The array-to-bytes codec is the sharding codec; inner chunk byte
    /// ranges and encoded subchunks can be queried directly.
    Sharding(Arc<AsyncShardingPartialDecoder>),
    /// Any other array-to-bytes codec; only generic partial decoding is available.
    Other(Arc<dyn AsyncArrayPartialDecoderTraits>),
}
33
impl MaybeShardingPartialDecoder {
    /// Partially decode the region selected by `indexer`, delegating to the
    /// wrapped partial decoder (sharding or generic).
    async fn partial_decode<'a>(
        &'a self,
        indexer: &dyn crate::array::Indexer,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError> {
        match self {
            Self::Sharding(partial_decoder) => {
                partial_decoder.partial_decode(indexer, options).await
            }
            Self::Other(partial_decoder) => partial_decoder.partial_decode(indexer, options).await,
        }
    }

    /// Partially decode the region selected by `indexer` directly into
    /// `output_target`, delegating to the wrapped partial decoder.
    async fn partial_decode_into(
        &self,
        indexer: &dyn crate::array::Indexer,
        output_target: ArrayBytesDecodeIntoTarget<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        match self {
            Self::Sharding(partial_decoder) => {
                partial_decoder
                    .partial_decode_into(indexer, output_target, options)
                    .await
            }
            Self::Other(partial_decoder) => {
                partial_decoder
                    .partial_decode_into(indexer, output_target, options)
                    .await
            }
        }
    }
}
68
/// Maps shard (outer chunk) indices to their cached partial decoder.
type PartialDecoderHashMap = HashMap<Vec<u64>, MaybeShardingPartialDecoder>;
70
/// A cache of per-shard partial decoders used by the async sharded-array
/// read extension methods to avoid recreating decoders for shards that are
/// accessed repeatedly.
pub struct AsyncArrayShardedReadableExtCache {
    // Cached result of `Array::is_sharded` at construction time.
    array_is_sharded: bool,
    // Cached result of `Array::is_exclusively_sharded` at construction time.
    array_is_exclusively_sharded: bool,
    // The subchunk grid, used to map subchunk indices to array subsets.
    subchunk_grid: ChunkGrid,
    // Shard indices -> partial decoder, guarded by an async mutex so the
    // cache can be shared across concurrent retrievals.
    cache: Arc<async_lock::Mutex<PartialDecoderHashMap>>,
}
78
impl AsyncArrayShardedReadableExtCache {
    /// Create a new cache for `array`.
    ///
    /// The sharding status and subchunk grid of the array are captured at
    /// construction time; the decoder cache starts empty.
    #[must_use]
    pub fn new<TStorage: ?Sized + AsyncReadableStorageTraits>(array: &Array<TStorage>) -> Self {
        let subchunk_grid = array.subchunk_grid();
        Self {
            array_is_sharded: array.is_sharded(),
            array_is_exclusively_sharded: array.is_exclusively_sharded(),
            subchunk_grid,
            cache: Arc::new(async_lock::Mutex::new(HashMap::default())),
        }
    }

    /// Returns true if the array is sharded (as captured at construction).
    #[must_use]
    pub fn array_is_sharded(&self) -> bool {
        self.array_is_sharded
    }

    /// Returns true if the array is exclusively sharded (as captured at construction).
    #[must_use]
    pub fn array_is_exclusively_sharded(&self) -> bool {
        self.array_is_exclusively_sharded
    }

    /// The subchunk grid captured from the array at construction.
    fn subchunk_grid(&self) -> &ChunkGrid {
        &self.subchunk_grid
    }

    /// The number of cached shard partial decoders.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub async fn len(&self) -> usize {
        self.cache.lock().await.len()
    }

    /// Returns true if no shard partial decoders are cached.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub async fn is_empty(&self) -> bool {
        self.cache.lock().await.is_empty()
    }

    /// Clear all cached shard partial decoders.
    #[allow(clippy::missing_panics_doc)]
    pub async fn clear(&self) {
        self.cache.lock().await.clear();
    }

    /// Get (or create and cache) the partial decoder for the shard at
    /// `shard_indices`.
    ///
    /// The cache mutex is held for the whole call, so concurrent retrievals
    /// of the same shard do not build the decoder twice. For exclusively
    /// sharded arrays a specialised sharding partial decoder is constructed
    /// from the array's sharding codec configuration; otherwise a generic
    /// partial decoder is used.
    async fn retrieve<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
        &self,
        array: &Array<TStorage>,
        shard_indices: &[u64],
    ) -> Result<MaybeShardingPartialDecoder, ArrayError> {
        let mut cache = self.cache.lock().await;
        if let Some(partial_decoder) = cache.get(shard_indices) {
            Ok(partial_decoder.clone())
        } else if self.array_is_exclusively_sharded() {
            // Build an input handle reading the shard's key through the
            // array's storage transformers.
            let storage_handle = Arc::new(StorageHandle::new(array.storage.clone()));
            let storage_transformer = array
                .storage_transformers()
                .create_async_readable_transformer(storage_handle)
                .await?;
            let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
                storage_transformer,
                array.chunk_key(shard_indices),
            ));

            // Recover the sharding codec configuration from the array metadata.
            // These expects cannot fail: the array is exclusively sharded, so a
            // valid sharding codec was already instantiated for it.
            let chunk_shape = array.chunk_shape(shard_indices)?;
            let sharding_codec_configuration = array
                .codecs()
                .array_to_bytes_codec()
                .configuration_v3(array.metadata_options.codec_metadata_options())
                .expect("valid sharding metadata");
            let sharding_codec_configuration =
                ShardingCodecConfiguration::try_from_configuration(sharding_codec_configuration)
                    .expect("valid sharding configuration");
            let sharding_codec = Arc::new(
                ShardingCodec::new_with_configuration(&sharding_codec_configuration).expect(
                    "supported sharding codec configuration, already instantiated in array",
                ),
            );
            let partial_decoder = MaybeShardingPartialDecoder::Sharding(Arc::new(
                AsyncShardingPartialDecoder::new(
                    input_handle,
                    array.data_type().clone(),
                    array.fill_value().clone(),
                    chunk_shape.clone(),
                    sharding_codec.subchunk_shape.clone(),
                    sharding_codec.inner_codecs.clone(),
                    &sharding_codec.index_codecs,
                    sharding_codec.index_location,
                    &array.codec_options,
                    sharding_codec.options.clone(),
                )
                .await?,
            ));
            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
            Ok(partial_decoder)
        } else {
            // Not exclusively sharded: fall back to the array's generic
            // partial decoder for this chunk.
            let partial_decoder = MaybeShardingPartialDecoder::Other(
                array.async_partial_decoder(shard_indices).await?,
            );
            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
            Ok(partial_decoder)
        }
    }
}
203
/// An async extension trait for reading subchunks (inner chunks) of arrays
/// that use the sharding codec.
///
/// For unsharded arrays, the retrieval methods fall back to the regular
/// chunk/subset retrieval methods (see the implementation for [`Array`]).
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait AsyncArrayShardedReadableExt<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>:
    private::Sealed
{
    /// Return the byte range of the encoded subchunk at `subchunk_indices`
    /// within its shard, or `None` if the subchunk is empty/missing.
    ///
    /// Errors if the array is not exclusively sharded.
    async fn async_subchunk_byte_range(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
    ) -> Result<Option<ByteRange>, ArrayError>;

    /// Retrieve the encoded bytes of the subchunk at `subchunk_indices`, or
    /// `None` if the subchunk is empty/missing.
    ///
    /// Errors if the array is not exclusively sharded.
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_encoded_subchunk(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
    ) -> Result<Option<Vec<u8>>, ArrayError>;

    /// Retrieve and decode the subchunk at `subchunk_indices`.
    ///
    /// Falls back to chunk retrieval if the array is not sharded.
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    /// Retrieve and decode the subchunk at `subchunk_indices` as a `Vec` of elements.
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunk_opt::<Vec<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    /// Retrieve and decode the subchunk at `subchunk_indices` as an [`ndarray::ArrayD`].
    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunk_opt::<ndarray::ArrayD<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunk_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;

    /// Retrieve and decode the subset of subchunks selected by `subchunks`
    /// (indices into the subchunk grid).
    ///
    /// Falls back to chunks retrieval if the array is not sharded.
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    /// Retrieve and decode the selected subchunks as a `Vec` of elements.
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunks_opt::<Vec<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    /// Retrieve and decode the selected subchunks as an [`ndarray::ArrayD`].
    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_subchunks_opt::<ndarray::ArrayD<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_subchunks_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        subchunks: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;

    /// Retrieve and decode the elements in `array_subset`, using per-shard
    /// partial decoding where the array is sharded.
    ///
    /// Falls back to standard array subset retrieval if the array is not sharded.
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_sharded_opt<T: FromArrayBytes + MaybeSend>(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<T, ArrayError>;

    /// Retrieve and decode the elements in `array_subset` as a `Vec` of elements.
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_array_subset_sharded_opt::<Vec<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_elements_sharded_opt<
        T: ElementOwned + MaybeSend + MaybeSync,
    >(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<Vec<T>, ArrayError>;

    /// Retrieve and decode the elements in `array_subset` as an [`ndarray::ArrayD`].
    #[cfg(feature = "ndarray")]
    #[deprecated(
        since = "0.23.0",
        note = "Use async_retrieve_array_subset_sharded_opt::<ndarray::ArrayD<T>>() instead"
    )]
    #[allow(clippy::missing_errors_doc)]
    async fn async_retrieve_array_subset_ndarray_sharded_opt<
        T: ElementOwned + MaybeSend + MaybeSync,
    >(
        &self,
        cache: &AsyncArrayShardedReadableExtCache,
        array_subset: &dyn ArraySubsetTraits,
        options: &CodecOptions,
    ) -> Result<ndarray::ArrayD<T>, ArrayError>;
}
365
366fn subchunk_shard_index_and_subset<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
367 array: &Array<TStorage>,
368 cache: &AsyncArrayShardedReadableExtCache,
369 subchunk_indices: &[u64],
370) -> Result<(Vec<u64>, ArraySubset), ArrayError> {
371 let array_subset = cache
373 .subchunk_grid()
374 .subset(subchunk_indices)?
375 .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(subchunk_indices.to_vec()))?;
376 let shards = array
377 .chunks_in_array_subset(&array_subset)?
378 .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(subchunk_indices.to_vec()))?;
379 if shards.num_elements() != 1 {
380 return Err(ArrayError::InvalidChunkGridIndicesError(
382 subchunk_indices.to_vec(),
383 ));
384 }
385 let shard_indices = shards.start();
386 let shard_origin = array.chunk_origin(shard_indices)?;
387 let shard_subset = array_subset.relative_to(&shard_origin)?;
388 Ok((shard_indices.to_vec(), shard_subset))
389}
390
391fn subchunk_shard_index_and_chunk_index<TStorage: ?Sized + AsyncReadableStorageTraits + 'static>(
392 array: &Array<TStorage>,
393 cache: &AsyncArrayShardedReadableExtCache,
394 subchunk_indices: &[u64],
395) -> Result<(Vec<u64>, Vec<u64>), ArrayError> {
396 let (shard_indices, shard_subset) =
398 subchunk_shard_index_and_subset(array, cache, subchunk_indices)?;
399 let effective_subchunk_shape = array.effective_subchunk_shape().expect("array is sharded");
400 let chunk_indices: Vec<u64> = shard_subset
401 .start()
402 .iter()
403 .zip(effective_subchunk_shape.as_slice())
404 .map(|(o, s)| o / s.get())
405 .collect();
406 Ok((shard_indices, chunk_indices))
407}
408
409#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
410#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
411impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> AsyncArrayShardedReadableExt<TStorage>
412 for Array<TStorage>
413{
414 async fn async_subchunk_byte_range(
415 &self,
416 cache: &AsyncArrayShardedReadableExtCache,
417 subchunk_indices: &[u64],
418 ) -> Result<Option<ByteRange>, ArrayError> {
419 if cache.array_is_exclusively_sharded() {
420 let (shard_indices, chunk_indices) =
421 subchunk_shard_index_and_chunk_index(self, cache, subchunk_indices)?;
422 let partial_decoder = cache.retrieve(self, &shard_indices).await?;
423 let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
424 unreachable!("exlusively sharded")
425 };
426 Ok(partial_decoder.subchunk_byte_range(&chunk_indices)?)
433 } else {
434 Err(ArrayError::UnsupportedMethod(
435 "the array is not exclusively sharded".to_string(),
436 ))
437 }
438 }
439
440 async fn async_retrieve_encoded_subchunk(
441 &self,
442 cache: &AsyncArrayShardedReadableExtCache,
443 subchunk_indices: &[u64],
444 ) -> Result<Option<Vec<u8>>, ArrayError> {
445 if cache.array_is_exclusively_sharded() {
446 let (shard_indices, chunk_indices) =
447 subchunk_shard_index_and_chunk_index(self, cache, subchunk_indices)?;
448 let partial_decoder = cache.retrieve(self, &shard_indices).await?;
449 let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
450 unreachable!("exlusively sharded")
451 };
452 Ok(partial_decoder
459 .retrieve_subchunk_encoded(&chunk_indices)
460 .await?
461 .map(Vec::from))
462 } else {
463 Err(ArrayError::UnsupportedMethod(
464 "the array is not exclusively sharded".to_string(),
465 ))
466 }
467 }
468
469 async fn async_retrieve_subchunk_opt<T: FromArrayBytes + MaybeSend>(
470 &self,
471 cache: &AsyncArrayShardedReadableExtCache,
472 subchunk_indices: &[u64],
473 options: &CodecOptions,
474 ) -> Result<T, ArrayError> {
475 if cache.array_is_sharded() {
476 let (shard_indices, shard_subset) =
477 subchunk_shard_index_and_subset(self, cache, subchunk_indices)?;
478 let partial_decoder = cache.retrieve(self, &shard_indices).await?;
479 let bytes = partial_decoder
480 .partial_decode(&shard_subset, options)
481 .await?
482 .into_owned();
483 T::from_array_bytes(bytes, shard_subset.shape(), self.data_type())
484 } else {
485 self.async_retrieve_chunk_opt(subchunk_indices, options)
486 .await
487 }
488 }
489
490 async fn async_retrieve_subchunk_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
491 &self,
492 cache: &AsyncArrayShardedReadableExtCache,
493 subchunk_indices: &[u64],
494 options: &CodecOptions,
495 ) -> Result<Vec<T>, ArrayError> {
496 self.async_retrieve_subchunk_opt(cache, subchunk_indices, options)
497 .await
498 }
499
500 #[cfg(feature = "ndarray")]
501 async fn async_retrieve_subchunk_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
502 &self,
503 cache: &AsyncArrayShardedReadableExtCache,
504 subchunk_indices: &[u64],
505 options: &CodecOptions,
506 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
507 self.async_retrieve_subchunk_opt(cache, subchunk_indices, options)
508 .await
509 }
510
511 async fn async_retrieve_subchunks_opt<T: FromArrayBytes + MaybeSend>(
512 &self,
513 cache: &AsyncArrayShardedReadableExtCache,
514 subchunks: &dyn ArraySubsetTraits,
515 options: &CodecOptions,
516 ) -> Result<T, ArrayError> {
517 if cache.array_is_sharded() {
518 let subchunk_grid = cache.subchunk_grid();
519 let array_subset = subchunk_grid.chunks_subset(subchunks)?.ok_or_else(|| {
520 ArrayError::InvalidArraySubset(
521 subchunks.to_array_subset(),
522 subchunk_grid.grid_shape().to_vec(),
523 )
524 })?;
525 self.async_retrieve_array_subset_sharded_opt(cache, &array_subset, options)
526 .await
527 } else {
528 self.async_retrieve_chunks_opt(subchunks, options).await
529 }
530 }
531
532 async fn async_retrieve_subchunks_elements_opt<T: ElementOwned + MaybeSend + MaybeSync>(
533 &self,
534 cache: &AsyncArrayShardedReadableExtCache,
535 subchunks: &dyn ArraySubsetTraits,
536 options: &CodecOptions,
537 ) -> Result<Vec<T>, ArrayError> {
538 self.async_retrieve_subchunks_opt(cache, subchunks, options)
539 .await
540 }
541
542 #[cfg(feature = "ndarray")]
543 async fn async_retrieve_subchunks_ndarray_opt<T: ElementOwned + MaybeSend + MaybeSync>(
544 &self,
545 cache: &AsyncArrayShardedReadableExtCache,
546 subchunks: &dyn ArraySubsetTraits,
547 options: &CodecOptions,
548 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
549 self.async_retrieve_subchunks_opt(cache, subchunks, options)
550 .await
551 }
552
553 #[allow(clippy::too_many_lines)]
554 async fn async_retrieve_array_subset_sharded_opt<T: FromArrayBytes + MaybeSend>(
555 &self,
556 cache: &AsyncArrayShardedReadableExtCache,
557 array_subset: &dyn ArraySubsetTraits,
558 options: &CodecOptions,
559 ) -> Result<T, ArrayError> {
560 if cache.array_is_sharded() {
561 let shards = self.chunks_in_array_subset(array_subset)?;
563 let Some(shards) = shards else {
564 return Err(ArrayError::InvalidArraySubset(
565 array_subset.to_array_subset(),
566 self.shape().to_vec(),
567 ));
568 };
569
570 let num_shards = shards.num_elements_usize();
572 let array_subset_start = array_subset.start();
573 let array_subset_shape = array_subset.shape();
574 let bytes = if num_shards == 0 {
575 ArrayBytes::new_fill_value(
576 self.data_type(),
577 array_subset.num_elements(),
578 self.fill_value(),
579 )
580 .map_err(CodecError::from)
581 .map_err(ArrayError::from)?
582 } else {
583 let chunk_shape = self.chunk_shape(&vec![0; self.dimensionality()])?;
585 let codec_concurrency =
586 self.recommended_codec_concurrency(&chunk_shape, self.data_type())?;
587 let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
588 options.concurrent_target(),
589 num_shards,
590 options,
591 &codec_concurrency,
592 );
593 let options = Arc::new(options);
594
595 match self.data_type().size() {
596 DataTypeSize::Variable => {
597 let retrieve_subchunk = |shard_indices: ArrayIndicesTinyVec| {
598 let options = options.clone();
599 let array_subset_start = &array_subset_start;
600 async move {
601 let shard_subset = self.chunk_subset(&shard_indices)?;
602 let shard_subset_overlap = shard_subset.overlap(array_subset)?;
603 let bytes = cache
604 .retrieve(self, &shard_indices)
605 .await?
606 .partial_decode(
607 &shard_subset_overlap.relative_to(shard_subset.start())?,
608 &options,
609 )
610 .await?
611 .into_owned()
612 .into_variable()?;
613 Ok::<_, ArrayError>((
614 bytes,
615 shard_subset_overlap.relative_to(array_subset_start)?,
616 ))
617 }
618 };
619
620 let indices = shards.indices();
621 let futures = indices.into_iter().map(retrieve_subchunk);
622 let chunk_bytes_and_subsets = futures::stream::iter(futures)
623 .buffered(chunk_concurrent_limit)
624 .try_collect()
625 .await?;
626 ArrayBytes::Variable(merge_chunks_vlen(
627 chunk_bytes_and_subsets,
628 &array_subset_shape,
629 ))
630 }
631 DataTypeSize::Fixed(data_type_size) => {
632 let size_output = array_subset.num_elements_usize() * data_type_size;
633 if size_output == 0 {
634 ArrayBytes::new_flen(vec![])
635 } else {
636 let mut output = Vec::with_capacity(size_output);
637 {
638 let output =
639 UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut output);
640 let retrieve_shard_into_slice =
641 |shard_indices: ArrayIndicesTinyVec| {
642 let options = options.clone();
643 let array_subset_start = &array_subset_start;
644 let array_subset_shape = &array_subset_shape;
645 async move {
646 let shard_subset = self.chunk_subset(&shard_indices)?;
647 let shard_subset_overlap =
648 shard_subset.overlap(array_subset)?;
649 let mut output_view = unsafe {
650 ArrayBytesFixedDisjointView::new(
652 output,
653 data_type_size,
654 array_subset_shape.as_ref(),
655 shard_subset_overlap
656 .relative_to(array_subset_start)?,
657 )?
658 };
659 cache
660 .retrieve(self, &shard_indices)
661 .await?
662 .partial_decode_into(
663 &shard_subset_overlap
664 .relative_to(shard_subset.start())?,
665 (&mut output_view).into(),
666 &options,
667 )
668 .await?;
669 Ok::<_, ArrayError>(())
670 }
671 };
672
673 futures::stream::iter(&shards.indices())
674 .map(Ok)
675 .try_for_each_concurrent(
676 Some(chunk_concurrent_limit),
677 retrieve_shard_into_slice,
678 )
679 .await?;
680 }
681 unsafe { output.set_len(size_output) };
682 ArrayBytes::from(output)
683 }
684 }
685 }
686 };
687 T::from_array_bytes(bytes, &array_subset_shape, self.data_type())
688 } else {
689 self.async_retrieve_array_subset_opt(array_subset, options)
690 .await
691 }
692 }
693
694 async fn async_retrieve_array_subset_elements_sharded_opt<
695 T: ElementOwned + MaybeSend + MaybeSync,
696 >(
697 &self,
698 cache: &AsyncArrayShardedReadableExtCache,
699 array_subset: &dyn ArraySubsetTraits,
700 options: &CodecOptions,
701 ) -> Result<Vec<T>, ArrayError> {
702 self.async_retrieve_array_subset_sharded_opt(cache, array_subset, options)
703 .await
704 }
705
706 #[cfg(feature = "ndarray")]
707 async fn async_retrieve_array_subset_ndarray_sharded_opt<
708 T: ElementOwned + MaybeSend + MaybeSync,
709 >(
710 &self,
711 cache: &AsyncArrayShardedReadableExtCache,
712 array_subset: &dyn ArraySubsetTraits,
713 options: &CodecOptions,
714 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
715 self.async_retrieve_array_subset_sharded_opt(cache, array_subset, options)
716 .await
717 }
718}
719
// Sealed-trait pattern: prevents downstream crates from implementing
// `AsyncArrayShardedReadableExt` on their own types.
mod private {
    use super::{Array, AsyncReadableStorageTraits};

    /// Sealing trait; only implemented for `Array` in this module.
    pub trait Sealed {}

    impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Sealed for Array<TStorage> {}
}
727
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::sync::Arc;

    use super::*;
    use crate::array::codec::TransposeCodec;
    use crate::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
    use crate::array::{ArrayBuilder, ArraySubset, data_type};
    use zarrs_metadata_ext::codec::transpose::TransposeOrder;
    use zarrs_storage::storage_adapter::performance_metrics::PerformanceMetricsStorageAdapter;

    /// Exercise the sharded read extension methods against both a sharded
    /// and an unsharded array, comparing against plain subset retrieval.
    async fn array_sharded_ext_impl(sharded: bool) -> Result<(), Box<dyn std::error::Error>> {
        let store = object_store::memory::InMemory::new();
        let store = Arc::new(zarrs_object_store::AsyncObjectStore::new(store));
        let array_path = "/array";
        // FIX: reformatted argument lists that had been collapsed onto single lines.
        let mut builder = ArrayBuilder::new(
            vec![8, 8],
            vec![4, 4],
            data_type::uint16(),
            0u16,
        );
        builder.bytes_to_bytes_codecs(vec![
            #[cfg(feature = "gzip")]
            Arc::new(crate::array::codec::GzipCodec::new(5)?),
        ]);
        if sharded {
            builder.subchunk_shape(vec![2, 2]);
        }
        let array = builder.build(store, array_path)?;

        // Fill the array with a ramp 0..n so every element is distinct.
        let data: Vec<u16> = (0..array.shape().iter().product())
            .map(|i| i as u16)
            .collect();

        array
            .async_store_array_subset(&array.subset_all(), &data)
            .await?;

        let cache = AsyncArrayShardedReadableExtCache::new(&array);
        assert_eq!(array.is_sharded(), sharded);
        let subchunk_grid = array.subchunk_grid();
        if sharded {
            assert_eq!(
                array.subchunk_shape(),
                Some(vec![NonZeroU64::new(2).unwrap(); 2])
            );
            assert_eq!(subchunk_grid.grid_shape(), &[4, 4]);

            // Single subchunk retrieval matches plain subset retrieval and
            // populates the decoder cache.
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&[4..6, 6..8])
                .await?;
            let test = array
                .async_retrieve_subchunk_opt::<Vec<u16>>(&cache, &[2, 3], &CodecOptions::default())
                .await?;
            assert_eq!(compare, test);
            assert_eq!(cache.len().await, 1);

            #[cfg(feature = "ndarray")]
            {
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&[4..6, 6..8])
                    .await?;
                let test = array
                    .async_retrieve_subchunk_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &[2, 3],
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
            }

            cache.clear().await;
            assert_eq!(cache.len().await, 0);

            // An arbitrary subset straddling all four shards.
            let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_array_subset_sharded_opt::<Vec<u16>>(
                    &cache,
                    &subset,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            assert_eq!(cache.len().await, 4);

            #[cfg(feature = "ndarray")]
            {
                let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&subset)
                    .await?;
                let test = array
                    .async_retrieve_array_subset_sharded_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &subset,
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
            }

            // Retrieval by subchunk indices covers the same elements.
            let subset = ArraySubset::new_with_ranges(&[2..6, 2..6]);
            let subchunks = ArraySubset::new_with_ranges(&[1..3, 1..3]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_subchunks_opt::<Vec<u16>>(
                    &cache,
                    &subchunks,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            assert_eq!(cache.len().await, 4);

            #[cfg(feature = "ndarray")]
            {
                let subset = ArraySubset::new_with_ranges(&[2..6, 2..6]);
                let subchunks = ArraySubset::new_with_ranges(&[1..3, 1..3]);
                let compare = array
                    .async_retrieve_array_subset::<ndarray::ArrayD<u16>>(&subset)
                    .await?;
                let test = array
                    .async_retrieve_subchunks_opt::<ndarray::ArrayD<u16>>(
                        &cache,
                        &subchunks,
                        &CodecOptions::default(),
                    )
                    .await?;
                assert_eq!(compare, test);
                assert_eq!(cache.len().await, 4);
            }

            // The encoded subchunk length must agree with its byte range.
            let encoded_subchunk = array
                .async_retrieve_encoded_subchunk(&cache, &[0, 0])
                .await?
                .unwrap();
            assert_eq!(
                array
                    .async_subchunk_byte_range(&cache, &[0, 0])
                    .await?
                    .unwrap()
                    .length(u64::MAX),
                encoded_subchunk.len() as u64
            );
        } else {
            assert_eq!(array.subchunk_shape(), None);
            assert_eq!(subchunk_grid.grid_shape(), &[2, 2]);

            // Unsharded: subchunk methods fall back to chunk retrieval.
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&[4..8, 4..8])
                .await?;
            let test = array
                .async_retrieve_subchunk_opt::<Vec<u16>>(&cache, &[1, 1], &CodecOptions::default())
                .await?;
            assert_eq!(compare, test);

            let subset = ArraySubset::new_with_ranges(&[3..7, 3..7]);
            let compare = array
                .async_retrieve_array_subset::<Vec<u16>>(&subset)
                .await?;
            let test = array
                .async_retrieve_array_subset_sharded_opt::<Vec<u16>>(
                    &cache,
                    &subset,
                    &CodecOptions::default(),
                )
                .await?;
            assert_eq!(compare, test);
            // No partial decoders are cached for an unsharded array.
            assert!(cache.is_empty().await);

            // Encoded-subchunk and byte-range queries require exclusive sharding.
            assert!(
                array
                    .async_retrieve_encoded_subchunk(&cache, &[0, 0])
                    .await
                    .is_err()
            );
            assert!(
                array
                    .async_subchunk_byte_range(&cache, &[0, 0])
                    .await
                    .is_err()
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn async_array_sharded_ext_sharded() -> Result<(), Box<dyn std::error::Error>> {
        array_sharded_ext_impl(true).await
    }

    #[tokio::test]
    async fn async_array_sharded_ext_unsharded() -> Result<(), Box<dyn std::error::Error>> {
        array_sharded_ext_impl(false).await
    }

    /// Exercise sharding combined with a transpose array-to-array codec,
    /// where the effective subchunk shape differs from the declared one.
    async fn array_sharded_ext_impl_transpose(
        valid_subchunk_shape: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let store = object_store::memory::InMemory::new();
        let store = Arc::new(zarrs_object_store::AsyncObjectStore::new(store));
        let store = Arc::new(PerformanceMetricsStorageAdapter::new(store));

        let array_path = "/array";
        // FIX: reformatted argument lists that had been collapsed onto single lines.
        let mut builder = ArrayBuilder::new(
            vec![16, 16, 9],
            vec![8, 4, 3],
            data_type::uint32(),
            0u32,
        );
        builder.array_to_array_codecs(vec![Arc::new(TransposeCodec::new(TransposeOrder::new(
            &[1, 0, 2],
        )?))]);
        builder.array_to_bytes_codec(Arc::new(
            ShardingCodecBuilder::new(
                vec![
                    NonZeroU64::new(1).unwrap(),
                    if valid_subchunk_shape {
                        NonZeroU64::new(2).unwrap()
                    } else {
                        // Does not evenly divide the transposed shard shape.
                        NonZeroU64::new(3).unwrap()
                    },
                    NonZeroU64::new(3).unwrap(),
                ],
                &data_type::uint32(),
            )
            .bytes_to_bytes_codecs(vec![
                #[cfg(feature = "gzip")]
                Arc::new(crate::array::codec::GzipCodec::new(5)?),
            ])
            .build(),
        ));
        let array = builder.build(store.clone(), array_path)?;

        let subchunk_grid = array.subchunk_grid();
        // FIX: removed a dead empty `else {}` branch and split a statement
        // that had been fused onto the closing parenthesis line.
        if valid_subchunk_shape {
            assert_eq!(array.chunk_grid_shape(), &[2, 4, 3]);
            assert_eq!(
                array.subchunk_shape(),
                Some(vec![
                    NonZeroU64::new(1).unwrap(),
                    NonZeroU64::new(2).unwrap(),
                    NonZeroU64::new(3).unwrap()
                ])
            );
            // The effective subchunk shape is permuted by the transpose order.
            assert_eq!(
                array.effective_subchunk_shape(),
                Some(vec![
                    NonZeroU64::new(2).unwrap(),
                    NonZeroU64::new(1).unwrap(),
                    NonZeroU64::new(3).unwrap()
                ])
            );
            assert_eq!(subchunk_grid.grid_shape(), &[8, 16, 3]);
        }

        let data: Vec<u32> = (0..array.shape().iter().product())
            .map(|i| i as u32)
            .collect();
        array
            .async_store_array_subset(&array.subset_all(), &data)
            .await?;

        let subchunk_subset = subchunk_grid.subset(&[0, 0, 0])?.unwrap();
        let subchunk_data = array
            .async_retrieve_array_subset::<Vec<u32>>(&subchunk_subset)
            .await?;
        assert_eq!(subchunk_data, &[0, 1, 2, 144, 145, 146]);
        // One read for the shard index, one for the inner chunk data.
        // NOTE(review): assumes the performance metrics adapter counts exactly
        // two reads for this access pattern — confirm if the codec changes.
        assert_eq!(store.reads(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn async_array_sharded_ext_impl_transpose_valid_subchunk_shape() {
        assert!(array_sharded_ext_impl_transpose(true).await.is_ok());
    }

    #[tokio::test]
    async fn async_array_sharded_ext_impl_transpose_invalid_subchunk_shape() {
        assert_eq!(
            array_sharded_ext_impl_transpose(false)
                .await
                .unwrap_err()
                .to_string(),
            "invalid subchunk shape [1, 3, 3], it must evenly divide shard shape [4, 8, 3]"
        );
    }
}