#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
#![doc(hidden)]
use mimalloc::MiMalloc;
// Register mimalloc's `MiMalloc` as the global allocator for everything that
// links this crate.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
use std::{
num::NonZeroU64,
sync::{Arc, Mutex},
time::SystemTime,
};
use clap::Parser;
use progress::{Progress, ProgressCallback};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use serde::{Deserialize, Serialize};
use zarrs::array::chunk_cache::{
ChunkCache, ChunkCacheDecodedLruChunkLimit, ChunkCacheDecodedLruChunkLimitThreadLocal,
ChunkCacheDecodedLruSizeLimit, ChunkCacheDecodedLruSizeLimitThreadLocal,
};
use zarrs::{
array::{
chunk_grid::RegularChunkGrid,
codec::{BytesCodec, Crc32cCodec, ShardingCodec, ShardingIndexLocation},
data_type, Array, ArrayBuilder, ArrayCodecTraits, ArrayError, ArrayIndicesTinyVec,
ArrayShardedExt, ArraySubset, ArrayToBytesCodecTraits, ChunkShape, Codec, CodecChain,
CodecMetadataOptions, CodecOptions, DataType, DimensionName, FillValue,
IncompatibleDimensionalityError, RecommendedConcurrency,
},
config::global_config,
metadata::{v3::MetadataV3, FillValueMetadata},
storage::{ReadableStorageTraits, ReadableWritableListableStorageTraits},
};
pub mod filter;
pub mod info;
pub mod progress;
pub mod type_dispatch;
/// Version string of the form `"<crate version> (zarrs <zarrs version>)"`.
///
/// Built at compile time from this crate's `CARGO_PKG_VERSION` and
/// `zarrs::version::version_str()`.
pub const ZARRS_TOOLS_VERSION_WITH_ZARRS: &str = const_format::formatcp!(
"{} (zarrs {})",
env!("CARGO_PKG_VERSION"),
zarrs::version::version_str(),
);
/// Error reporting that a data type is unsupported.
///
/// The wrapped string is interpolated into the error message as the data type.
#[derive(thiserror::Error, Debug)]
#[error("Data type {_0} is unsupported")]
pub struct UnsupportedDataTypeError(String);
impl From<String> for UnsupportedDataTypeError {
fn from(value: String) -> Self {
Self(value)
}
}
// Command line arguments describing how a new array is encoded; consumed by
// `get_array_builder`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc
// comments into help text, so adding doc comments here would change the CLI output.
#[derive(Parser)]
#[allow(rustdoc::bare_urls)]
pub struct ZarrEncodingArgs {
// Fill value metadata, parsed from the argument text as JSON by `parse_fill_value`.
// Hyphen-prefixed values are accepted (e.g. negative numbers).
#[arg(short, long, verbatim_doc_comment, allow_hyphen_values(true), value_parser = parse_fill_value)]
pub fill_value: FillValueMetadata,
// Chunk key separator character, `/` unless overridden.
#[arg(long, default_value_t = '/')]
pub separator: char,
// Required, comma-delimited chunk shape. `get_array_builder` replaces an entry
// of 0 with the array extent in that dimension.
#[arg(short, long, required = true, value_delimiter = ',')]
pub chunk_shape: Vec<u64>,
// Optional, comma-delimited shard shape. When present, `get_array_builder` uses the
// sharding codec; an entry of 0 becomes the array extent, larger entries are clamped
// to the array extent, and each entry is rounded up to a multiple of the chunk shape.
#[arg(short, long, verbatim_doc_comment, value_delimiter = ',')]
pub shard_shape: Option<Vec<u64>>,
// Optional JSON list of codec metadata; every entry must be an array-to-array codec.
#[arg(long, verbatim_doc_comment)]
pub array_to_array_codecs: Option<String>,
// Optional metadata for a single array-to-bytes codec; the bytes codec is the default.
#[arg(long, verbatim_doc_comment)]
pub array_to_bytes_codec: Option<String>,
// Optional JSON list of codec metadata; every entry must be a bytes-to-bytes codec.
#[arg(long, verbatim_doc_comment)]
pub bytes_to_bytes_codecs: Option<String>,
// Optional JSON object holding the array attributes.
#[arg(long)]
pub attributes: Option<String>,
}
/// clap value parser for a data type argument.
///
/// The argument text is wrapped in a JSON string value and deserialised as
/// [`MetadataV3`].
///
/// # Errors
/// Returns an [`std::io::Error`] carrying the deserialisation error message.
fn parse_data_type(data_type: &str) -> std::io::Result<MetadataV3> {
    let json_name = serde_json::Value::String(data_type.to_string());
    match serde_json::from_value(json_name) {
        Ok(metadata) => Ok(metadata),
        Err(err) => Err(std::io::Error::other(err.to_string())),
    }
}
/// clap value parser for a fill value argument.
///
/// The argument text is parsed as JSON into [`FillValueMetadata`].
///
/// # Errors
/// Returns an [`std::io::Error`] carrying the JSON error message.
fn parse_fill_value(fill_value: &str) -> std::io::Result<FillValueMetadata> {
    match serde_json::from_str(fill_value) {
        Ok(metadata) => Ok(metadata),
        Err(err) => Err(std::io::Error::other(err.to_string())),
    }
}
/// Create an [`ArrayBuilder`] for a new array from command line encoding arguments.
///
/// - A chunk shape entry of 0 is replaced by the array extent in that dimension.
/// - A shard shape entry of 0 is replaced by the array extent, larger entries are
///   clamped to the array extent, and every entry is then rounded up to a multiple
///   of the chunk shape.
/// - With a shard shape, the chunk grid uses the shard shape and the requested
///   codecs become the inner codecs of a sharding codec whose index is encoded with
///   the bytes codec followed by crc32c and stored at the end of the shard.
///   Without one, the chunk grid uses the chunk shape and the codecs are applied
///   directly.
///
/// # Panics
/// Panics if any codec, fill value or attribute argument cannot be parsed, if a
/// codec is of the wrong kind, if the separator is not a valid chunk key separator,
/// or if sharding is requested and an effective chunk shape entry is zero.
#[must_use]
pub fn get_array_builder(
    encoding_args: &ZarrEncodingArgs,
    array_shape: &[u64],
    data_type: DataType,
    dimension_names: Option<Vec<DimensionName>>,
) -> zarrs::array::ArrayBuilder {
    // Chunk shape: 0 means "the whole array dimension".
    let chunk_shape: Vec<u64> = encoding_args
        .chunk_shape
        .iter()
        .zip(array_shape)
        .map(|(&chunk, &extent)| if chunk == 0 { extent } else { chunk })
        .collect();

    // Shard shape: 0 means "the whole array dimension", otherwise clamp to the array
    // extent; the result must be a multiple of the chunk shape.
    let shard_shape: Option<Vec<u64>> = encoding_args.shard_shape.as_ref().map(|requested| {
        requested
            .iter()
            .zip(array_shape)
            .map(|(&shard, &extent)| {
                if shard == 0 {
                    extent
                } else {
                    shard.min(extent)
                }
            })
            .zip(&chunk_shape)
            .map(|(shard, &chunk)| shard.next_multiple_of(chunk))
            .collect()
    });

    // The chunk grid is made of shards when sharding, chunks otherwise.
    let block_shape: &Vec<u64> = match &shard_shape {
        Some(shard) => shard,
        None => &chunk_shape,
    };

    let array_to_array_codecs = match &encoding_args.array_to_array_codecs {
        None => Vec::new(),
        Some(json) => {
            let metadatas: Vec<MetadataV3> = serde_json::from_str(json.as_str()).unwrap();
            metadatas
                .iter()
                .map(|metadata| match Codec::from_metadata(metadata).unwrap() {
                    Codec::ArrayToArray(codec) => codec,
                    _ => panic!("Must be an array to array codec"),
                })
                .collect::<Vec<_>>()
        }
    };

    let array_to_bytes_codec = match &encoding_args.array_to_bytes_codec {
        None => {
            let default_codec: Arc<dyn ArrayToBytesCodecTraits> = Arc::<BytesCodec>::default();
            default_codec
        }
        Some(json) => {
            let metadata = MetadataV3::try_from(json.as_str()).unwrap();
            match Codec::from_metadata(&metadata).unwrap() {
                Codec::ArrayToBytes(codec) => codec,
                _ => panic!("Must be an array to bytes codec"),
            }
        }
    };

    let bytes_to_bytes_codecs = match &encoding_args.bytes_to_bytes_codecs {
        None => Vec::new(),
        Some(json) => {
            let metadatas: Vec<MetadataV3> = serde_json::from_str(json.as_str()).unwrap();
            metadatas
                .iter()
                .map(|metadata| match Codec::from_metadata(metadata).unwrap() {
                    Codec::BytesToBytes(codec) => codec,
                    _ => panic!("Must be a bytes to bytes codec"),
                })
                .collect::<Vec<_>>()
        }
    };

    let fill_value = data_type.fill_value_v3(&encoding_args.fill_value).unwrap();
    let mut array_builder = ArrayBuilder::new(
        array_shape.to_vec(),
        block_shape.clone(),
        data_type,
        fill_value,
    );
    array_builder.dimension_names(dimension_names);

    if let Some(attributes_json) = &encoding_args.attributes {
        let attributes: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(attributes_json).expect("Attributes are invalid.");
        array_builder.attributes(attributes);
    }

    array_builder.chunk_key_encoding_default_separator(encoding_args.separator.try_into().unwrap());

    if shard_shape.is_none() {
        array_builder.array_to_array_codecs(array_to_array_codecs);
        array_builder.array_to_bytes_codec(array_to_bytes_codec);
        array_builder.bytes_to_bytes_codecs(bytes_to_bytes_codecs);
    } else {
        // The shard index is encoded with the bytes codec plus a crc32c checksum.
        let index_codecs = Arc::new(CodecChain::new(
            vec![],
            Arc::<BytesCodec>::default(),
            vec![Arc::new(Crc32cCodec::new())],
        ));
        // The requested codecs encode the inner chunks of each shard.
        let inner_codecs = Arc::new(CodecChain::new(
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        ));
        let inner_chunk_shape: ChunkShape = chunk_shape
            .iter()
            .map(|&extent| NonZeroU64::new(extent).unwrap())
            .collect();
        let sharding_codec = ShardingCodec::new(
            inner_chunk_shape,
            inner_codecs,
            index_codecs,
            ShardingIndexLocation::End,
        );
        array_builder.array_to_bytes_codec(Arc::new(sharding_codec));
    }

    array_builder
}
/// Returns `true` when `value` is `false`.
///
/// Takes a reference because it is used as a serde `skip_serializing_if` predicate.
fn is_false(value: &bool) -> bool {
    let flag = *value;
    !flag
}
// Command line arguments describing how an existing array is reencoded; consumed
// by `get_array_builder_reencode`. Every option is optional: options left unset
// keep the corresponding property of the input array, and are skipped when this
// struct is serialised.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc
// comments into help text, so adding doc comments here would change the CLI output.
#[derive(Parser, Debug, Clone, Default, Serialize, Deserialize)]
pub struct ZarrReencodingArgs {
// New data type, parsed by `parse_data_type`.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(short, long, verbatim_doc_comment, value_parser = parse_data_type)]
pub data_type: Option<MetadataV3>,
// New fill value metadata, parsed from the argument text as JSON by `parse_fill_value`.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(short, long, verbatim_doc_comment, allow_hyphen_values(true), value_parser = parse_fill_value)]
pub fill_value: Option<FillValueMetadata>,
// New chunk key separator character.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long)]
pub separator: Option<char>,
// New comma-delimited chunk shape; an entry of 0 is replaced by the array extent.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(short, long, value_delimiter = ',')]
pub chunk_shape: Option<Vec<u64>>,
// New comma-delimited shard shape; an entry of 0 is replaced by the array extent,
// larger entries are clamped to it, and entries are rounded up to a multiple of
// the chunk shape.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(short, long, verbatim_doc_comment, value_delimiter = ',')]
pub shard_shape: Option<Vec<u64>>,
// When set and the input array is sharded, the input shard shape is not carried
// over to the output. Skipped during serialisation when false.
#[serde(skip_serializing_if = "is_false")]
#[arg(long, verbatim_doc_comment)]
pub ignore_input_sharding: bool,
// JSON list of codec metadata; every entry must be an array-to-array codec.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long, verbatim_doc_comment)]
pub array_to_array_codecs: Option<String>,
// Metadata for a single array-to-bytes codec.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long, verbatim_doc_comment)]
pub array_to_bytes_codec: Option<String>,
// JSON list of codec metadata; every entry must be a bytes-to-bytes codec.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long, verbatim_doc_comment)]
pub bytes_to_bytes_codecs: Option<String>,
// New comma-delimited dimension names.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long, verbatim_doc_comment, value_delimiter = ',')]
pub dimension_names: Option<Vec<String>>,
// JSON object that replaces the array attributes.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long)]
pub attributes: Option<String>,
// JSON object whose entries are appended to the array attributes.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(long)]
pub attributes_append: Option<String>,
}
/// The extent of the changes requested by a set of reencoding arguments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZarrReEncodingChangeType {
    /// No reencoding option was set.
    None,
    /// Only metadata-level options (dimension names, attributes) were set.
    Metadata,
    /// At least one option affecting how chunks are encoded was set.
    MetadataAndChunks,
}
impl ZarrReencodingArgs {
    /// Classify which parts of the array the set options change.
    ///
    /// Returns [`ZarrReEncodingChangeType::MetadataAndChunks`] if any option
    /// affecting chunk encoding is set, otherwise
    /// [`ZarrReEncodingChangeType::Metadata`] if only dimension names or attributes
    /// are set, otherwise [`ZarrReEncodingChangeType::None`].
    pub fn change_type(&self) -> ZarrReEncodingChangeType {
        let changes_chunks = self.data_type.is_some()
            || self.fill_value.is_some()
            || self.separator.is_some()
            || self.chunk_shape.is_some()
            || self.shard_shape.is_some()
            || self.ignore_input_sharding
            || self.array_to_array_codecs.is_some()
            || self.array_to_bytes_codec.is_some()
            || self.bytes_to_bytes_codecs.is_some();
        if changes_chunks {
            return ZarrReEncodingChangeType::MetadataAndChunks;
        }

        let changes_metadata = self.dimension_names.is_some()
            || self.attributes.is_some()
            || self.attributes_append.is_some();
        if changes_metadata {
            return ZarrReEncodingChangeType::Metadata;
        }

        ZarrReEncodingChangeType::None
    }
}
/// Create an [`ArrayBuilder`] for reencoding `array` according to `encoding_args`.
///
/// The builder starts from `array.builder()`; every option set in `encoding_args`
/// overrides the corresponding property of the input array.
///
/// - The input chunk shape is the inner chunk shape if the input is sharded,
///   otherwise the shape of the chunk at the grid origin.
/// - The input shard shape is kept unless `ignore_input_sharding` is set.
/// - Chunk/shard shape entries of 0 are replaced by the array extent, shard shape
///   entries are clamped to the array extent, and the shard shape is rounded up to
///   a multiple of the chunk shape.
/// - With a shard shape, the chunk grid uses the shard shape and the codecs become
///   the inner codecs of a sharding codec. Without one, the chunk grid uses the
///   chunk shape and the codecs are applied directly.
/// - If `array_shape` is `Some`, it replaces the array shape.
/// - If the data type changes and no fill value is given, the input fill value is
///   converted with `convert_fill_value`.
///
/// # Panics
/// Panics if any codec, data type, fill value or attribute argument cannot be
/// parsed, if a codec is of the wrong kind, if the separator is not a valid chunk
/// key separator, if the input chunk shape cannot be determined, or if sharding is
/// used and a chunk shape entry is zero.
#[must_use]
pub fn get_array_builder_reencode<TStorage: ?Sized>(
    encoding_args: &ZarrReencodingArgs,
    array: &Array<TStorage>,
    array_shape: Option<Vec<u64>>,
) -> zarrs::array::ArrayBuilder {
    let codecs = array.codecs();
    let array_to_bytes_codec = codecs.array_to_bytes_codec();

    // Extract the encoding of the input array.
    let (
        chunk_shape,
        shard_shape,
        array_to_array_codecs,
        array_array_to_bytes_codec,
        bytes_to_bytes_codecs,
    ) = if array_to_bytes_codec
        .as_any()
        .downcast_ref::<ShardingCodec>()
        .is_some()
    {
        // Sharded input: the chunk grid holds shards, the sharding codec
        // configuration holds the inner chunk shape and the inner codecs.
        let sharding_configuration = array_to_bytes_codec
            .configuration_v3(&CodecMetadataOptions::default())
            .unwrap();
        let chunk_shape: Vec<u64> =
            serde_json::from_value(sharding_configuration["chunk_shape"].clone()).unwrap();
        let shard_shape = array
            .chunk_shape(&vec![0; chunk_shape.len()])
            .unwrap()
            .iter()
            .map(|i| i.get())
            .collect::<Vec<_>>();
        let codecs: Vec<MetadataV3> =
            serde_json::from_value(sharding_configuration["codecs"].clone()).unwrap();
        let codec_chain = CodecChain::from_metadata(&codecs).unwrap();
        let array_to_array_codecs = codec_chain.array_to_array_codecs().to_vec();
        let array_to_bytes_codec = codec_chain.array_to_bytes_codec().clone();
        let bytes_to_bytes_codecs = codec_chain.bytes_to_bytes_codecs().to_vec();
        (
            chunk_shape,
            if encoding_args.ignore_input_sharding {
                None
            } else {
                Some(shard_shape)
            },
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        )
    } else {
        // Unsharded input: the chunk shape is the shape of a chunk of the chunk grid.
        // (The chunk grid shape is the number of chunks per dimension, which must
        // not be confused with the chunk shape.)
        let chunk_shape = array
            .chunk_shape(&vec![0; array.shape().len()])
            .unwrap()
            .iter()
            .map(|i| i.get())
            .collect::<Vec<_>>();
        let shard_shape = None;
        let array_to_array_codecs = array.codecs().array_to_array_codecs().to_vec();
        let array_to_bytes_codec = array.codecs().array_to_bytes_codec().clone();
        let bytes_to_bytes_codecs = array.codecs().bytes_to_bytes_codecs().to_vec();
        (
            chunk_shape,
            shard_shape,
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        )
    };

    // Requested chunk shape: 0 means "the whole array dimension".
    let chunk_shape = encoding_args
        .chunk_shape
        .as_ref()
        .map(|chunk_shape| {
            std::iter::zip(chunk_shape.as_slice(), array.shape())
                .map(|(&c, &a)| if c == 0 { a } else { c })
                .collect::<Vec<_>>()
        })
        .unwrap_or(chunk_shape);

    // Requested shard shape: 0 means "the whole array dimension", otherwise clamp
    // to the array extent.
    let shard_shape: Option<Vec<u64>> =
        encoding_args
            .shard_shape
            .as_ref()
            .map_or(shard_shape, |shard_shape| {
                let shard_shape = std::iter::zip(shard_shape, array.shape())
                    .map(|(&s, &a)| if s == 0 { a } else { std::cmp::min(s, a) })
                    .collect::<Vec<_>>();
                Some(shard_shape)
            });

    // The shard shape must be a multiple of the chunk shape.
    let shard_shape: Option<Vec<u64>> = shard_shape.map(|shard_shape| {
        std::iter::zip(shard_shape.as_slice(), chunk_shape.as_slice())
            .map(|(s, c)| s.next_multiple_of(*c))
            .collect::<Vec<_>>()
    });

    let array_to_array_codecs = encoding_args.array_to_array_codecs.clone().map_or(
        array_to_array_codecs,
        |array_to_array_codecs| {
            let metadatas: Vec<MetadataV3> =
                serde_json::from_str(array_to_array_codecs.as_str()).unwrap();
            let mut codecs = Vec::with_capacity(metadatas.len());
            for metadata in metadatas {
                let codec = match Codec::from_metadata(&metadata).unwrap() {
                    Codec::ArrayToArray(codec) => codec,
                    _ => panic!("Must be an array to array codec"),
                };
                codecs.push(codec);
            }
            codecs
        },
    );
    let array_to_bytes_codec = encoding_args.array_to_bytes_codec.as_ref().map_or(
        array_array_to_bytes_codec,
        |array_codec| {
            let metadata = MetadataV3::try_from(array_codec.as_str()).unwrap();
            match Codec::from_metadata(&metadata).unwrap() {
                Codec::ArrayToBytes(codec) => codec,
                _ => panic!("Must be an array to bytes codec"),
            }
        },
    );
    let bytes_to_bytes_codecs = encoding_args.bytes_to_bytes_codecs.as_ref().map_or(
        bytes_to_bytes_codecs,
        |bytes_to_bytes_codecs| {
            let metadatas: Vec<MetadataV3> =
                serde_json::from_str(bytes_to_bytes_codecs.as_str()).unwrap();
            let mut codecs = Vec::with_capacity(metadatas.len());
            for metadata in metadatas {
                let codec = match Codec::from_metadata(&metadata).unwrap() {
                    Codec::BytesToBytes(codec) => codec,
                    _ => panic!("Must be a bytes to bytes codec"),
                };
                codecs.push(codec);
            }
            codecs
        },
    );

    let mut array_builder = array.builder();

    if let Some(attributes) = &encoding_args.attributes {
        let attributes: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(attributes).expect("Attributes are invalid.");
        array_builder.attributes(attributes);
    }
    if let Some(attributes_append) = &encoding_args.attributes_append {
        let mut attributes_append: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(attributes_append).expect("Attributes append are invalid.");
        array_builder
            .attributes_mut()
            .append(&mut attributes_append);
    }
    if let Some(separator) = encoding_args.separator {
        array_builder.chunk_key_encoding_default_separator(separator.try_into().unwrap());
    }
    if let Some(array_shape) = array_shape {
        array_builder.shape(array_shape);
    }

    let data_type = if let Some(data_type) = &encoding_args.data_type {
        let data_type = DataType::from_metadata(data_type).unwrap();
        array_builder.data_type(data_type.clone());
        data_type
    } else {
        array.data_type().clone()
    };

    if let Some(dimension_names) = encoding_args.dimension_names.clone() {
        array_builder.dimension_names(dimension_names.into());
    }

    if let Some(fill_value) = &encoding_args.fill_value {
        let fill_value = data_type.fill_value_v3(fill_value).unwrap();
        array_builder.fill_value(fill_value);
    } else if let Some(data_type) = &encoding_args.data_type {
        // The data type changed without an explicit fill value: convert the input one.
        let data_type = DataType::from_metadata(data_type).unwrap();
        let fill_value = convert_fill_value(array.data_type(), array.fill_value(), &data_type);
        array_builder.fill_value(fill_value);
    }

    if let Some(shard_shape) = shard_shape {
        // Sharded output: the chunk grid holds shards and the codecs encode the
        // inner chunks of each shard.
        array_builder.chunk_grid_metadata(shard_shape);
        let index_codecs = Arc::new(CodecChain::new(
            vec![],
            Arc::<BytesCodec>::default(),
            vec![Arc::new(Crc32cCodec::new())],
        ));
        let inner_codecs = Arc::new(CodecChain::new(
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        ));
        array_builder.array_to_array_codecs(vec![]);
        let chunk_shape_nonzero: ChunkShape = chunk_shape
            .iter()
            .map(|&s| NonZeroU64::new(s).unwrap())
            .collect();
        array_builder.array_to_bytes_codec(Arc::new(ShardingCodec::new(
            chunk_shape_nonzero,
            inner_codecs,
            index_codecs,
            ShardingIndexLocation::End,
        )));
        array_builder.bytes_to_bytes_codecs(vec![]);
    } else {
        // Unsharded output: the chunk grid must hold the (possibly new) chunk shape.
        // Without this, a requested chunk shape would be ignored and an input whose
        // sharding is being dropped would keep its shard-sized chunk grid.
        array_builder.chunk_grid_metadata(chunk_shape);
        array_builder.array_to_array_codecs(array_to_array_codecs);
        array_builder.array_to_bytes_codec(array_to_bytes_codec);
        array_builder.bytes_to_bytes_codecs(bytes_to_bytes_codecs);
    }

    array_builder
}
/// The chunk cache used for the input array in [`do_reencode`].
///
/// Each variant selects the zarrs decoded-chunk LRU cache of the matching name and
/// passes its value through as the cache limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheSize {
    /// No chunk cache; subsets are retrieved directly from the input array.
    None,
    /// A size-limited cache (presumably the limit is in bytes; confirm against zarrs).
    SizeTotal(u64),
    /// A thread-local size-limited cache (presumably the limit is in bytes per thread).
    SizePerThread(u64),
    /// A cache limited to a number of chunks.
    ChunksTotal(u64),
    /// A thread-local cache limited to a number of chunks.
    ChunksPerThread(u64),
}
/// Retrieve `subset` from `array_in`, convert it to the data type of `array_out`
/// and store it, via `type_dispatch::retrieve_and_store_converting`.
///
/// The conversion timing is added to `progress`, and the size of the subset in the
/// input data type is added to `bytes_decoded`.
///
/// # Errors
/// Returns an error if the retrieve/convert/store step fails.
///
/// # Panics
/// Panics if the input data type has no fixed size or the mutex is poisoned.
fn convert_and_store_subset(
    array_in: &Arc<Array<dyn ReadableStorageTraits>>,
    array_out: &Array<dyn ReadableWritableListableStorageTraits>,
    subset: &ArraySubset,
    progress: &Progress,
    bytes_decoded: &Mutex<usize>,
) -> anyhow::Result<()> {
    // Size of the subset in the input data type.
    let num_elements = subset.num_elements_usize();
    let element_size = array_in.data_type().fixed_size().unwrap();
    let subset_bytes = num_elements * element_size;

    let conversion_timing =
        type_dispatch::retrieve_and_store_converting(array_in.as_ref(), array_out, subset)?;
    progress.add_conversion_timing(conversion_timing);

    let mut decoded_total = bytes_decoded.lock().unwrap();
    *decoded_total += subset_bytes;
    Ok(())
}
/// Reencode all chunks of `array_in` into `array_out`.
///
/// Output chunks are processed concurrently. If the input and output data types
/// are equal, chunk bytes are copied; otherwise each subset is converted with
/// `convert_and_store_subset`.
///
/// # Arguments
/// - `array_in`: the array to read from.
/// - `array_out`: the array to write to.
/// - `validate`: if true, each stored chunk is read back and compared with the
///   input. Only applied when the data types are equal and no write shape is in use.
/// - `concurrent_chunks`: the number of chunks to process concurrently, or `None`
///   to use the default from `calculate_chunk_and_codec_concurrency`.
/// - `progress_callback`: called with progress updates.
/// - `cache_size`: the chunk cache to use for `array_in`. The cache is only used
///   when the data types are equal.
/// - `write_shape`: if set and `array_out` is sharded, each output chunk (shard) is
///   written in subsets of this shape using partial encoding. Ignored if
///   `array_out` is not sharded.
///
/// # Returns
/// `(duration, duration_read, duration_write, bytes_decoded)`, where the durations
/// are in seconds and the read/write durations are the measured read/write times
/// rescaled so that they sum to `duration` (both are zero if nothing was timed).
///
/// # Errors
/// Returns an error if the write shape dimensionality does not match the chunk
/// grid, or if retrieving, converting or storing any subset fails.
///
/// # Panics
/// Panics if validation is enabled and a stored chunk does not match the input.
pub fn do_reencode(
    array_in: Arc<Array<dyn ReadableStorageTraits>>,
    array_out: &Array<dyn ReadableWritableListableStorageTraits>,
    validate: bool,
    concurrent_chunks: Option<usize>,
    progress_callback: &ProgressCallback,
    cache_size: CacheSize,
    write_shape: Option<Vec<NonZeroU64>>,
) -> anyhow::Result<(f32, f32, f32, usize)> {
    if let Some(write_shape) = &write_shape {
        if write_shape.len() != array_out.chunk_grid().dimensionality() {
            anyhow::bail!("Write shape dimensionality does not match chunk grid dimensionality");
        }
    }

    let start = SystemTime::now();
    let bytes_decoded = Mutex::new(0);

    let cache: Option<Arc<dyn ChunkCache>> = match cache_size {
        CacheSize::None => None,
        CacheSize::SizeTotal(size) => Some(Arc::new(ChunkCacheDecodedLruSizeLimit::new(
            array_in.clone(),
            size,
        ))),
        CacheSize::SizePerThread(size) => Some(Arc::new(
            ChunkCacheDecodedLruSizeLimitThreadLocal::new(array_in.clone(), size),
        )),
        CacheSize::ChunksTotal(chunks) => Some(Arc::new(ChunkCacheDecodedLruChunkLimit::new(
            array_in.clone(),
            chunks,
        ))),
        CacheSize::ChunksPerThread(chunks) => Some(Arc::new(
            ChunkCacheDecodedLruChunkLimitThreadLocal::new(array_in.clone(), chunks),
        )),
    };

    // Split the available parallelism between chunks and codecs.
    let chunk_shape = array_out
        .chunk_shape(&vec![0; array_out.chunk_grid().dimensionality()])
        .unwrap();
    let chunks = ArraySubset::new_with_shape(array_out.chunk_grid_shape().to_vec());
    let concurrent_target = std::thread::available_parallelism().unwrap().get();
    let (chunks_concurrent_limit, codec_concurrent_target) = calculate_chunk_and_codec_concurrency(
        concurrent_target,
        concurrent_chunks,
        &array_out.codecs(),
        chunks.num_elements_usize(),
        &chunk_shape,
        array_out.data_type(),
    );

    // A write shape only applies to sharded output, where it enables partial encoding.
    let is_sharded = array_out.is_sharded();
    let write_shape = if is_sharded { write_shape } else { None };
    let codec_options = CodecOptions::default()
        .with_concurrent_target(codec_concurrent_target)
        .with_experimental_partial_encoding(write_shape.is_some());

    // The number of progress steps: one per written subset.
    let num_iterations = if let Some(write_shape) = &write_shape {
        let indices = chunks.indices();
        indices
            .into_par_iter()
            .map(|chunk_indices| {
                let chunk_subset = array_out.chunk_subset(&chunk_indices).unwrap();
                // A chunk is written as a grid of subsets, so the number of writes is
                // the product of the per-dimension counts.
                chunk_subset
                    .shape()
                    .iter()
                    .zip(write_shape)
                    .map(|(s, w)| s.div_ceil(w.get()))
                    .product::<u64>()
            })
            .sum::<u64>()
    } else {
        chunks.num_elements()
    };
    let num_iterations = usize::try_from(num_iterations).unwrap();
    let progress = Progress::new(num_iterations, progress_callback);

    // Retrieve a subset through the cache if there is one.
    let retrieve_array_subset = |subset: &ArraySubset| {
        if let Some(cache) = &cache {
            Ok(Arc::unwrap_or_clone(
                cache.retrieve_array_subset_bytes(subset, &codec_options)?,
            ))
        } else {
            array_in.retrieve_array_subset_opt(subset, &codec_options)
        }
    };

    let indices = chunks.indices();
    if array_in.data_type() == array_out.data_type() {
        // Same data type: copy bytes without conversion.
        iter_concurrent_limit!(
            chunks_concurrent_limit,
            indices,
            try_for_each,
            |chunk_indices: ArrayIndicesTinyVec| {
                let chunk_subset = array_out.chunk_subset(&chunk_indices).unwrap();
                if let Some(write_shape) = &write_shape {
                    use zarrs::array::ChunkGridTraits;
                    // Tile the chunk with a regular grid of write-shaped subsets.
                    let write_grid =
                        RegularChunkGrid::new(chunk_subset.shape().to_vec(), write_shape.clone())
                            .map_err(|_| {
                                IncompatibleDimensionalityError::new(
                                    write_shape.len(),
                                    chunk_subset.dimensionality(),
                                )
                            })?;
                    for chunk_write in
                        ArraySubset::new_with_shape(write_grid.grid_shape().to_vec()).indices()
                    {
                        let chunk_subset_write = write_grid
                            .subset(&chunk_write)
                            .expect("matching dimensionality")
                            .expect("determinate for regular chunk grid");
                        let chunk_subset_write = chunk_subset_write.overlap(&chunk_subset)?;
                        let bytes = progress.read(|| retrieve_array_subset(&chunk_subset_write))?;
                        *bytes_decoded.lock().unwrap() += bytes.size();
                        progress.write(|| {
                            array_out.store_array_subset_opt(
                                &chunk_subset_write,
                                bytes,
                                &codec_options,
                            )
                        })?;
                        progress.next();
                    }
                } else {
                    let bytes = progress.read(|| retrieve_array_subset(&chunk_subset))?;
                    *bytes_decoded.lock().unwrap() += bytes.size();
                    if validate {
                        progress.write(|| {
                            array_out.store_chunk_opt(&chunk_indices, bytes.clone(), &codec_options)
                        })?;
                        let bytes_out = array_out
                            .retrieve_chunk_opt(&chunk_indices, &codec_options)
                            .unwrap();
                        assert!(bytes == bytes_out);
                    } else {
                        progress.write(|| {
                            array_out.store_chunk_opt(&chunk_indices, bytes, &codec_options)
                        })?;
                    }
                    progress.next();
                }
                Ok::<_, ArrayError>(())
            }
        )?;
    } else {
        // Different data types: convert each subset while copying.
        let convert_data = |chunk_indices: ArrayIndicesTinyVec| {
            let chunk_subset = array_out.chunk_subset(&chunk_indices).unwrap();
            if let Some(write_shape) = &write_shape {
                use zarrs::array::ChunkGridTraits;
                // Tile the chunk with a regular grid of write-shaped subsets.
                let write_grid =
                    RegularChunkGrid::new(chunk_subset.shape().to_vec(), write_shape.clone())
                        .map_err(|_| {
                            IncompatibleDimensionalityError::new(
                                write_shape.len(),
                                chunk_subset.dimensionality(),
                            )
                        })?;
                for chunk_write in
                    ArraySubset::new_with_shape(write_grid.grid_shape().to_vec()).indices()
                {
                    let chunk_subset_write = write_grid
                        .subset(&chunk_write)
                        .expect("matching dimensionality")
                        .expect("determinate for regular chunk grid");
                    let chunk_subset_write = chunk_subset_write.overlap(&chunk_subset)?;
                    convert_and_store_subset(
                        &array_in,
                        array_out,
                        &chunk_subset_write,
                        &progress,
                        &bytes_decoded,
                    )?;
                    progress.next();
                }
            } else {
                convert_and_store_subset(
                    &array_in,
                    array_out,
                    &chunk_subset,
                    &progress,
                    &bytes_decoded,
                )?;
                progress.next();
            }
            Ok::<_, anyhow::Error>(())
        };
        iter_concurrent_limit!(chunks_concurrent_limit, indices, try_for_each, convert_data)?;
    }

    let duration = start.elapsed().unwrap().as_secs_f32();
    let stats = progress.stats();
    let duration_read = stats.read.as_secs_f32();
    let duration_write = stats.write.as_secs_f32();
    let duration_read_write = duration_read + duration_write;
    // Rescale the read/write times so that they sum to the wall clock duration.
    // Avoid a division by zero (NaN results) if nothing was timed.
    let (duration_read, duration_write) = if duration_read_write > 0.0 {
        (
            duration_read * duration / duration_read_write,
            duration_write * duration / duration_read_write,
        )
    } else {
        (0.0, 0.0)
    };

    Ok((
        duration,
        duration_read,
        duration_write,
        bytes_decoded.into_inner().unwrap(),
    ))
}
/// Convert a fill value from `data_type_in` to `data_type_out`.
///
/// The fill value bytes are reinterpreted as the native-endian primitive matching
/// `data_type_in`, cast to the primitive matching `data_type_out` with
/// `num_traits::AsPrimitive`, and wrapped in a new [`FillValue`].
///
/// # Panics
/// Panics if either data type is not one of the listed bool/integer/float types,
/// or if the fill value byte length does not match the input primitive.
fn convert_fill_value(
data_type_in: &DataType,
fill_value_in: &FillValue,
data_type_out: &DataType,
) -> FillValue {
// Convert the fill value from primitive `$t_in` to primitive `$t_out`.
macro_rules! convert {
( $t_in:ty, $t_out:ty) => {{
let input_fill_value =
<$t_in>::from_ne_bytes(fill_value_in.as_ne_bytes().try_into().unwrap());
use num_traits::AsPrimitive;
let output_fill_value: $t_out = input_fill_value.as_();
FillValue::from(output_fill_value)
}};
}
// For a fixed input primitive, expand to an if/else chain over the supported
// output data types that ends in a panic.
macro_rules! apply_inner {
( $type_in:ty, [$( ( $dt_type_out:ty, $type_out:ty ) ),* ]) => {
{
$(if data_type_out.is::<$dt_type_out>() { convert!($type_in, $type_out) } else)*
{ panic!("Unsupported output data type: {:?}", data_type_out) }
}
};
}
// Expand to an if/else chain over the supported input data types that ends in a
// panic; each branch dispatches on the output data type with `apply_inner!`.
macro_rules! apply_outer {
([$( ( $dt_type_in:ty, $type_in:ty ) ),* ]) => {
{
$(if data_type_in.is::<$dt_type_in>() {
apply_inner!($type_in, [
// NOTE(review): bool output is produced as a plain `u8` cast, so an input
// value other than 0 or 1 presumably yields a non-0/1 bool fill value; confirm.
(data_type::BoolDataType, u8),
(data_type::Int8DataType, i8),
(data_type::Int16DataType, i16),
(data_type::Int32DataType, i32),
(data_type::Int64DataType, i64),
(data_type::UInt8DataType, u8),
(data_type::UInt16DataType, u16),
(data_type::UInt32DataType, u32),
(data_type::UInt64DataType, u64),
(data_type::BFloat16DataType, half::bf16),
(data_type::Float16DataType, half::f16),
(data_type::Float32DataType, f32),
(data_type::Float64DataType, f64)
])
} else)*
{ panic!("Unsupported input data type: {:?}", data_type_in) }
}
};
}
// Supported input data types and the primitive each is read as (bool is read as `u8`).
apply_outer!([
(data_type::BoolDataType, u8),
(data_type::Int8DataType, i8),
(data_type::Int16DataType, i16),
(data_type::Int32DataType, i32),
(data_type::Int64DataType, i64),
(data_type::UInt8DataType, u8),
(data_type::UInt16DataType, u16),
(data_type::UInt32DataType, u32),
(data_type::UInt64DataType, u64),
(data_type::BFloat16DataType, half::bf16),
(data_type::Float16DataType, half::f16),
(data_type::Float32DataType, f32),
(data_type::Float64DataType, f64)
])
}
/// Split `concurrent_target` between chunk-level and codec-level concurrency.
///
/// If `concurrent_chunks` is set, the chunk concurrency is fixed to that value
/// (limited to `num_chunks`). Otherwise the zarrs global configuration's chunk
/// concurrent minimum (limited to `num_chunks`) is used as a minimum. The codec
/// concurrency comes from the recommended concurrency of `codecs` for
/// `chunk_shape` and `data_type`.
///
/// Returns the result of `calc_concurrency_outer_inner`, which callers in this
/// file use as `(chunk concurrent limit, codec concurrent target)`.
///
/// # Panics
/// Panics if the recommended concurrency of the codecs cannot be determined.
pub fn calculate_chunk_and_codec_concurrency(
    concurrent_target: usize,
    concurrent_chunks: Option<usize>,
    codecs: &CodecChain,
    num_chunks: usize,
    chunk_shape: &ChunkShape,
    data_type: &DataType,
) -> (usize, usize) {
    let chunk_concurrency = match concurrent_chunks {
        Some(requested) => {
            let fixed = std::cmp::min(num_chunks, requested);
            RecommendedConcurrency::new(fixed..fixed)
        }
        None => {
            let minimum = std::cmp::min(num_chunks, global_config().chunk_concurrent_minimum());
            RecommendedConcurrency::new_minimum(minimum)
        }
    };
    let codec_concurrency = codecs
        .recommended_concurrency(chunk_shape, data_type)
        .unwrap();
    zarrs::array::concurrency::calc_concurrency_outer_inner(
        concurrent_target,
        &chunk_concurrency,
        &codec_concurrency,
    )
}