use core::f32;
use std::num::NonZeroU64;
use std::sync::Arc;
use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use zarrs::filesystem::{FilesystemStore, FilesystemStoreOptions};
use zarrs::storage::{
ListableStorageTraits, ReadableListableStorage, ReadableWritableListableStorageTraits,
StorePrefix, WritableStorageTraits,
};
use zarrs_tools::{
do_reencode, get_array_builder_reencode,
progress::{ProgressCallback, ProgressStats},
CacheSize, ZarrReencodingArgs,
};
#[cfg(feature = "async")]
use zarrs::storage::{
storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter},
AsyncReadableListableStorage,
};
#[cfg(feature = "async")]
use zarrs_opendal::AsyncOpendalStore;
// Command-line arguments for the reencode tool.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macro turns `///`
// doc comments into `--help` text, so switching to `///` would change the CLI output.
#[derive(Parser, Debug)]
#[command(author, version=zarrs_tools::ZARRS_TOOLS_VERSION_WITH_ZARRS)]
struct Args {
// Reencoding options, flattened into this command's arguments and passed to
// `get_array_builder_reencode` in `main`.
#[command(flatten)]
encoding: ZarrReencodingArgs,
// Positional: input location. Passed to `get_storage`, which treats values starting
// with `http://` or `https://` as HTTP stores and everything else as a filesystem path.
path_in: String,
// Positional: output location. Always opened as a `FilesystemStore` in `main`;
// everything under the store root is erased before the output array is created.
path_out: String,
// Forwarded unchanged to `do_reencode`.
#[arg(long)]
concurrent_chunks: Option<usize>,
// When set, checksum validation is disabled in the global zarrs config
// (`set_validate_checksums(!ignore_checksums)`).
#[arg(long, default_value_t = false)]
ignore_checksums: bool,
// Forwarded unchanged to `do_reencode`.
// NOTE(review): presumably enables verification of the reencoded output — confirm
// against `do_reencode`.
#[arg(long, default_value_t = false)]
validate: bool,
// When set, the input array metadata is printed as pretty JSON before reencoding.
#[arg(long, short, default_value_t = false)]
verbose: bool,
// The four cache options below select the `CacheSize` variant used by `main`.
// If several are given only one is used, with this precedence:
// `cache_size_thread`, `cache_size`, `cache_chunks_thread`, `cache_chunks`.
// If none is given, `CacheSize::None` is used.
//
// Maps to `CacheSize::SizeTotal`.
#[arg(long)]
cache_size: Option<u64>,
// Maps to `CacheSize::ChunksTotal`.
#[arg(long)]
cache_chunks: Option<u64>,
// Maps to `CacheSize::SizePerThread`.
#[arg(long)]
cache_size_thread: Option<u64>,
// Maps to `CacheSize::ChunksPerThread`.
#[arg(long)]
cache_chunks_thread: Option<u64>,
// Comma-delimited list of non-zero integers, forwarded unchanged to `do_reencode`.
// NOTE(review): `verbatim_doc_comment` presumably has no effect here because the
// field carries no `///` doc comment — confirm and either restore the doc comment
// or drop the attribute.
#[arg(long, verbatim_doc_comment, value_delimiter = ',')]
write_shape: Option<Vec<NonZeroU64>>,
// Sets the `direct_io` option on the filesystem store options used for both the
// input store (in `get_storage`) and the output store (in `main`).
#[arg(long, default_value_t = false)]
direct_io: bool,
}
/// Build the progress bar style that `main` applies while the reencode is in progress.
///
/// The template shows elapsed/estimated duration, a 40-column bar, position, length,
/// percentage, prefix and message. If the template fails to parse, the default bar
/// style is used instead.
fn bar_style_run() -> ProgressStyle {
    ProgressStyle::with_template(
        "[{elapsed_precise}/{duration_precise}] {bar:40.black/bold} {pos}/{len} ({percent}%) {prefix} {msg}",
    )
    // Build the fallback lazily: it is only needed if the template is rejected.
    .unwrap_or_else(|_| ProgressStyle::default_bar())
}
/// Build the progress bar style that `main` applies once the reencode has completed.
///
/// The template shows the elapsed time twice (as elapsed/total), followed by the
/// prefix and message. If the template fails to parse, the default bar style is used
/// instead.
fn bar_style_finish() -> ProgressStyle {
    ProgressStyle::with_template("[{elapsed_precise}/{elapsed_precise}] {prefix} {msg}")
        // Build the fallback lazily: it is only needed if the template is rejected.
        .unwrap_or_else(|_| ProgressStyle::default_bar())
}
/// Update `bar` from a progress snapshot.
///
/// The bar length and position are taken from `stats.num_steps` and `stats.step`.
/// The message shows the read, write and process timings in seconds; when
/// `stats.process_steps` is non-empty, the per-step timings are appended as a list.
fn progress_callback(stats: ProgressStats, bar: &ProgressBar) {
    bar.set_length(stats.num_steps as u64);
    bar.set_position(stats.step as u64);

    let read_secs = stats.read.as_secs_f32();
    let write_secs = stats.write.as_secs_f32();
    let process_secs = stats.process.as_secs_f32();

    let message = if stats.process_steps.is_empty() {
        format!(
            "rw:{:.2}/{:.2} p:{:.2}",
            read_secs, write_secs, process_secs,
        )
    } else {
        // Per-step timings, printed with the debug list formatting at two decimals.
        let step_secs = stats
            .process_steps
            .iter()
            .map(|t| t.as_secs_f32())
            .collect::<Vec<_>>();
        format!(
            "rw:{:.2}/{:.2} p:{:.2} {:.2?}",
            read_secs, write_secs, process_secs, step_secs,
        )
    };
    bar.set_message(message);
}
/// Newtype around a Tokio runtime, used by `get_storage` as the `block_on`
/// provider for `AsyncToSyncStorageAdapter`.
#[cfg(feature = "async")]
struct TokioBlockOn(tokio::runtime::Runtime);

#[cfg(feature = "async")]
impl AsyncToSyncBlockOn for TokioBlockOn {
    /// Run `future` to completion on the wrapped runtime and return its output.
    fn block_on<F>(&self, future: F) -> F::Output
    where
        F: core::future::Future,
    {
        let Self(runtime) = self;
        runtime.block_on(future)
    }
}
/// Open a readable, listable store for `path`.
///
/// Paths starting with `http://` or `https://` are opened as an OpenDAL HTTP store
/// wrapped in an async-to-sync adapter driven by a new Tokio runtime; this requires
/// the `async` feature. Any other path is opened as a filesystem store with the
/// `direct_io` option applied. `direct_io` is not used for HTTP stores.
///
/// # Errors
///
/// Returns an error if the store cannot be created, or if an HTTP path is given and
/// the binary was built without the `async` feature.
fn get_storage(path: &str, direct_io: bool) -> anyhow::Result<ReadableListableStorage> {
    let is_http = ["http://", "https://"]
        .iter()
        .any(|scheme| path.starts_with(scheme));

    if !is_http {
        let mut fs_options = FilesystemStoreOptions::default();
        fs_options.direct_io(direct_io);
        let fs_store = FilesystemStore::new_with_options(path, fs_options)?;
        return Ok(Arc::new(fs_store));
    }

    #[cfg(feature = "async")]
    {
        let http = opendal::services::Http::default().endpoint(path);
        let op = opendal::Operator::new(http)?.finish();
        let async_store: AsyncReadableListableStorage = Arc::new(AsyncOpendalStore::new(op));
        let runtime = TokioBlockOn(tokio::runtime::Runtime::new()?);
        Ok(Arc::new(AsyncToSyncStorageAdapter::new(async_store, runtime)))
    }
    #[cfg(not(feature = "async"))]
    anyhow::bail!("zarrs_tools has not been compiled with the async feature for HTTP stores")
}
/// Reencode the array at `path_in` into a new array at `path_out`.
///
/// Steps:
/// 1. Parse the arguments and apply the checksum validation setting to the global
///    zarrs config.
/// 2. Open the input store and the array at its root (`/`).
/// 3. Create the filesystem output store, erase everything under its root, build the
///    output array from the reencoding options and store its metadata.
/// 4. Run `do_reencode`, reporting progress on a progress bar.
/// 5. Print a summary of timings, throughput and store sizes.
///
/// # Errors
///
/// Returns an error (instead of panicking) if the input store or array cannot be
/// opened, the output store cannot be created or cleared, the output array cannot be
/// built or its metadata stored, or the reencode itself fails.
fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    zarrs::config::global_config_mut().set_validate_checksums(!args.ignore_checksums);

    // Input array.
    let storage_in = get_storage(&args.path_in, args.direct_io)?;
    let array_in = zarrs::array::Array::open(storage_in.clone().readable(), "/").map_err(
        |err| {
            anyhow::anyhow!(
                "failed to open input array at {}: {}",
                args.path_in,
                err
            )
        },
    )?;
    let array_in = Arc::new(array_in);
    if args.verbose {
        println!("{}", serde_json::to_string_pretty(&array_in.metadata())?);
    }

    // Progress reporting.
    let bar = ProgressBar::new(0);
    bar.set_style(bar_style_run());
    let on_progress = |stats: ProgressStats| progress_callback(stats, &bar);
    let progress_callback = ProgressCallback::new(&on_progress);

    // Output store and array.
    let mut options = FilesystemStoreOptions::default();
    options.direct_io(args.direct_io);
    let storage_out: Arc<dyn ReadableWritableListableStorageTraits> = Arc::new(
        FilesystemStore::new_with_options(args.path_out.clone(), options).map_err(|err| {
            anyhow::anyhow!(
                "failed to create output store at {}: {}",
                args.path_out,
                err
            )
        })?,
    );
    // NOTE(review): everything under the output store root is erased here, before any
    // chunk data has been read from the input. If `path_out` refers to the same store
    // as `path_in`, this presumably destroys the input — confirm and consider a guard.
    storage_out
        .erase_prefix(&StorePrefix::root())
        .map_err(|err| {
            anyhow::anyhow!(
                "failed to clear output store at {}: {}",
                args.path_out,
                err
            )
        })?;
    let builder = get_array_builder_reencode(&args.encoding, &array_in, None);
    let array_out = builder
        .build(storage_out.clone(), "/")
        .map_err(|err| anyhow::anyhow!("failed to create output array: {}", err))?;
    array_out
        .store_metadata()
        .map_err(|err| anyhow::anyhow!("failed to store output array metadata: {}", err))?;

    // Only one cache option is honoured; the first one set in this order wins.
    let cache_size = if let Some(cache_size_thread) = args.cache_size_thread {
        CacheSize::SizePerThread(cache_size_thread)
    } else if let Some(cache_size) = args.cache_size {
        CacheSize::SizeTotal(cache_size)
    } else if let Some(cache_chunks_thread) = args.cache_chunks_thread {
        CacheSize::ChunksPerThread(cache_chunks_thread)
    } else if let Some(cache_chunks) = args.cache_chunks {
        CacheSize::ChunksTotal(cache_chunks)
    } else {
        CacheSize::None
    };

    let (duration, duration_read, duration_write, bytes_decoded) = do_reencode(
        array_in,
        &array_out,
        args.validate,
        args.concurrent_chunks,
        &progress_callback,
        cache_size,
        args.write_shape,
    )?;
    bar.set_style(bar_style_finish());
    bar.finish_and_clear();

    // If the input store size cannot be determined it is reported as NaN;
    // an undeterminable output size is reported as 0.
    let size_in = storage_in
        .size()
        .map(|size| size as f32)
        .unwrap_or(f32::NAN);
    let size_out = storage_out.size().unwrap_or_default() as f32;
    let bytes_decoded = bytes_decoded as f32;
    println!(
        "Reencode {} to {}\n\tread: ~{:.2}ms @ {:.2}GB/s\n\twrite: ~{:.2}ms @ {:.2}GB/s\n\ttotal: {:.2}ms\n\tsize: {:.2}MB to {:.2}MB ({:.2}MB uncompressed)",
        args.path_in,
        args.path_out,
        duration_read * 1e3,
        size_in / 1e9 / duration_read,
        duration_write * 1e3,
        size_out / 1e9 / duration_write,
        duration * 1e3,
        size_in / 1e6,
        size_out / 1e6,
        bytes_decoded / 1e6,
    );
    Ok(())
}