zarrs/array/
array_async_writable.rs

1use std::sync::Arc;
2
3use futures::{StreamExt, TryStreamExt};
4
5use crate::{
6    array::ArrayBytes,
7    array_subset::ArraySubset,
8    config::{global_config, MetadataEraseVersion},
9    node::{meta_key_v2_array, meta_key_v2_attributes, meta_key_v3},
10    storage::{AsyncBytes, AsyncWritableStorageTraits, StorageError, StorageHandle},
11};
12
13use super::{
14    codec::{ArrayToBytesCodecTraits, CodecOptions},
15    concurrency::concurrency_chunks_and_codec,
16    Array, ArrayError, ArrayMetadata, ArrayMetadataOptions, Element,
17};
18
19impl<TStorage: ?Sized + AsyncWritableStorageTraits + 'static> Array<TStorage> {
20    /// Async variant of [`store_metadata`](Array::store_metadata).
21    #[allow(clippy::missing_errors_doc)]
22    pub async fn async_store_metadata(&self) -> Result<(), StorageError> {
23        self.async_store_metadata_opt(&ArrayMetadataOptions::default())
24            .await
25    }
26
27    /// Async variant of [`store_metadata_opt`](Array::store_metadata_opt).
28    #[allow(clippy::missing_errors_doc)]
29    pub async fn async_store_metadata_opt(
30        &self,
31        options: &ArrayMetadataOptions,
32    ) -> Result<(), StorageError> {
33        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
34        let storage_transformer = self
35            .storage_transformers()
36            .create_async_writable_transformer(storage_handle)
37            .await?;
38
39        // Get the metadata with options applied and store
40        let metadata = self.metadata_opt(options);
41
42        // Store the metadata
43        let path = self.path();
44        match metadata {
45            ArrayMetadata::V3(metadata) => {
46                let key = meta_key_v3(path);
47                let json = serde_json::to_vec_pretty(&metadata)
48                    .map_err(|err| StorageError::InvalidMetadata(key.clone(), err.to_string()))?;
49                storage_transformer.set(&key, json.into()).await
50            }
51            ArrayMetadata::V2(metadata) => {
52                let mut metadata = metadata.clone();
53
54                if !metadata.attributes.is_empty() {
55                    // Store .zattrs
56                    let key = meta_key_v2_attributes(path);
57                    let json = serde_json::to_vec_pretty(&metadata.attributes).map_err(|err| {
58                        StorageError::InvalidMetadata(key.clone(), err.to_string())
59                    })?;
60                    storage_transformer
61                        .set(&meta_key_v2_attributes(path), json.into())
62                        .await?;
63
64                    metadata.attributes = serde_json::Map::default();
65                }
66
67                // Store .zarray
68                let key = meta_key_v2_array(path);
69                let json = serde_json::to_vec_pretty(&metadata)
70                    .map_err(|err| StorageError::InvalidMetadata(key.clone(), err.to_string()))?;
71                storage_transformer.set(&key, json.into()).await
72            }
73        }
74    }
75
76    /// Async variant of [`store_chunk`](Array::store_chunk).
77    #[allow(clippy::missing_errors_doc)]
78    pub async fn async_store_chunk<'a>(
79        &self,
80        chunk_indices: &[u64],
81        chunk_bytes: impl Into<ArrayBytes<'a>> + Send,
82    ) -> Result<(), ArrayError> {
83        self.async_store_chunk_opt(chunk_indices, chunk_bytes, &CodecOptions::default())
84            .await
85    }
86
87    /// Async variant of [`store_chunk_elements`](Array::store_chunk_elements).
88    #[allow(clippy::missing_errors_doc)]
89    pub async fn async_store_chunk_elements<T: Element + Send + Sync>(
90        &self,
91        chunk_indices: &[u64],
92        chunk_elements: &[T],
93    ) -> Result<(), ArrayError> {
94        self.async_store_chunk_elements_opt(chunk_indices, chunk_elements, &CodecOptions::default())
95            .await
96    }
97
98    #[cfg(feature = "ndarray")]
99    /// Async variant of [`store_chunk_ndarray`](Array::store_chunk_ndarray).
100    #[allow(clippy::missing_errors_doc)]
101    pub async fn async_store_chunk_ndarray<T: Element + Send + Sync, D: ndarray::Dimension>(
102        &self,
103        chunk_indices: &[u64],
104        chunk_array: impl Into<ndarray::Array<T, D>> + Send,
105    ) -> Result<(), ArrayError> {
106        self.async_store_chunk_ndarray_opt(chunk_indices, chunk_array, &CodecOptions::default())
107            .await
108    }
109
110    /// Async variant of [`store_chunks`](Array::store_chunks).
111    #[allow(clippy::missing_errors_doc)]
112    #[allow(clippy::similar_names)]
113    pub async fn async_store_chunks<'a>(
114        &self,
115        chunks: &ArraySubset,
116        chunks_bytes: impl Into<ArrayBytes<'a>> + Send,
117    ) -> Result<(), ArrayError> {
118        self.async_store_chunks_opt(chunks, chunks_bytes, &CodecOptions::default())
119            .await
120    }
121
122    /// Async variant of [`store_chunks_elements`](Array::store_chunks_elements).
123    #[allow(clippy::missing_errors_doc)]
124    pub async fn async_store_chunks_elements<T: Element + Send + Sync>(
125        &self,
126        chunks: &ArraySubset,
127        chunks_elements: &[T],
128    ) -> Result<(), ArrayError> {
129        self.async_store_chunks_elements_opt(chunks, chunks_elements, &CodecOptions::default())
130            .await
131    }
132
133    #[cfg(feature = "ndarray")]
134    /// Async variant of [`store_chunks_ndarray`](Array::store_chunks_ndarray).
135    #[allow(clippy::missing_errors_doc)]
136    pub async fn async_store_chunks_ndarray<T: Element + Send + Sync, D: ndarray::Dimension>(
137        &self,
138        chunks: &ArraySubset,
139        chunks_array: impl Into<ndarray::Array<T, D>> + Send,
140    ) -> Result<(), ArrayError> {
141        self.async_store_chunks_ndarray_opt(chunks, chunks_array, &CodecOptions::default())
142            .await
143    }
144
145    /// Async variant of [`erase_metadata`](Array::erase_metadata).
146    #[allow(clippy::missing_errors_doc)]
147    pub async fn async_erase_metadata(&self) -> Result<(), StorageError> {
148        let erase_version = global_config().metadata_erase_version();
149        self.async_erase_metadata_opt(erase_version).await
150    }
151
152    /// Async variant of [`erase_metadata_opt`](Array::erase_metadata_opt).
153    #[allow(clippy::missing_errors_doc)]
154    pub async fn async_erase_metadata_opt(
155        &self,
156        options: MetadataEraseVersion,
157    ) -> Result<(), StorageError> {
158        let storage_handle = StorageHandle::new(self.storage.clone());
159        match options {
160            MetadataEraseVersion::Default => match self.metadata {
161                ArrayMetadata::V3(_) => storage_handle.erase(&meta_key_v3(self.path())).await,
162                ArrayMetadata::V2(_) => {
163                    storage_handle
164                        .erase(&meta_key_v2_array(self.path()))
165                        .await?;
166                    storage_handle
167                        .erase(&meta_key_v2_attributes(self.path()))
168                        .await
169                }
170            },
171            MetadataEraseVersion::All => {
172                storage_handle.erase(&meta_key_v3(self.path())).await?;
173                storage_handle
174                    .erase(&meta_key_v2_array(self.path()))
175                    .await?;
176                storage_handle
177                    .erase(&meta_key_v2_attributes(self.path()))
178                    .await
179            }
180            MetadataEraseVersion::V3 => storage_handle.erase(&meta_key_v3(self.path())).await,
181            MetadataEraseVersion::V2 => {
182                storage_handle
183                    .erase(&meta_key_v2_array(self.path()))
184                    .await?;
185                storage_handle
186                    .erase(&meta_key_v2_attributes(self.path()))
187                    .await
188            }
189        }
190    }
191
192    /// Async variant of [`erase_chunk`](Array::erase_chunk).
193    #[allow(clippy::missing_errors_doc)]
194    pub async fn async_erase_chunk(&self, chunk_indices: &[u64]) -> Result<(), StorageError> {
195        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
196        let storage_transformer = self
197            .storage_transformers()
198            .create_async_writable_transformer(storage_handle)
199            .await?;
200        storage_transformer
201            .erase(&self.chunk_key(chunk_indices))
202            .await
203    }
204
205    /// Async variant of [`erase_chunks`](Array::erase_chunks).
206    #[allow(clippy::missing_errors_doc)]
207    pub async fn async_erase_chunks(&self, chunks: &ArraySubset) -> Result<(), StorageError> {
208        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
209        let storage_transformer = self
210            .storage_transformers()
211            .create_async_writable_transformer(storage_handle)
212            .await?;
213        let erase_chunk = |chunk_indices: Vec<u64>| {
214            let storage_transformer = storage_transformer.clone();
215            async move {
216                storage_transformer
217                    .erase(&self.chunk_key(&chunk_indices))
218                    .await
219            }
220        };
221        futures::stream::iter(chunks.indices().into_iter())
222            .map(Ok)
223            .try_for_each_concurrent(None, erase_chunk)
224            .await
225    }
226
227    /////////////////////////////////////////////////////////////////////////////
228    // Advanced methods
229    /////////////////////////////////////////////////////////////////////////////
230
231    /// Async variant of [`store_chunk_opt`](Array::store_chunk_opt).
232    #[allow(clippy::missing_errors_doc)]
233    pub async fn async_store_chunk_opt<'a>(
234        &self,
235        chunk_indices: &[u64],
236        chunk_bytes: impl Into<ArrayBytes<'a>> + Send,
237        options: &CodecOptions,
238    ) -> Result<(), ArrayError> {
239        let chunk_bytes = chunk_bytes.into();
240
241        // Validation
242        let chunk_array_representation = self.chunk_array_representation(chunk_indices)?;
243        chunk_bytes.validate(
244            chunk_array_representation.num_elements(),
245            chunk_array_representation.data_type().size(),
246        )?;
247
248        let is_fill_value =
249            !options.store_empty_chunks() && chunk_bytes.is_fill_value(self.fill_value());
250        if is_fill_value {
251            self.async_erase_chunk(chunk_indices).await?;
252        } else {
253            let chunk_encoded = self
254                .codecs()
255                .encode(chunk_bytes, &chunk_array_representation, options)
256                .map_err(ArrayError::CodecError)?;
257            let chunk_encoded = AsyncBytes::from(chunk_encoded.to_vec());
258            unsafe { self.async_store_encoded_chunk(chunk_indices, chunk_encoded) }.await?;
259        }
260        Ok(())
261    }
262
263    /// Async variant of [`store_encoded_chunk`](Array::store_encoded_chunk)
264    #[allow(clippy::missing_errors_doc, clippy::missing_safety_doc)]
265    pub async unsafe fn async_store_encoded_chunk(
266        &self,
267        chunk_indices: &[u64],
268        encoded_chunk_bytes: AsyncBytes,
269    ) -> Result<(), ArrayError> {
270        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
271        let storage_transformer = self
272            .storage_transformers()
273            .create_async_writable_transformer(storage_handle)
274            .await?;
275        storage_transformer
276            .set(&self.chunk_key(chunk_indices), encoded_chunk_bytes)
277            .await?;
278        Ok(())
279    }
280
281    /// Async variant of [`store_chunk_elements_opt`](Array::store_chunk_elements_opt).
282    #[allow(clippy::missing_errors_doc)]
283    pub async fn async_store_chunk_elements_opt<T: Element + Send + Sync>(
284        &self,
285        chunk_indices: &[u64],
286        chunk_elements: &[T],
287        options: &CodecOptions,
288    ) -> Result<(), ArrayError> {
289        let bytes = T::into_array_bytes(self.data_type(), chunk_elements)?;
290        self.async_store_chunk_opt(chunk_indices, bytes, options)
291            .await
292    }
293
294    #[cfg(feature = "ndarray")]
295    /// Async variant of [`store_chunk_ndarray_opt`](Array::store_chunk_ndarray_opt).
296    #[allow(clippy::missing_errors_doc)]
297    pub async fn async_store_chunk_ndarray_opt<T: Element + Send + Sync, D: ndarray::Dimension>(
298        &self,
299        chunk_indices: &[u64],
300        chunk_array: impl Into<ndarray::Array<T, D>> + Send,
301        options: &CodecOptions,
302    ) -> Result<(), ArrayError> {
303        let chunk_array: ndarray::Array<T, D> = chunk_array.into();
304        let chunk_shape = self.chunk_shape_usize(chunk_indices)?;
305        if chunk_array.shape() == chunk_shape {
306            let chunk_array = super::ndarray_into_vec(chunk_array);
307            self.async_store_chunk_elements_opt(chunk_indices, &chunk_array, options)
308                .await
309        } else {
310            Err(ArrayError::InvalidDataShape(
311                chunk_array.shape().to_vec(),
312                chunk_shape,
313            ))
314        }
315    }
316
317    /// Async variant of [`store_chunks_opt`](Array::store_chunks_opt).
318    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
319    #[allow(clippy::similar_names)]
320    pub async fn async_store_chunks_opt<'a>(
321        &self,
322        chunks: &ArraySubset,
323        chunks_bytes: impl Into<ArrayBytes<'a>> + Send,
324        options: &CodecOptions,
325    ) -> Result<(), ArrayError> {
326        let num_chunks = chunks.num_elements_usize();
327        match num_chunks {
328            0 => {
329                let chunks_bytes = chunks_bytes.into();
330                chunks_bytes.validate(0, self.data_type().size())?;
331            }
332            1 => {
333                let chunk_indices = chunks.start();
334                self.async_store_chunk_opt(chunk_indices, chunks_bytes, options)
335                    .await?;
336            }
337            _ => {
338                let chunks_bytes = chunks_bytes.into();
339                let array_subset = self.chunks_subset(chunks)?;
340                chunks_bytes.validate(array_subset.num_elements(), self.data_type().size())?;
341
342                // Calculate chunk/codec concurrency
343                let chunk_representation =
344                    self.chunk_array_representation(&vec![0; self.dimensionality()])?;
345                let codec_concurrency =
346                    self.recommended_codec_concurrency(&chunk_representation)?;
347                let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
348                    options.concurrent_target(),
349                    num_chunks,
350                    options,
351                    &codec_concurrency,
352                );
353
354                let store_chunk = |chunk_indices: Vec<u64>| {
355                    let chunk_subset = self.chunk_subset(&chunk_indices).unwrap(); // FIXME: unwrap
356                    let chunk_bytes = chunks_bytes
357                        .extract_array_subset(
358                            &chunk_subset.relative_to(array_subset.start()).unwrap(), // FIXME: unwrap
359                            array_subset.shape(),
360                            self.data_type(),
361                        )
362                        .unwrap(); // FIXME: unwrap
363                    let options = options.clone();
364                    async move {
365                        self.async_store_chunk_opt(&chunk_indices, chunk_bytes, &options)
366                            .await
367                    }
368                };
369                futures::stream::iter(&chunks.indices())
370                    .map(Ok)
371                    .try_for_each_concurrent(Some(chunk_concurrent_limit), store_chunk)
372                    .await?;
373            }
374        }
375
376        Ok(())
377    }
378
379    /// Async variant of [`store_chunks_elements_opt`](Array::store_chunks_elements_opt).
380    #[allow(clippy::missing_errors_doc)]
381    pub async fn async_store_chunks_elements_opt<T: Element + Send + Sync>(
382        &self,
383        chunks: &ArraySubset,
384        chunks_elements: &[T],
385        options: &CodecOptions,
386    ) -> Result<(), ArrayError> {
387        let chunks_bytes = T::into_array_bytes(self.data_type(), chunks_elements)?;
388        self.async_store_chunks_opt(chunks, chunks_bytes, options)
389            .await
390    }
391
392    #[cfg(feature = "ndarray")]
393    /// Async variant of [`store_chunks_ndarray_opt`](Array::store_chunks_ndarray_opt).
394    #[allow(clippy::missing_errors_doc)]
395    pub async fn async_store_chunks_ndarray_opt<T: Element + Send + Sync, D: ndarray::Dimension>(
396        &self,
397        chunks: &ArraySubset,
398        chunks_array: impl Into<ndarray::Array<T, D>> + Send,
399        options: &CodecOptions,
400    ) -> Result<(), ArrayError> {
401        let chunks_array: ndarray::Array<T, D> = chunks_array.into();
402        let chunks_subset = self.chunks_subset(chunks)?;
403        let chunks_shape = chunks_subset.shape_usize();
404        if chunks_array.shape() == chunks_shape {
405            let chunks_array = super::ndarray_into_vec(chunks_array);
406            self.async_store_chunks_elements_opt(chunks, &chunks_array, options)
407                .await
408        } else {
409            Err(ArrayError::InvalidDataShape(
410                chunks_array.shape().to_vec(),
411                chunks_shape,
412            ))
413        }
414    }
415}