1use std::{borrow::Cow, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use unsafe_cell_slice::UnsafeCellSlice;
5
6use crate::{
7 array_subset::ArraySubset,
8 config::MetadataRetrieveVersion,
9 node::{meta_key_v2_array, meta_key_v2_attributes, meta_key_v3, NodePath},
10 storage::{AsyncBytes, AsyncReadableStorageTraits, StorageError, StorageHandle},
11};
12
13use super::{
14 array_bytes::{copy_fill_value_into, merge_chunks_vlen},
15 codec::{
16 ArrayToBytesCodecTraits, AsyncArrayPartialDecoderTraits, AsyncStoragePartialDecoder,
17 CodecOptions,
18 },
19 concurrency::concurrency_chunks_and_codec,
20 element::ElementOwned,
21 Array, ArrayBytes, ArrayBytesFixedDisjointView, ArrayCreateError, ArrayError, ArrayMetadata,
22 ArrayMetadataV2, ArrayMetadataV3, ArraySize, DataTypeSize,
23};
24
25#[cfg(feature = "ndarray")]
26use super::elements_to_ndarray;
27
28impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Array<TStorage> {
29 #[allow(clippy::missing_errors_doc)]
31 pub async fn async_open(
32 storage: Arc<TStorage>,
33 path: &str,
34 ) -> Result<Array<TStorage>, ArrayCreateError> {
35 Self::async_open_opt(storage, path, &MetadataRetrieveVersion::Default).await
36 }
37
38 #[allow(clippy::missing_errors_doc)]
40 pub async fn async_open_opt(
41 storage: Arc<TStorage>,
42 path: &str,
43 version: &MetadataRetrieveVersion,
44 ) -> Result<Array<TStorage>, ArrayCreateError> {
45 let metadata = Self::async_open_metadata(storage.clone(), path, version).await?;
46 Self::validate_metadata(&metadata)?;
47 Self::new_with_metadata(storage, path, metadata)
48 }
49
50 async fn async_open_metadata(
51 storage: Arc<TStorage>,
52 path: &str,
53 version: &MetadataRetrieveVersion,
54 ) -> Result<ArrayMetadata, ArrayCreateError> {
55 let node_path = NodePath::new(path)?;
56
57 if let MetadataRetrieveVersion::Default | MetadataRetrieveVersion::V3 = version {
58 let key_v3 = meta_key_v3(&node_path);
60 if let Some(metadata) = storage.get(&key_v3).await? {
61 let metadata: ArrayMetadataV3 = serde_json::from_slice(&metadata)
62 .map_err(|err| StorageError::InvalidMetadata(key_v3, err.to_string()))?;
63 return Ok(ArrayMetadata::V3(metadata));
64 }
65 }
66
67 if let MetadataRetrieveVersion::Default | MetadataRetrieveVersion::V2 = version {
68 let key_v2 = meta_key_v2_array(&node_path);
70 if let Some(metadata) = storage.get(&key_v2).await? {
71 let mut metadata: ArrayMetadataV2 = serde_json::from_slice(&metadata)
72 .map_err(|err| StorageError::InvalidMetadata(key_v2, err.to_string()))?;
73
74 let attributes_key = meta_key_v2_attributes(&node_path);
75 let attributes = storage.get(&attributes_key).await?;
76 if let Some(attributes) = attributes {
77 metadata.attributes = serde_json::from_slice(&attributes).map_err(|err| {
78 StorageError::InvalidMetadata(attributes_key, err.to_string())
79 })?;
80 }
81
82 return Ok(ArrayMetadata::V2(metadata));
83 }
84 }
85
86 Err(ArrayCreateError::MissingMetadata)
87 }
88
89 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
91 pub async fn async_retrieve_chunk_if_exists(
92 &self,
93 chunk_indices: &[u64],
94 ) -> Result<Option<ArrayBytes<'_>>, ArrayError> {
95 self.async_retrieve_chunk_if_exists_opt(chunk_indices, &CodecOptions::default())
96 .await
97 }
98
99 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
101 pub async fn async_retrieve_chunk_elements_if_exists<T: ElementOwned + Send + Sync>(
102 &self,
103 chunk_indices: &[u64],
104 ) -> Result<Option<Vec<T>>, ArrayError> {
105 self.async_retrieve_chunk_elements_if_exists_opt(chunk_indices, &CodecOptions::default())
106 .await
107 }
108
109 #[cfg(feature = "ndarray")]
110 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
112 pub async fn async_retrieve_chunk_ndarray_if_exists<T: ElementOwned + Send + Sync>(
113 &self,
114 chunk_indices: &[u64],
115 ) -> Result<Option<ndarray::ArrayD<T>>, ArrayError> {
116 self.async_retrieve_chunk_ndarray_if_exists_opt(chunk_indices, &CodecOptions::default())
117 .await
118 }
119
120 #[allow(clippy::missing_panics_doc)]
125 pub async fn async_retrieve_encoded_chunk(
126 &self,
127 chunk_indices: &[u64],
128 ) -> Result<Option<AsyncBytes>, StorageError> {
129 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
130 let storage_transformer = self
131 .storage_transformers()
132 .create_async_readable_transformer(storage_handle)
133 .await?;
134
135 storage_transformer
136 .get(&self.chunk_key(chunk_indices))
137 .await
138 }
139
140 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
142 pub async fn async_retrieve_chunk(
143 &self,
144 chunk_indices: &[u64],
145 ) -> Result<ArrayBytes<'_>, ArrayError> {
146 self.async_retrieve_chunk_opt(chunk_indices, &CodecOptions::default())
147 .await
148 }
149
150 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
152 pub async fn async_retrieve_chunk_elements<T: ElementOwned + Send + Sync>(
153 &self,
154 chunk_indices: &[u64],
155 ) -> Result<Vec<T>, ArrayError> {
156 self.async_retrieve_chunk_elements_opt(chunk_indices, &CodecOptions::default())
157 .await
158 }
159
160 #[cfg(feature = "ndarray")]
161 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
163 pub async fn async_retrieve_chunk_ndarray<T: ElementOwned + Send + Sync>(
164 &self,
165 chunk_indices: &[u64],
166 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
167 self.async_retrieve_chunk_ndarray_opt(chunk_indices, &CodecOptions::default())
168 .await
169 }
170
171 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
173 pub async fn async_retrieve_chunks(
174 &self,
175 chunks: &ArraySubset,
176 ) -> Result<ArrayBytes<'_>, ArrayError> {
177 self.async_retrieve_chunks_opt(chunks, &CodecOptions::default())
178 .await
179 }
180
181 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
183 pub async fn async_retrieve_chunks_elements<T: ElementOwned + Send + Sync>(
184 &self,
185 chunks: &ArraySubset,
186 ) -> Result<Vec<T>, ArrayError> {
187 self.async_retrieve_chunks_elements_opt(chunks, &CodecOptions::default())
188 .await
189 }
190
191 #[cfg(feature = "ndarray")]
192 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
194 pub async fn async_retrieve_chunks_ndarray<T: ElementOwned + Send + Sync>(
195 &self,
196 chunks: &ArraySubset,
197 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
198 self.async_retrieve_chunks_ndarray_opt(chunks, &CodecOptions::default())
199 .await
200 }
201
202 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
204 pub async fn async_retrieve_chunk_subset(
205 &self,
206 chunk_indices: &[u64],
207 chunk_subset: &ArraySubset,
208 ) -> Result<ArrayBytes<'_>, ArrayError> {
209 self.async_retrieve_chunk_subset_opt(chunk_indices, chunk_subset, &CodecOptions::default())
210 .await
211 }
212
213 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
215 pub async fn async_retrieve_chunk_subset_elements<T: ElementOwned + Send + Sync>(
216 &self,
217 chunk_indices: &[u64],
218 chunk_subset: &ArraySubset,
219 ) -> Result<Vec<T>, ArrayError> {
220 self.async_retrieve_chunk_subset_elements_opt(
221 chunk_indices,
222 chunk_subset,
223 &CodecOptions::default(),
224 )
225 .await
226 }
227
228 #[cfg(feature = "ndarray")]
229 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
231 pub async fn async_retrieve_chunk_subset_ndarray<T: ElementOwned + Send + Sync>(
232 &self,
233 chunk_indices: &[u64],
234 chunk_subset: &ArraySubset,
235 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
236 self.async_retrieve_chunk_subset_ndarray_opt(
237 chunk_indices,
238 chunk_subset,
239 &CodecOptions::default(),
240 )
241 .await
242 }
243
244 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
246 pub async fn async_retrieve_array_subset(
247 &self,
248 array_subset: &ArraySubset,
249 ) -> Result<ArrayBytes<'_>, ArrayError> {
250 self.async_retrieve_array_subset_opt(array_subset, &CodecOptions::default())
251 .await
252 }
253
254 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
256 pub async fn async_retrieve_array_subset_elements<T: ElementOwned + Send + Sync>(
257 &self,
258 array_subset: &ArraySubset,
259 ) -> Result<Vec<T>, ArrayError> {
260 self.async_retrieve_array_subset_elements_opt(array_subset, &CodecOptions::default())
261 .await
262 }
263
264 #[cfg(feature = "ndarray")]
265 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
267 pub async fn async_retrieve_array_subset_ndarray<T: ElementOwned + Send + Sync>(
268 &self,
269 array_subset: &ArraySubset,
270 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
271 self.async_retrieve_array_subset_ndarray_opt(array_subset, &CodecOptions::default())
272 .await
273 }
274
275 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
277 pub async fn async_partial_decoder(
278 &self,
279 chunk_indices: &[u64],
280 ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, ArrayError> {
281 self.async_partial_decoder_opt(chunk_indices, &CodecOptions::default())
282 .await
283 }
284
285 #[allow(clippy::missing_errors_doc)]
291 pub async fn async_retrieve_chunk_if_exists_opt(
292 &self,
293 chunk_indices: &[u64],
294 options: &CodecOptions,
295 ) -> Result<Option<ArrayBytes<'_>>, ArrayError> {
296 if chunk_indices.len() != self.dimensionality() {
297 return Err(ArrayError::InvalidChunkGridIndicesError(
298 chunk_indices.to_vec(),
299 ));
300 }
301 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
302 let storage_transformer = self
303 .storage_transformers()
304 .create_async_readable_transformer(storage_handle)
305 .await?;
306 let chunk_encoded = storage_transformer
307 .get(&self.chunk_key(chunk_indices))
308 .await
309 .map_err(ArrayError::StorageError)?;
310 if let Some(chunk_encoded) = chunk_encoded {
311 let chunk_encoded: Vec<u8> = chunk_encoded.into();
312 let chunk_representation = self.chunk_array_representation(chunk_indices)?;
313 let bytes = self
314 .codecs()
315 .decode(Cow::Owned(chunk_encoded), &chunk_representation, options)
316 .map_err(ArrayError::CodecError)?;
317 bytes.validate(
318 chunk_representation.num_elements(),
319 chunk_representation.data_type().size(),
320 )?;
321 Ok(Some(bytes))
322 } else {
323 Ok(None)
324 }
325 }
326
327 #[allow(clippy::missing_errors_doc)]
329 pub async fn async_retrieve_chunk_opt(
330 &self,
331 chunk_indices: &[u64],
332 options: &CodecOptions,
333 ) -> Result<ArrayBytes<'_>, ArrayError> {
334 let chunk = self
335 .async_retrieve_chunk_if_exists_opt(chunk_indices, options)
336 .await?;
337 if let Some(chunk) = chunk {
338 Ok(chunk)
339 } else {
340 let chunk_shape = self.chunk_shape(chunk_indices)?;
341 let array_size =
342 ArraySize::new(self.data_type().size(), chunk_shape.num_elements_u64());
343 Ok(ArrayBytes::new_fill_value(array_size, self.fill_value()))
344 }
345 }
346
347 async fn async_retrieve_chunk_into(
349 &self,
350 chunk_indices: &[u64],
351 output_view: &mut ArrayBytesFixedDisjointView<'_>,
352 options: &CodecOptions,
353 ) -> Result<(), ArrayError> {
354 if chunk_indices.len() != self.dimensionality() {
355 return Err(ArrayError::InvalidChunkGridIndicesError(
356 chunk_indices.to_vec(),
357 ));
358 }
359 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
360 let storage_transformer = self
361 .storage_transformers()
362 .create_async_readable_transformer(storage_handle)
363 .await?;
364 let chunk_encoded = storage_transformer
365 .get(&self.chunk_key(chunk_indices))
366 .await
367 .map_err(ArrayError::StorageError)?;
368 if let Some(chunk_encoded) = chunk_encoded {
369 let chunk_encoded: Vec<u8> = chunk_encoded.into();
370 let chunk_representation = self.chunk_array_representation(chunk_indices)?;
371 self.codecs()
372 .decode_into(
373 Cow::Owned(chunk_encoded),
374 &chunk_representation,
375 output_view,
376 options,
377 )
378 .map_err(ArrayError::CodecError)
379 } else {
380 copy_fill_value_into(self.data_type(), self.fill_value(), output_view)
381 .map_err(ArrayError::CodecError)
382 }
383 }
384
385 #[allow(clippy::missing_errors_doc)]
387 pub async fn async_retrieve_chunk_elements_if_exists_opt<T: ElementOwned + Send + Sync>(
388 &self,
389 chunk_indices: &[u64],
390 options: &CodecOptions,
391 ) -> Result<Option<Vec<T>>, ArrayError> {
392 if let Some(bytes) = self
393 .async_retrieve_chunk_if_exists_opt(chunk_indices, options)
394 .await?
395 {
396 let elements = T::from_array_bytes(self.data_type(), bytes)?;
397 Ok(Some(elements))
398 } else {
399 Ok(None)
400 }
401 }
402
403 #[allow(clippy::missing_errors_doc)]
405 pub async fn async_retrieve_chunk_elements_opt<T: ElementOwned + Send + Sync>(
406 &self,
407 chunk_indices: &[u64],
408 options: &CodecOptions,
409 ) -> Result<Vec<T>, ArrayError> {
410 let bytes = self
411 .async_retrieve_chunk_opt(chunk_indices, options)
412 .await?;
413 let elements = T::from_array_bytes(self.data_type(), bytes)?;
414 Ok(elements)
415 }
416
417 #[cfg(feature = "ndarray")]
418 #[allow(clippy::missing_errors_doc)]
420 pub async fn async_retrieve_chunk_ndarray_if_exists_opt<T: ElementOwned + Send + Sync>(
421 &self,
422 chunk_indices: &[u64],
423 options: &CodecOptions,
424 ) -> Result<Option<ndarray::ArrayD<T>>, ArrayError> {
425 let shape = self
427 .chunk_grid()
428 .chunk_shape_u64(chunk_indices, self.shape())?
429 .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec()))?;
430 let elements = self
431 .async_retrieve_chunk_elements_if_exists_opt(chunk_indices, options)
432 .await?;
433 if let Some(elements) = elements {
434 Ok(Some(elements_to_ndarray(&shape, elements)?))
435 } else {
436 Ok(None)
437 }
438 }
439
440 #[cfg(feature = "ndarray")]
441 #[allow(clippy::missing_errors_doc)]
443 pub async fn async_retrieve_chunk_ndarray_opt<T: ElementOwned + Send + Sync>(
444 &self,
445 chunk_indices: &[u64],
446 options: &CodecOptions,
447 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
448 let shape = self
450 .chunk_grid()
451 .chunk_shape_u64(chunk_indices, self.shape())?
452 .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(chunk_indices.to_vec()))?;
453 let elements = self
454 .async_retrieve_chunk_elements_opt(chunk_indices, options)
455 .await?;
456 elements_to_ndarray(&shape, elements)
457 }
458
459 #[allow(clippy::missing_panics_doc)]
466 pub async fn async_retrieve_encoded_chunks(
467 &self,
468 chunks: &ArraySubset,
469 options: &CodecOptions,
470 ) -> Result<Vec<Option<AsyncBytes>>, StorageError> {
471 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
472 let storage_transformer = self
473 .storage_transformers()
474 .create_async_readable_transformer(storage_handle)
475 .await?;
476
477 let retrieve_encoded_chunk = |chunk_indices: Vec<u64>| {
478 let storage_transformer = storage_transformer.clone();
479 async move {
480 storage_transformer
481 .get(&self.chunk_key(&chunk_indices))
482 .await
483 }
484 };
485
486 let indices = chunks.indices();
487 let futures = indices.into_iter().map(retrieve_encoded_chunk);
488 futures::stream::iter(futures)
489 .buffered(options.concurrent_target())
490 .try_collect()
491 .await
492 }
493
494 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
496 pub async fn async_retrieve_chunks_opt(
497 &self,
498 chunks: &ArraySubset,
499 options: &CodecOptions,
500 ) -> Result<ArrayBytes<'_>, ArrayError> {
501 if chunks.dimensionality() != self.dimensionality() {
502 return Err(ArrayError::InvalidArraySubset(
503 chunks.clone(),
504 self.shape().to_vec(),
505 ));
506 }
507
508 let array_subset = self.chunks_subset(chunks)?;
509 self.async_retrieve_array_subset_opt(&array_subset, options)
510 .await
511 }
512
513 #[allow(clippy::missing_errors_doc)]
515 pub async fn async_retrieve_chunks_elements_opt<T: ElementOwned + Send + Sync>(
516 &self,
517 chunks: &ArraySubset,
518 options: &CodecOptions,
519 ) -> Result<Vec<T>, ArrayError> {
520 let bytes = self.async_retrieve_chunks_opt(chunks, options).await?;
521 let elements = T::from_array_bytes(self.data_type(), bytes)?;
522 Ok(elements)
523 }
524
525 #[cfg(feature = "ndarray")]
526 #[allow(clippy::missing_errors_doc)]
528 pub async fn async_retrieve_chunks_ndarray_opt<T: ElementOwned + Send + Sync>(
529 &self,
530 chunks: &ArraySubset,
531 options: &CodecOptions,
532 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
533 let array_subset = self.chunks_subset(chunks)?;
534 let elements = self
535 .async_retrieve_chunks_elements_opt(chunks, options)
536 .await?;
537 elements_to_ndarray(array_subset.shape(), elements)
538 }
539
540 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
542 #[allow(clippy::too_many_lines)]
543 pub async fn async_retrieve_array_subset_opt(
544 &self,
545 array_subset: &ArraySubset,
546 options: &CodecOptions,
547 ) -> Result<ArrayBytes<'_>, ArrayError> {
548 if array_subset.dimensionality() != self.dimensionality() {
549 return Err(ArrayError::InvalidArraySubset(
550 array_subset.clone(),
551 self.shape().to_vec(),
552 ));
553 }
554
555 let chunks = self.chunks_in_array_subset(array_subset)?;
557 let Some(chunks) = chunks else {
558 return Err(ArrayError::InvalidArraySubset(
559 array_subset.clone(),
560 self.shape().to_vec(),
561 ));
562 };
563
564 let num_chunks = chunks.num_elements_usize();
566 match num_chunks {
567 0 => {
568 let array_size =
569 ArraySize::new(self.data_type().size(), array_subset.num_elements());
570 Ok(ArrayBytes::new_fill_value(array_size, self.fill_value()))
571 }
572 1 => {
573 let chunk_indices = chunks.start();
574 let chunk_subset = self.chunk_subset(chunk_indices)?;
575 if &chunk_subset == array_subset {
576 self.async_retrieve_chunk_opt(chunk_indices, options).await
578 } else {
579 let array_subset_in_chunk_subset =
580 array_subset.relative_to(chunk_subset.start())?;
581 self.async_retrieve_chunk_subset_opt(
582 chunk_indices,
583 &array_subset_in_chunk_subset,
584 options,
585 )
586 .await
587 }
588 }
589 _ => {
590 let chunk_representation =
592 self.chunk_array_representation(&vec![0; self.dimensionality()])?;
593 let codec_concurrency =
594 self.recommended_codec_concurrency(&chunk_representation)?;
595 let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
596 options.concurrent_target(),
597 num_chunks,
598 options,
599 &codec_concurrency,
600 );
601
602 match chunk_representation.data_type().size() {
603 DataTypeSize::Variable => {
604 let retrieve_chunk = |chunk_indices: Vec<u64>| {
605 let options = options.clone();
606 async move {
607 let chunk_subset = self.chunk_subset(&chunk_indices)?;
608 let chunk_subset_overlap = chunk_subset.overlap(array_subset)?;
609 Ok::<_, ArrayError>((
610 self.async_retrieve_chunk_subset_opt(
611 &chunk_indices,
612 &chunk_subset_overlap.relative_to(chunk_subset.start())?,
613 &options,
614 )
615 .await?,
616 chunk_subset_overlap.relative_to(array_subset.start())?,
617 ))
618 }
619 };
620
621 let chunk_bytes_and_subsets = futures::future::try_join_all(
623 chunks.indices().iter().map(retrieve_chunk),
624 )
625 .await?;
626
627 Ok(merge_chunks_vlen(
628 chunk_bytes_and_subsets,
629 array_subset.shape(),
630 )?)
631 }
632 DataTypeSize::Fixed(data_type_size) => {
633 let size_output =
634 usize::try_from(array_subset.num_elements() * data_type_size as u64)
635 .unwrap();
636 if size_output == 0 {
637 return Ok(ArrayBytes::new_flen(vec![]));
638 }
639 let mut output = Vec::with_capacity(size_output);
640 {
641 let output =
642 UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut output);
643 let retrieve_chunk = |chunk_indices: Vec<u64>| {
644 let options = options.clone();
645 async move {
646 let chunk_subset = self.chunk_subset(&chunk_indices)?;
647 let chunk_subset_overlap =
648 chunk_subset.overlap(array_subset)?;
649
650 let mut output_view = unsafe {
651 ArrayBytesFixedDisjointView::new(
653 output,
654 data_type_size,
655 array_subset.shape(),
656 chunk_subset_overlap
657 .relative_to(array_subset.start())
658 .unwrap(),
659 )
660 }?;
661 self.async_retrieve_chunk_subset_into(
662 &chunk_indices,
663 &chunk_subset_overlap.relative_to(chunk_subset.start())?,
664 &mut output_view,
665 &options,
666 )
667 .await?;
668 Ok::<_, ArrayError>(())
686 }
687 };
688
689 futures::stream::iter(&chunks.indices())
690 .map(Ok)
691 .try_for_each_concurrent(
692 Some(chunk_concurrent_limit),
693 retrieve_chunk,
694 )
695 .await?;
696 }
697 unsafe { output.set_len(size_output) };
698 Ok(ArrayBytes::from(output))
699 }
700 }
701 }
702 }
703 }
704
705 #[allow(clippy::missing_errors_doc)]
707 pub async fn async_retrieve_array_subset_elements_opt<T: ElementOwned + Send + Sync>(
708 &self,
709 array_subset: &ArraySubset,
710 options: &CodecOptions,
711 ) -> Result<Vec<T>, ArrayError> {
712 let bytes = self
713 .async_retrieve_array_subset_opt(array_subset, options)
714 .await?;
715 let elements = T::from_array_bytes(self.data_type(), bytes)?;
716 Ok(elements)
717 }
718
719 #[cfg(feature = "ndarray")]
720 #[allow(clippy::missing_errors_doc)]
722 pub async fn async_retrieve_array_subset_ndarray_opt<T: ElementOwned + Send + Sync>(
723 &self,
724 array_subset: &ArraySubset,
725 options: &CodecOptions,
726 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
727 let elements = self
728 .async_retrieve_array_subset_elements_opt(array_subset, options)
729 .await?;
730 elements_to_ndarray(array_subset.shape(), elements)
731 }
732
733 #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
735 pub async fn async_retrieve_chunk_subset_opt(
736 &self,
737 chunk_indices: &[u64],
738 chunk_subset: &ArraySubset,
739 options: &CodecOptions,
740 ) -> Result<ArrayBytes<'_>, ArrayError> {
741 let chunk_representation = self.chunk_array_representation(chunk_indices)?;
742 if !chunk_subset.inbounds_shape(&chunk_representation.shape_u64()) {
743 return Err(ArrayError::InvalidArraySubset(
744 chunk_subset.clone(),
745 self.shape().to_vec(),
746 ));
747 }
748
749 let bytes = if chunk_subset.start().iter().all(|&o| o == 0)
750 && chunk_subset.shape() == chunk_representation.shape_u64()
751 {
752 self.async_retrieve_chunk_opt(chunk_indices, options)
754 .await?
755 } else {
756 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
757 let storage_transformer = self
758 .storage_transformers()
759 .create_async_readable_transformer(storage_handle)
760 .await?;
761 let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
762 storage_transformer,
763 self.chunk_key(chunk_indices),
764 ));
765 self.codecs
766 .clone()
767 .async_partial_decoder(input_handle, &chunk_representation, options)
768 .await?
769 .partial_decode(&[chunk_subset.clone()], options)
770 .await?
771 .remove(0)
772 .into_owned()
773 };
774 bytes.validate(chunk_subset.num_elements(), self.data_type().size())?;
775 Ok(bytes)
776 }
777
778 async fn async_retrieve_chunk_subset_into(
779 &self,
780 chunk_indices: &[u64],
781 chunk_subset: &ArraySubset,
782 output_view: &mut ArrayBytesFixedDisjointView<'_>,
783 options: &CodecOptions,
784 ) -> Result<(), ArrayError> {
785 let chunk_representation = self.chunk_array_representation(chunk_indices)?;
786 if !chunk_subset.inbounds_shape(&chunk_representation.shape_u64()) {
787 return Err(ArrayError::InvalidArraySubset(
788 chunk_subset.clone(),
789 self.shape().to_vec(),
790 ));
791 }
792
793 if chunk_subset.start().iter().all(|&o| o == 0)
794 && chunk_subset.shape() == chunk_representation.shape_u64()
795 {
796 self.async_retrieve_chunk_into(chunk_indices, output_view, options)
798 .await
799 } else {
800 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
801 let storage_transformer = self
802 .storage_transformers()
803 .create_async_readable_transformer(storage_handle)
804 .await?;
805 let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
806 storage_transformer,
807 self.chunk_key(chunk_indices),
808 ));
809
810 self.codecs
811 .clone()
812 .async_partial_decoder(input_handle, &chunk_representation, options)
813 .await?
814 .partial_decode_into(chunk_subset, output_view, options)
815 .await?;
816 Ok(())
817 }
818 }
819
820 #[allow(clippy::missing_errors_doc)]
822 pub async fn async_retrieve_chunk_subset_elements_opt<T: ElementOwned + Send + Sync>(
823 &self,
824 chunk_indices: &[u64],
825 chunk_subset: &ArraySubset,
826 options: &CodecOptions,
827 ) -> Result<Vec<T>, ArrayError> {
828 let bytes = self
829 .async_retrieve_chunk_subset_opt(chunk_indices, chunk_subset, options)
830 .await?;
831 let elements = T::from_array_bytes(self.data_type(), bytes)?;
832 Ok(elements)
833 }
834
835 #[cfg(feature = "ndarray")]
836 #[allow(clippy::missing_errors_doc)]
838 pub async fn async_retrieve_chunk_subset_ndarray_opt<T: ElementOwned + Send + Sync>(
839 &self,
840 chunk_indices: &[u64],
841 chunk_subset: &ArraySubset,
842 options: &CodecOptions,
843 ) -> Result<ndarray::ArrayD<T>, ArrayError> {
844 let elements = self
845 .async_retrieve_chunk_subset_elements_opt(chunk_indices, chunk_subset, options)
846 .await?;
847 elements_to_ndarray(chunk_subset.shape(), elements)
848 }
849
850 #[allow(clippy::missing_errors_doc)]
852 pub async fn async_partial_decoder_opt(
853 &self,
854 chunk_indices: &[u64],
855 options: &CodecOptions,
856 ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, ArrayError> {
857 let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
858 let storage_transformer = self
859 .storage_transformers()
860 .create_async_readable_transformer(storage_handle)
861 .await?;
862 let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
863 storage_transformer,
864 self.chunk_key(chunk_indices),
865 ));
866 let chunk_representation = self.chunk_array_representation(chunk_indices)?;
867 Ok(self
868 .codecs
869 .clone()
870 .async_partial_decoder(input_handle, &chunk_representation, options)
871 .await?)
872 }
873}