use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::concurrency::concurrency_chunks_and_codec;
use super::{
Array, ArrayError, ArrayIndicesTinyVec, ArrayMetadata, ArrayMetadataOptions, ChunkShapeTraits,
Element, IntoArrayBytes,
};
use crate::array::ArraySubsetTraits;
use crate::config::MetadataEraseVersion;
use crate::iter_concurrent_limit;
use crate::node::{meta_key_v2_array, meta_key_v2_attributes, meta_key_v3};
use zarrs_codec::{ArrayToBytesCodecTraits, CodecOptions};
use zarrs_storage::{Bytes, StorageError, StorageHandle, WritableStorageTraits};
impl<TStorage: ?Sized + WritableStorageTraits + 'static> Array<TStorage> {
/// Store the array metadata using the metadata options held by this array.
///
/// Delegates to [`Self::store_metadata_opt`] with `self.metadata_options`.
///
/// # Errors
/// Returns whatever [`StorageError`] is produced by [`Self::store_metadata_opt`].
pub fn store_metadata(&self) -> Result<(), StorageError> {
    let own_options = &self.metadata_options;
    self.store_metadata_opt(own_options)
}
/// Store the array metadata, as produced by `metadata_opt(options)`, through the
/// array's writable storage transformer chain.
///
/// - V3 metadata is serialised as pretty-printed JSON and written under the V3
///   metadata key for this array's path.
/// - V2 metadata is written in up to two steps: when the attributes are non-empty
///   they are serialised and written under the V2 attributes key; the array
///   metadata is then written under the V2 array key with its attributes emptied.
///   When the attributes are empty, the attributes key is not touched.
///
/// # Errors
/// Returns a [`StorageError`] if the writable transformer cannot be created, if a
/// store write fails, or [`StorageError::InvalidMetadata`] if JSON serialisation
/// fails.
pub fn store_metadata_opt(&self, options: &ArrayMetadataOptions) -> Result<(), StorageError> {
    let handle = Arc::new(StorageHandle::new(self.storage.clone()));
    let writer = self
        .storage_transformers()
        .create_writable_transformer(handle)?;
    let metadata = self.metadata_opt(options);
    let node_path = self.path();
    match metadata {
        ArrayMetadata::V3(v3) => {
            let key = meta_key_v3(node_path);
            match serde_json::to_vec_pretty(&v3) {
                Ok(json) => writer.set(&key, json.into()),
                Err(err) => Err(StorageError::InvalidMetadata(key, err.to_string())),
            }
        }
        ArrayMetadata::V2(v2) => {
            // Work on a copy so the attributes can be stripped before the array
            // metadata itself is serialised.
            let mut array_metadata = v2.clone();
            if !array_metadata.attributes.is_empty() {
                let attributes_key = meta_key_v2_attributes(node_path);
                let attributes_json = match serde_json::to_vec_pretty(&array_metadata.attributes)
                {
                    Ok(json) => json,
                    Err(err) => {
                        return Err(StorageError::InvalidMetadata(
                            attributes_key,
                            err.to_string(),
                        ));
                    }
                };
                writer.set(&attributes_key, attributes_json.into())?;
                // Attributes now live under their own key; do not duplicate them.
                array_metadata.attributes = serde_json::Map::default();
            }
            let array_key = meta_key_v2_array(node_path);
            match serde_json::to_vec_pretty(&array_metadata) {
                Ok(json) => writer.set(&array_key, json.into()),
                Err(err) => Err(StorageError::InvalidMetadata(array_key, err.to_string())),
            }
        }
    }
}
/// Store `chunk_data` as the chunk at `chunk_indices` using default codec options.
///
/// Convenience wrapper around [`Self::store_chunk_opt`] with
/// `CodecOptions::default()`.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunk_opt`].
pub fn store_chunk<'a>(
    &self,
    chunk_indices: &[u64],
    chunk_data: impl IntoArrayBytes<'a>,
) -> Result<(), ArrayError> {
    let default_options = CodecOptions::default();
    self.store_chunk_opt(chunk_indices, chunk_data, &default_options)
}
/// Store a slice of elements as the chunk at `chunk_indices` using default codec
/// options.
///
/// Deprecated: [`Self::store_chunk`] accepts the same input.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunk_opt`].
#[deprecated(since = "0.23.0", note = "Use store_chunk() instead")]
pub fn store_chunk_elements<T: Element>(
    &self,
    chunk_indices: &[u64],
    chunk_elements: &[T],
) -> Result<(), ArrayError> {
    let default_options = CodecOptions::default();
    self.store_chunk_opt(chunk_indices, chunk_elements, &default_options)
}
/// Store an `ndarray` array as the chunk at `chunk_indices` using default codec
/// options.
///
/// The input is converted to an owned array in standard layout before being handed
/// to [`Self::store_chunk_opt`].
///
/// Deprecated: use [`Self::store_chunk`] instead.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunk_opt`].
#[cfg(feature = "ndarray")]
#[deprecated(since = "0.23.0", note = "Use store_chunk() instead")]
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
pub fn store_chunk_ndarray<T: Element, D: ndarray::Dimension>(
    &self,
    chunk_indices: &[u64],
    chunk_array: &ndarray::ArrayRef<T, D>,
) -> Result<(), ArrayError> {
    let owned_standard = chunk_array.as_standard_layout().to_owned();
    let default_options = CodecOptions::default();
    self.store_chunk_opt(chunk_indices, owned_standard, &default_options)
}
/// Store `chunks_data` across the chunks selected by `chunks` using default codec
/// options.
///
/// Convenience wrapper around [`Self::store_chunks_opt`] with
/// `CodecOptions::default()`.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunks_opt`].
#[allow(clippy::similar_names)]
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
pub fn store_chunks<'a>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_data: impl IntoArrayBytes<'a>,
) -> Result<(), ArrayError> {
    let default_options = CodecOptions::default();
    self.store_chunks_opt(chunks, chunks_data, &default_options)
}
/// Store a slice of elements across the chunks selected by `chunks` using default
/// codec options.
///
/// Deprecated: [`Self::store_chunks`] accepts the same input.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunks_opt`].
#[deprecated(since = "0.23.0", note = "Use store_chunks() instead")]
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
pub fn store_chunks_elements<T: Element>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_elements: &[T],
) -> Result<(), ArrayError> {
    let default_options = CodecOptions::default();
    self.store_chunks_opt(chunks, chunks_elements, &default_options)
}
/// Store an `ndarray` array across the chunks selected by `chunks` using default
/// codec options.
///
/// The input is converted to an owned array in standard layout before being handed
/// to [`Self::store_chunks_opt`].
///
/// Deprecated: use [`Self::store_chunks`] instead.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunks_opt`].
#[cfg(feature = "ndarray")]
#[deprecated(since = "0.23.0", note = "Use store_chunks() instead")]
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
pub fn store_chunks_ndarray<T: Element, D: ndarray::Dimension>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_array: &ndarray::ArrayRef<T, D>,
) -> Result<(), ArrayError> {
    let owned_standard = chunks_array.as_standard_layout().to_owned();
    let default_options = CodecOptions::default();
    self.store_chunks_opt(chunks, owned_standard, &default_options)
}
/// Erase the array metadata using the erase-version setting held by this array.
///
/// Delegates to [`Self::erase_metadata_opt`] with `self.metadata_erase_version`.
///
/// # Errors
/// Returns whatever [`StorageError`] is produced by [`Self::erase_metadata_opt`].
pub fn erase_metadata(&self) -> Result<(), StorageError> {
    let version = self.metadata_erase_version;
    self.erase_metadata_opt(version)
}
/// Erase the metadata keys selected by `options` for this array's path.
///
/// - `Default`: erases the keys matching the version of the metadata currently held
///   by the array (V3 key, or the V2 array key followed by the V2 attributes key).
/// - `All`: erases the V3 key, then the V2 array key, then the V2 attributes key.
/// - `V3`: erases only the V3 key.
/// - `V2`: erases the V2 array key, then the V2 attributes key.
///
/// Erasure goes directly through a [`StorageHandle`] on the underlying store; the
/// storage transformer chain is not applied here.
///
/// # Errors
/// Returns the first [`StorageError`] reported by the store; later keys in a
/// sequence are not attempted after a failure.
pub fn erase_metadata_opt(&self, options: MetadataEraseVersion) -> Result<(), StorageError> {
    let storage_handle = StorageHandle::new(self.storage.clone());

    // Erase the single V3 metadata key.
    let erase_v3 = || -> Result<(), StorageError> {
        storage_handle.erase(&meta_key_v3(self.path()))
    };
    // Erase the V2 array key, then the V2 attributes key.
    let erase_v2 = || -> Result<(), StorageError> {
        storage_handle.erase(&meta_key_v2_array(self.path()))?;
        storage_handle.erase(&meta_key_v2_attributes(self.path()))
    };

    match options {
        MetadataEraseVersion::Default => match self.metadata {
            ArrayMetadata::V3(_) => erase_v3(),
            ArrayMetadata::V2(_) => erase_v2(),
        },
        MetadataEraseVersion::All => {
            erase_v3()?;
            erase_v2()
        }
        MetadataEraseVersion::V3 => erase_v3(),
        MetadataEraseVersion::V2 => erase_v2(),
    }
}
/// Erase the stored chunk at `chunk_indices`.
///
/// The erase is issued through the array's writable storage transformer chain on
/// the key returned by `chunk_key(chunk_indices)`.
///
/// # Errors
/// Returns a [`StorageError`] if the writable transformer cannot be created or the
/// erase fails.
pub fn erase_chunk(&self, chunk_indices: &[u64]) -> Result<(), StorageError> {
    let handle = Arc::new(StorageHandle::new(self.storage.clone()));
    let writer = self
        .storage_transformers()
        .create_writable_transformer(handle)?;
    let key = self.chunk_key(chunk_indices);
    writer.erase(&key)
}
/// Erase every stored chunk whose indices are yielded by `chunks.indices()`.
///
/// A single writable storage transformer is created and shared by all erasures.
/// On non-`wasm32` targets the indices are visited with a rayon parallel iterator;
/// on `wasm32` they are visited sequentially.
///
/// # Errors
/// Returns a [`StorageError`] if the writable transformer cannot be created or if
/// erasing any chunk fails.
pub fn erase_chunks(&self, chunks: &dyn ArraySubsetTraits) -> Result<(), StorageError> {
    let handle = Arc::new(StorageHandle::new(self.storage.clone()));
    let writer = self
        .storage_transformers()
        .create_writable_transformer(handle)?;

    let erase_one = |chunk_indices: ArrayIndicesTinyVec| {
        let key = self.chunk_key(&chunk_indices);
        writer.erase(&key)
    };

    #[cfg(not(target_arch = "wasm32"))]
    let outcome = chunks.indices().into_par_iter().try_for_each(erase_one);
    #[cfg(target_arch = "wasm32")]
    let outcome = chunks.indices().into_iter().try_for_each(erase_one);
    outcome
}
/// Store `chunk_data` as the chunk at `chunk_indices`, with explicit codec options.
///
/// The data is converted to array bytes for this array's data type and validated
/// against the number of elements in the chunk's shape. Then:
///
/// - if `options.store_empty_chunks()` is `false` and the bytes equal the fill
///   value, the chunk is erased instead of written;
/// - otherwise the bytes are encoded with the array's codecs and written via
///   [`Self::store_encoded_chunk`].
///
/// # Errors
/// Returns an [`ArrayError`] if the conversion, the chunk shape lookup or the
/// validation fails, if encoding fails ([`ArrayError::CodecError`]), or if the
/// underlying erase/write fails.
#[allow(clippy::missing_errors_doc)]
pub fn store_chunk_opt<'a>(
    &self,
    chunk_indices: &[u64],
    chunk_data: impl IntoArrayBytes<'a>,
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    let chunk_bytes = chunk_data.into_array_bytes(self.data_type())?;
    let chunk_shape = self.chunk_shape(chunk_indices)?;
    chunk_bytes.validate(chunk_shape.num_elements_u64(), self.data_type())?;

    // A chunk consisting solely of the fill value need not be stored, unless the
    // caller explicitly asked for empty chunks to be kept.
    if !options.store_empty_chunks() && chunk_bytes.is_fill_value(self.fill_value()) {
        self.erase_chunk(chunk_indices)?;
        return Ok(());
    }

    let encoded = self
        .codecs()
        .encode(
            chunk_bytes,
            &chunk_shape,
            self.data_type(),
            self.fill_value(),
            options,
        )
        .map_err(ArrayError::CodecError)?;
    let encoded = Bytes::from(encoded.into_owned());

    // SAFETY: `encoded` was produced just above by this array's own codecs from
    // bytes that were validated against the chunk shape and data type.
    unsafe { self.store_encoded_chunk(chunk_indices, encoded) }
}
/// Write already-encoded bytes as the chunk at `chunk_indices`.
///
/// The bytes are written as given, through the array's writable storage transformer
/// chain, to the key returned by `chunk_key(chunk_indices)`. No encoding or
/// validation is performed by this method.
///
/// # Safety
/// Nothing here checks that `encoded_chunk_bytes` can be decoded by this array's
/// codecs.
/// NOTE(review): presumably the caller must guarantee the bytes are a valid
/// encoding of the chunk for this array — confirm against the crate's documented
/// contract.
///
/// # Errors
/// Returns an [`ArrayError`] if the writable transformer cannot be created or the
/// store write fails.
pub unsafe fn store_encoded_chunk(
    &self,
    chunk_indices: &[u64],
    encoded_chunk_bytes: bytes::Bytes,
) -> Result<(), ArrayError> {
    let handle = Arc::new(StorageHandle::new(self.storage.clone()));
    let writer = self
        .storage_transformers()
        .create_writable_transformer(handle)?;
    let key = self.chunk_key(chunk_indices);
    writer.set(&key, encoded_chunk_bytes)?;
    Ok(())
}
/// Store a slice of elements as the chunk at `chunk_indices`, with explicit codec
/// options.
///
/// Deprecated: [`Self::store_chunk_opt`] accepts the same input, and this method
/// simply forwards to it.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunk_opt`].
#[deprecated(since = "0.23.0", note = "Use store_chunk_opt() instead")]
#[allow(clippy::missing_errors_doc)]
pub fn store_chunk_elements_opt<T: Element>(
    &self,
    chunk_indices: &[u64],
    chunk_elements: &[T],
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    // Pure forwarding shim kept for backward compatibility.
    self.store_chunk_opt(chunk_indices, chunk_elements, options)
}
/// Store an `ndarray` array as the chunk at `chunk_indices`, with explicit codec
/// options.
///
/// The input is converted to an owned array in standard layout before being handed
/// to [`Self::store_chunk_opt`].
///
/// Deprecated: use [`Self::store_chunk_opt`] instead.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunk_opt`].
#[cfg(feature = "ndarray")]
#[deprecated(since = "0.23.0", note = "Use store_chunk_opt() instead")]
#[allow(clippy::missing_errors_doc)]
pub fn store_chunk_ndarray_opt<T: Element, D: ndarray::Dimension>(
    &self,
    chunk_indices: &[u64],
    chunk_array: &ndarray::ArrayRef<T, D>,
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    let owned_standard = chunk_array.as_standard_layout().to_owned();
    self.store_chunk_opt(chunk_indices, owned_standard, options)
}
/// Store `chunks_data` across the chunks selected by `chunks`, with explicit codec
/// options.
///
/// Behaviour depends on the number of chunks in the selection:
///
/// - exactly one chunk: forwards to [`Self::store_chunk_opt`] for the chunk at
///   `chunks.start()`;
/// - zero chunks: the data is converted to array bytes and validated to contain
///   zero elements; nothing is written;
/// - several chunks: the data is validated against the array subset covered by the
///   chunks, then each chunk's region is extracted and stored with
///   [`Self::store_chunk_opt`]. Chunks are processed through
///   `iter_concurrent_limit!` with the chunk-level limit returned by
///   `concurrency_chunks_and_codec`, and each chunk is stored with the codec
///   options returned alongside that limit.
///
/// The codec concurrency recommendation is computed from the shape of the chunk at
/// the all-zero indices.
///
/// # Errors
/// Returns an [`ArrayError`] if conversion or validation of the data fails, if any
/// subset/shape computation fails, or if storing any chunk fails.
#[allow(clippy::similar_names)]
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
pub fn store_chunks_opt<'a>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_data: impl IntoArrayBytes<'a>,
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    let num_chunks = chunks.num_elements_usize();

    // Single chunk: no need to slice the input, store it directly.
    if num_chunks == 1 {
        let only_chunk = chunks.start();
        return self.store_chunk_opt(&only_chunk, chunks_data, options);
    }

    let all_bytes = chunks_data.into_array_bytes(self.data_type())?;

    // Empty selection: the input must be empty too, and there is nothing to write.
    if num_chunks == 0 {
        all_bytes.validate(0, self.data_type())?;
        return Ok(());
    }

    // Multiple chunks: validate against the covered region, then store per chunk.
    let region = self.chunks_subset(chunks)?;
    all_bytes.validate(region.num_elements(), self.data_type())?;

    let chunk_shape = self.chunk_shape(&vec![0u64; self.dimensionality()])?;
    let codec_concurrency =
        self.recommended_codec_concurrency(&chunk_shape, self.data_type())?;
    let (chunk_concurrent_limit, codec_options) = concurrency_chunks_and_codec(
        options.concurrent_target(),
        num_chunks,
        options,
        &codec_concurrency,
    );

    let store_one = |chunk_indices: ArrayIndicesTinyVec| -> Result<(), ArrayError> {
        let chunk_region = self.chunk_subset(&chunk_indices)?;
        // Express the chunk's region relative to the start of the covered region
        // so it can be cut out of the input bytes.
        let relative_region = chunk_region.relative_to(region.start())?;
        let chunk_bytes =
            all_bytes.extract_array_subset(&relative_region, region.shape(), self.data_type())?;
        self.store_chunk_opt(&chunk_indices, chunk_bytes, &codec_options)
    };

    let indices = chunks.indices();
    iter_concurrent_limit!(chunk_concurrent_limit, indices, try_for_each, store_one)?;
    Ok(())
}
/// Store a slice of elements across the chunks selected by `chunks`, with explicit
/// codec options.
///
/// The elements are first converted to array bytes for this array's data type via
/// `T::to_array_bytes`, and the result is handed to [`Self::store_chunks_opt`].
///
/// Deprecated: use [`Self::store_chunks_opt`] instead.
///
/// # Errors
/// Returns an [`ArrayError`] if the element conversion fails, or any error
/// returned by [`Self::store_chunks_opt`].
#[deprecated(since = "0.23.0", note = "Use store_chunks_opt() instead")]
#[allow(clippy::missing_errors_doc)]
pub fn store_chunks_elements_opt<T: Element>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_elements: &[T],
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    let converted = T::to_array_bytes(self.data_type(), chunks_elements)?;
    self.store_chunks_opt(chunks, converted, options)
}
/// Store an `ndarray` array across the chunks selected by `chunks`, with explicit
/// codec options.
///
/// The input is converted to an owned array in standard layout before being handed
/// to [`Self::store_chunks_opt`].
///
/// Deprecated: use [`Self::store_chunks_opt`] instead.
///
/// # Errors
/// Propagates any [`ArrayError`] returned by [`Self::store_chunks_opt`].
#[cfg(feature = "ndarray")]
#[deprecated(since = "0.23.0", note = "Use store_chunks_opt() instead")]
#[allow(clippy::missing_errors_doc)]
pub fn store_chunks_ndarray_opt<T: Element, D: ndarray::Dimension>(
    &self,
    chunks: &dyn ArraySubsetTraits,
    chunks_array: &ndarray::ArrayRef<T, D>,
    options: &CodecOptions,
) -> Result<(), ArrayError> {
    let owned_standard = chunks_array.as_standard_layout().to_owned();
    self.store_chunks_opt(chunks, owned_standard, options)
}
}