use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};

use crate::{
    array::ArrayBytes,
    array_subset::ArraySubset,
    config::{global_config, MetadataEraseVersion},
    node::{meta_key_v2_array, meta_key_v2_attributes, meta_key_v3},
    storage::{AsyncBytes, AsyncWritableStorageTraits, StorageError, StorageHandle},
};

use super::{
    codec::{ArrayToBytesCodecTraits, CodecOptions},
    concurrency::concurrency_chunks_and_codec,
    Array, ArrayError, ArrayMetadata, ArrayMetadataOptions, Element,
};

impl<TStorage: ?Sized + AsyncWritableStorageTraits + 'static> Array<TStorage> {
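    /// Store the array metadata with default [`ArrayMetadataOptions`].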
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_metadata(&self) -> Result<(), StorageError> {
        self.async_store_metadata_opt(&ArrayMetadataOptions::default())
            .await
    }

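    /// Store the array metadata with the given [`ArrayMetadataOptions`].
    ///
    /// For Zarr V2 metadata, non-empty attributes are written separately to the V2 attributes key.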
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_metadata_opt(
        &self,
        options: &ArrayMetadataOptions,
    ) -> Result<(), StorageError> {
        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
        let storage_transformer = self
            .storage_transformers()
            .create_async_writable_transformer(storage_handle)
            .await?;

        let metadata = self.metadata_opt(options);

        let path = self.path();
        match metadata {
            ArrayMetadata::V3(metadata) => {
                let key = meta_key_v3(path);
                let json = serde_json::to_vec_pretty(&metadata)
                    .map_err(|err| StorageError::InvalidMetadata(key.clone(), err.to_string()))?;
                storage_transformer.set(&key, json.into()).await
            }
            ArrayMetadata::V2(metadata) => {
                let mut metadata = metadata.clone();

                if !metadata.attributes.is_empty() {
                    let key = meta_key_v2_attributes(path);
                    let json = serde_json::to_vec_pretty(&metadata.attributes).map_err(|err| {
                        StorageError::InvalidMetadata(key.clone(), err.to_string())
                    })?;
                    storage_transformer.set(&key, json.into()).await?;

                    metadata.attributes = serde_json::Map::default();
                }

                let key = meta_key_v2_array(path);
                let json = serde_json::to_vec_pretty(&metadata)
                    .map_err(|err| StorageError::InvalidMetadata(key.clone(), err.to_string()))?;
                storage_transformer.set(&key, json.into()).await
            }
        }
    }

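    /// Encode `chunk_bytes` and store the chunk at `chunk_indices` with default [`CodecOptions`].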
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk<'a>(
        &self,
        chunk_indices: &[u64],
        chunk_bytes: impl Into<ArrayBytes<'a>> + Send,
    ) -> Result<(), ArrayError> {
        self.async_store_chunk_opt(chunk_indices, chunk_bytes, &CodecOptions::default())
            .await
    }

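    /// Encode `chunk_elements` and store the chunk at `chunk_indices` with default [`CodecOptions`].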
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk_elements<T: Element + Send + Sync>(
        &self,
        chunk_indices: &[u64],
        chunk_elements: &[T],
    ) -> Result<(), ArrayError> {
        self.async_store_chunk_elements_opt(chunk_indices, chunk_elements, &CodecOptions::default())
            .await
    }

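    /// Encode `chunk_array` and store the chunk at `chunk_indices` with default [`CodecOptions`].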
    #[cfg(feature = "ndarray")]
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk_ndarray<T: Element + Send + Sync, D: ndarray::Dimension>(
        &self,
        chunk_indices: &[u64],
        chunk_array: impl Into<ndarray::Array<T, D>> + Send,
    ) -> Result<(), ArrayError> {
        self.async_store_chunk_ndarray_opt(chunk_indices, chunk_array, &CodecOptions::default())
            .await
    }

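    /// Encode `chunks_bytes` and store the chunks in the `chunks` subset with default [`CodecOptions`].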
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::similar_names)]
    pub async fn async_store_chunks<'a>(
        &self,
        chunks: &ArraySubset,
        chunks_bytes: impl Into<ArrayBytes<'a>> + Send,
    ) -> Result<(), ArrayError> {
        self.async_store_chunks_opt(chunks, chunks_bytes, &CodecOptions::default())
            .await
    }

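    /// Encode `chunks_elements` and store the chunks in the `chunks` subset with default [`CodecOptions`].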
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunks_elements<T: Element + Send + Sync>(
        &self,
        chunks: &ArraySubset,
        chunks_elements: &[T],
    ) -> Result<(), ArrayError> {
        self.async_store_chunks_elements_opt(chunks, chunks_elements, &CodecOptions::default())
            .await
    }

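    /// Encode `chunks_array` and store the chunks in the `chunks` subset with default [`CodecOptions`].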
    #[cfg(feature = "ndarray")]
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunks_ndarray<T: Element + Send + Sync, D: ndarray::Dimension>(
        &self,
        chunks: &ArraySubset,
        chunks_array: impl Into<ndarray::Array<T, D>> + Send,
    ) -> Result<(), ArrayError> {
        self.async_store_chunks_ndarray_opt(chunks, chunks_array, &CodecOptions::default())
            .await
    }

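    /// Erase the array metadata, using the metadata erase version from the global configuration.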
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_erase_metadata(&self) -> Result<(), StorageError> {
        let erase_version = global_config().metadata_erase_version();
        self.async_erase_metadata_opt(erase_version).await
    }

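    /// Erase the array metadata for the specified [`MetadataEraseVersion`].
    ///
    /// `Default` erases the key(s) matching the version of this array's metadata;
    /// `All` erases both the V3 and V2 metadata keys.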
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_erase_metadata_opt(
        &self,
        options: MetadataEraseVersion,
    ) -> Result<(), StorageError> {
        let storage_handle = StorageHandle::new(self.storage.clone());
        match options {
            MetadataEraseVersion::Default => match self.metadata {
                ArrayMetadata::V3(_) => storage_handle.erase(&meta_key_v3(self.path())).await,
                ArrayMetadata::V2(_) => {
                    storage_handle
                        .erase(&meta_key_v2_array(self.path()))
                        .await?;
                    storage_handle
                        .erase(&meta_key_v2_attributes(self.path()))
                        .await
                }
            },
            MetadataEraseVersion::All => {
                storage_handle.erase(&meta_key_v3(self.path())).await?;
                storage_handle
                    .erase(&meta_key_v2_array(self.path()))
                    .await?;
                storage_handle
                    .erase(&meta_key_v2_attributes(self.path()))
                    .await
            }
            MetadataEraseVersion::V3 => storage_handle.erase(&meta_key_v3(self.path())).await,
            MetadataEraseVersion::V2 => {
                storage_handle
                    .erase(&meta_key_v2_array(self.path()))
                    .await?;
                storage_handle
                    .erase(&meta_key_v2_attributes(self.path()))
                    .await
            }
        }
    }

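    /// Erase the chunk at `chunk_indices` from the store.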
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_erase_chunk(&self, chunk_indices: &[u64]) -> Result<(), StorageError> {
        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
        let storage_transformer = self
            .storage_transformers()
            .create_async_writable_transformer(storage_handle)
            .await?;
        storage_transformer
            .erase(&self.chunk_key(chunk_indices))
            .await
    }

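    /// Erase all chunks in the `chunks` subset, erasing them concurrently.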
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_erase_chunks(&self, chunks: &ArraySubset) -> Result<(), StorageError> {
        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
        let storage_transformer = self
            .storage_transformers()
            .create_async_writable_transformer(storage_handle)
            .await?;
        let erase_chunk = |chunk_indices: Vec<u64>| {
            let storage_transformer = storage_transformer.clone();
            async move {
                storage_transformer
                    .erase(&self.chunk_key(&chunk_indices))
                    .await
            }
        };
        futures::stream::iter(chunks.indices().into_iter())
            .map(Ok)
            .try_for_each_concurrent(None, erase_chunk)
            .await
    }

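    /// Encode `chunk_bytes` and store the chunk at `chunk_indices` with the given [`CodecOptions`].
    ///
    /// If the chunk is entirely the fill value and `store_empty_chunks` is disabled in
    /// `options`, the chunk is erased from the store instead.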
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk_opt<'a>(
        &self,
        chunk_indices: &[u64],
        chunk_bytes: impl Into<ArrayBytes<'a>> + Send,
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let chunk_bytes = chunk_bytes.into();

        let chunk_array_representation = self.chunk_array_representation(chunk_indices)?;
        chunk_bytes.validate(
            chunk_array_representation.num_elements(),
            chunk_array_representation.data_type().size(),
        )?;

        let is_fill_value =
            !options.store_empty_chunks() && chunk_bytes.is_fill_value(self.fill_value());
        if is_fill_value {
            self.async_erase_chunk(chunk_indices).await?;
        } else {
            let chunk_encoded = self
                .codecs()
                .encode(chunk_bytes, &chunk_array_representation, options)
                .map_err(ArrayError::CodecError)?;
            let chunk_encoded = AsyncBytes::from(chunk_encoded.to_vec());
            unsafe { self.async_store_encoded_chunk(chunk_indices, chunk_encoded) }.await?;
        }
        Ok(())
    }

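    /// Store `encoded_chunk_bytes` at `chunk_indices` without validation or re-encoding.
    ///
    /// # Safety
    ///
    /// The bytes are written as-is, so the caller must ensure they are a correctly
    /// encoded chunk for this array.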
    #[allow(clippy::missing_errors_doc, clippy::missing_safety_doc)]
    pub async unsafe fn async_store_encoded_chunk(
        &self,
        chunk_indices: &[u64],
        encoded_chunk_bytes: AsyncBytes,
    ) -> Result<(), ArrayError> {
        let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
        let storage_transformer = self
            .storage_transformers()
            .create_async_writable_transformer(storage_handle)
            .await?;
        storage_transformer
            .set(&self.chunk_key(chunk_indices), encoded_chunk_bytes)
            .await?;
        Ok(())
    }

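    /// Encode `chunk_elements` and store the chunk at `chunk_indices` with the given [`CodecOptions`].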
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk_elements_opt<T: Element + Send + Sync>(
        &self,
        chunk_indices: &[u64],
        chunk_elements: &[T],
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let bytes = T::into_array_bytes(self.data_type(), chunk_elements)?;
        self.async_store_chunk_opt(chunk_indices, bytes, options)
            .await
    }

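    /// Encode `chunk_array` and store the chunk at `chunk_indices`, validating that its
    /// shape matches the chunk shape.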
    #[cfg(feature = "ndarray")]
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunk_ndarray_opt<T: Element + Send + Sync, D: ndarray::Dimension>(
        &self,
        chunk_indices: &[u64],
        chunk_array: impl Into<ndarray::Array<T, D>> + Send,
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let chunk_array: ndarray::Array<T, D> = chunk_array.into();
        let chunk_shape = self.chunk_shape_usize(chunk_indices)?;
        if chunk_array.shape() == chunk_shape {
            let chunk_array = super::ndarray_into_vec(chunk_array);
            self.async_store_chunk_elements_opt(chunk_indices, &chunk_array, options)
                .await
        } else {
            Err(ArrayError::InvalidDataShape(
                chunk_array.shape().to_vec(),
                chunk_shape,
            ))
        }
    }

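    /// Encode `chunks_bytes` and store the chunks in the `chunks` subset with the given
    /// [`CodecOptions`], splitting the concurrent target between chunks and the codec pipeline.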
    #[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
    #[allow(clippy::similar_names)]
    pub async fn async_store_chunks_opt<'a>(
        &self,
        chunks: &ArraySubset,
        chunks_bytes: impl Into<ArrayBytes<'a>> + Send,
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let num_chunks = chunks.num_elements_usize();
        match num_chunks {
            0 => {
                let chunks_bytes = chunks_bytes.into();
                chunks_bytes.validate(0, self.data_type().size())?;
            }
            1 => {
                let chunk_indices = chunks.start();
                self.async_store_chunk_opt(chunk_indices, chunks_bytes, options)
                    .await?;
            }
            _ => {
                let chunks_bytes = chunks_bytes.into();
                let array_subset = self.chunks_subset(chunks)?;
                chunks_bytes.validate(array_subset.num_elements(), self.data_type().size())?;

                let chunk_representation =
                    self.chunk_array_representation(&vec![0; self.dimensionality()])?;
                let codec_concurrency =
                    self.recommended_codec_concurrency(&chunk_representation)?;
                let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec(
                    options.concurrent_target(),
                    num_chunks,
                    options,
                    &codec_concurrency,
                );

                let store_chunk = |chunk_indices: Vec<u64>| {
                    let chunk_subset = self.chunk_subset(&chunk_indices).unwrap();
                    let chunk_bytes = chunks_bytes
                        .extract_array_subset(
                            &chunk_subset.relative_to(array_subset.start()).unwrap(),
                            array_subset.shape(),
                            self.data_type(),
                        )
                        .unwrap();
                    let options = options.clone();
                    async move {
                        self.async_store_chunk_opt(&chunk_indices, chunk_bytes, &options)
                            .await
                    }
                };
                futures::stream::iter(&chunks.indices())
                    .map(Ok)
                    .try_for_each_concurrent(Some(chunk_concurrent_limit), store_chunk)
                    .await?;
            }
        }

        Ok(())
    }

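    /// Encode `chunks_elements` and store the chunks in the `chunks` subset with the given [`CodecOptions`].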
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunks_elements_opt<T: Element + Send + Sync>(
        &self,
        chunks: &ArraySubset,
        chunks_elements: &[T],
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let chunks_bytes = T::into_array_bytes(self.data_type(), chunks_elements)?;
        self.async_store_chunks_opt(chunks, chunks_bytes, options)
            .await
    }

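    /// Encode `chunks_array` and store the chunks in the `chunks` subset, validating that
    /// its shape matches the shape of the chunks' array subset.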
    #[cfg(feature = "ndarray")]
    #[allow(clippy::missing_errors_doc)]
    pub async fn async_store_chunks_ndarray_opt<T: Element + Send + Sync, D: ndarray::Dimension>(
        &self,
        chunks: &ArraySubset,
        chunks_array: impl Into<ndarray::Array<T, D>> + Send,
        options: &CodecOptions,
    ) -> Result<(), ArrayError> {
        let chunks_array: ndarray::Array<T, D> = chunks_array.into();
        let chunks_subset = self.chunks_subset(chunks)?;
        let chunks_shape = chunks_subset.shape_usize();
        if chunks_array.shape() == chunks_shape {
            let chunks_array = super::ndarray_into_vec(chunks_array);
            self.async_store_chunks_elements_opt(chunks, &chunks_array, options)
                .await
        } else {
            Err(ArrayError::InvalidDataShape(
                chunks_array.shape().to_vec(),
                chunks_shape,
            ))
        }
    }
}