use std::{
sync::{Arc, Mutex},
time::SystemTime,
};
use clap::Parser;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use zarrs::{
array::{
ArrayBytes, ArrayIndicesTinyVec, ArrayShardedExt, ArrayShardedReadableExt,
ArrayShardedReadableExtCache, ArraySubset, CodecOptions,
},
storage::{
storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter},
AsyncReadableStorage, ReadableStorage,
},
};
use zarrs_tools::calculate_chunk_and_codec_concurrency;
/// Benchmark how fast a Zarr array can be retrieved and decoded.
#[derive(Parser, Debug)]
#[command(author, version=zarrs_tools::ZARRS_TOOLS_VERSION_WITH_ZARRS)]
struct Args {
    /// Location of the Zarr array: an http(s) URL, or a local filesystem directory.
    path: String,

    /// Number of chunks to decode concurrently (chosen automatically if omitted).
    #[arg(long)]
    concurrent_chunks: Option<usize>,

    /// Retrieve the whole array in a single call instead of decoding chunk by chunk.
    #[arg(long, default_value_t = false)]
    read_all: bool,

    /// Decode the inner chunks (subchunks) of a sharded array individually.
    #[arg(long, default_value_t = false)]
    inner_chunks: bool,

    /// Skip checksum validation while decoding.
    #[arg(long, default_value_t = false)]
    ignore_checksums: bool,
}
/// Bridges zarrs' async-to-sync storage adapter onto an owned Tokio runtime.
///
/// Every synchronous storage call made through the adapter hands its future to
/// [`AsyncToSyncBlockOn::block_on`], which this type forwards to the wrapped runtime.
struct TokioBlockOn(tokio::runtime::Runtime);

impl AsyncToSyncBlockOn for TokioBlockOn {
    /// Drives `future` to completion on the wrapped Tokio runtime and returns its output.
    fn block_on<F>(&self, future: F) -> F::Output
    where
        F: core::future::Future,
    {
        let Self(runtime) = self;
        runtime.block_on(future)
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let storage: AsyncReadableStorage = if args.path.starts_with("http") {
let builder = opendal::services::Http::default().endpoint(&args.path);
let operator = opendal::Operator::new(builder)?.finish();
Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator))
} else {
let builder = opendal::services::Fs::default().root(&args.path);
let operator = opendal::Operator::new(builder)?.finish();
Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator))
};
let block_on = TokioBlockOn(tokio::runtime::Runtime::new()?);
let storage: ReadableStorage = Arc::new(AsyncToSyncStorageAdapter::new(storage, block_on));
let array = zarrs::array::Array::open(storage.clone(), "/")?;
zarrs::config::global_config_mut().set_validate_checksums(!args.ignore_checksums);
let chunks = ArraySubset::new_with_shape(array.chunk_grid_shape().to_vec());
let chunk_indices = chunks.indices();
let concurrent_target = std::thread::available_parallelism().unwrap().get();
let start = SystemTime::now();
let bytes_decoded = Mutex::new(0);
if args.read_all {
if let Some(concurrent_chunks) = args.concurrent_chunks {
zarrs::config::global_config_mut().set_chunk_concurrent_minimum(concurrent_chunks);
}
*bytes_decoded.lock().unwrap() += array
.retrieve_array_subset::<ArrayBytes>(&array.subset_all())?
.size();
} else if let (Some(inner_chunk_shape), true) =
(array.effective_subchunk_shape(), args.inner_chunks)
{
let inner_chunks = ArraySubset::new_with_shape(array.subchunk_grid_shape().clone());
let inner_chunk_indices = inner_chunks.indices();
let (chunks_concurrent_limit, codec_concurrent_target) =
calculate_chunk_and_codec_concurrency(
concurrent_target,
args.concurrent_chunks,
&array.codecs(),
inner_chunks.num_elements_usize(),
&inner_chunk_shape,
array.data_type(),
);
let codec_options = CodecOptions::default().with_concurrent_target(codec_concurrent_target);
let shard_index_cache = ArrayShardedReadableExtCache::new(&array);
rayon_iter_concurrent_limit::iter_concurrent_limit!(
chunks_concurrent_limit,
inner_chunk_indices,
for_each,
|inner_chunk_indices: ArrayIndicesTinyVec| {
let bytes: ArrayBytes = array
.retrieve_subchunk_opt(&shard_index_cache, &inner_chunk_indices, &codec_options)
.unwrap();
*bytes_decoded.lock().unwrap() += bytes.size();
}
);
} else {
let chunk_shape = array.chunk_shape(&vec![0; array.chunk_grid().dimensionality()])?;
let (chunks_concurrent_limit, codec_concurrent_target) =
calculate_chunk_and_codec_concurrency(
concurrent_target,
args.concurrent_chunks,
&array.codecs(),
chunks.num_elements_usize(),
&chunk_shape,
array.data_type(),
);
let codec_options = CodecOptions::default().with_concurrent_target(codec_concurrent_target);
rayon_iter_concurrent_limit::iter_concurrent_limit!(
chunks_concurrent_limit,
chunk_indices,
for_each,
|chunk_indices: ArrayIndicesTinyVec| {
let bytes: ArrayBytes = array
.retrieve_chunk_opt(&chunk_indices, &codec_options)
.unwrap();
*bytes_decoded.lock().unwrap() += bytes.size();
}
);
}
let bytes_decoded = bytes_decoded.into_inner()?;
let duration = SystemTime::now().duration_since(start)?.as_secs_f32();
println!(
"Decoded {} in {:.2}ms ({:.2}MB decoded @ {:.2}GB/s)",
args.path,
duration * 1e3,
bytes_decoded as f32 / 1e6,
(bytes_decoded as f32 * 1e-9) / duration,
);
Ok(())
}