use anyhow::{Context, Result};
use clap::Parser;
use copc_converter::{
Pipeline, PipelineConfig, ProgressEvent, ProgressObserver, collect_input_files,
};
use indicatif::{ProgressBar, ProgressStyle};
use std::path::PathBuf;
use std::sync::Mutex;
/// Fraction of the user-supplied memory limit that `main` hands to the
/// pipeline as its `memory_budget`; the remaining share is left unassigned
/// (presumably as headroom for allocations the budget does not track — confirm
/// against the library's accounting).
const MEMORY_SAFETY_FACTOR: f64 = 0.75;
// Command-line arguments, parsed by clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns
// `///` doc comments on the struct and its fields into `--help` text, so
// adding doc comments would change the program's user-visible output.
#[derive(Parser, Debug)]
#[command(author, version, about = "Convert LAZ files to a COPC file")]
struct Args {
// First positional argument; `main` passes it to `collect_input_files`.
input: PathBuf,
// Second positional argument; `main` passes it to the pipeline's final
// `write` step as the destination path.
output: PathBuf,
// Memory limit as text; `main` converts it with `parse_memory_limit`
// (optional K/M/G suffix, otherwise bytes) and then scales it by
// `MEMORY_SAFETY_FACTOR`.
#[arg(long, default_value = "16G")]
memory_limit: String,
// Optional directory forwarded unchanged to `PipelineConfig::temp_dir`.
#[arg(long)]
temp_dir: Option<PathBuf>,
// Flag forwarded unchanged to `PipelineConfig::temporal_index`; its exact
// effect is defined by the `copc_converter` library.
#[arg(long)]
temporal_index: bool,
// Forwarded unchanged to `PipelineConfig::temporal_stride`; presumably only
// meaningful together with `--temporal-index` — confirm in the library.
#[arg(long, default_value_t = 1000)]
temporal_stride: u32,
}
/// Parses a memory size given as text into a number of bytes.
///
/// The input is a decimal number optionally followed by one case-insensitive
/// binary-unit suffix: `K` (1024), `M` (1024²) or `G` (1024³). Without a
/// suffix the number is taken as bytes. Whitespace around the input and
/// between the number and the suffix is ignored. Fractional values such as
/// `"1.5G"` are allowed; the resulting byte count is truncated toward zero.
///
/// # Errors
///
/// Returns an error when the numeric part is not a valid number, or when the
/// resulting byte count is negative, NaN, infinite, or does not fit in `u64`.
fn parse_memory_limit(s: &str) -> Result<u64> {
    let s = s.trim();
    let (num_part, multiplier) = if let Some(n) = s.strip_suffix(['G', 'g']) {
        (n.trim(), 1024u64 * 1024 * 1024)
    } else if let Some(n) = s.strip_suffix(['M', 'm']) {
        (n.trim(), 1024u64 * 1024)
    } else if let Some(n) = s.strip_suffix(['K', 'k']) {
        (n.trim(), 1024u64)
    } else {
        (s, 1u64)
    };

    let value: f64 = num_part
        .parse()
        .with_context(|| format!("Invalid memory limit: {s:?}"))?;
    let bytes = value * multiplier as f64;

    // `f64::from_str` happily accepts "-1", "inf" and "nan", and a float-to-int
    // `as` cast saturates (NaN becomes 0). Without this check such input would
    // silently turn into a 0-byte or `u64::MAX` limit instead of an error.
    // `u64::MAX as f64` rounds up to 2^64, hence the strict `<`; NaN fails
    // both comparisons and is rejected as well.
    let in_range = bytes >= 0.0 && bytes < u64::MAX as f64;
    in_range
        .then(|| bytes as u64)
        .with_context(|| format!("Memory limit out of range: {s:?}"))
}
/// Denominator shown in the `[step/TOTAL_STEPS]` prefix of every progress
/// stage. NOTE(review): presumably equals the number of `StageStart` events the
/// pipeline emits per run — confirm against the library, since the counter is
/// not clamped to this value.
const TOTAL_STEPS: u32 = 5;
/// Formats a count compactly for display: values below 1000 are printed
/// verbatim, larger ones with one decimal and a `K` (thousand), `M` (million)
/// or `B` (billion) suffix, e.g. `1_500` -> `"1.5K"`.
///
/// The unit is chosen on the *rounded* value, so a count just below a unit
/// boundary is promoted to the next unit (`999_999` -> `"1.0M"`) instead of
/// being printed as `"1000.0K"`.
fn human_count(n: u64) -> String {
    // `{:.1}` prints any scaled value at or above 999.95 as "1000.0"; such
    // values are handed to the next larger unit instead.
    const ROUNDS_TO_THOUSAND: f64 = 999.95;

    if n < 1_000 {
        return n.to_string();
    }

    let value = n as f64;
    for (scale, suffix) in [(1_000.0, "K"), (1_000_000.0, "M")] {
        let scaled = value / scale;
        if scaled < ROUNDS_TO_THOUSAND {
            return format!("{scaled:.1}{suffix}");
        }
    }
    // Billions are the largest unit, so there is nothing left to promote to.
    format!("{:.1}B", value / 1_000_000_000.0)
}
/// Terminal progress reporter; holds the state shared between successive
/// `ProgressObserver::on_progress` callbacks.
struct CliProgress {
/// Bar or spinner of the stage currently being displayed; `None` before the
/// first stage starts and after a stage is reported done.
bar: Mutex<Option<ProgressBar>>,
/// Number of stages started so far; supplies the `step` in the
/// `[step/TOTAL_STEPS]` prefix.
step: std::sync::atomic::AtomicU32,
/// Message prefix (`"[step/TOTAL_STEPS] name"`) of the most recently started
/// stage, reused for progress updates and the final "done" line.
stage_prefix: Mutex<String>,
/// Total announced by the most recently started stage; 0 means the stage is
/// shown as a spinner rather than a bounded bar.
stage_total: std::sync::atomic::AtomicU64,
}
impl CliProgress {
    /// Creates a reporter in its idle state: no bar on screen, zero stages
    /// started, an empty stage prefix and a stage total of zero.
    fn new() -> Self {
        use std::sync::atomic::{AtomicU32, AtomicU64};

        let bar = Mutex::new(None);
        let step = AtomicU32::new(0);
        let stage_prefix = Mutex::new(String::new());
        let stage_total = AtomicU64::new(0);

        Self {
            bar,
            step,
            stage_prefix,
            stage_total,
        }
    }
}
/// Renders pipeline progress events on the terminal using `indicatif`.
impl ProgressObserver for CliProgress {
    /// Handles a single pipeline event.
    ///
    /// * `StageStart` – increments the step counter, records the stage's
    ///   prefix and total, and installs a new bounded bar (non-zero total) or
    ///   spinner (total of 0), replacing any bar that was still active.
    /// * `StageProgress` – moves the active bar to `done` and refreshes its
    ///   `done/total` message; does nothing when no bar is active.
    /// * `StageDone` – clears the active bar and prints `"<prefix> done"` to
    ///   stderr; does nothing when no bar is active.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes has been poisoned.
    fn on_progress(&self, event: ProgressEvent) {
        use std::sync::atomic::Ordering::Relaxed;

        // The guard lives until the end of the call, so events delivered to
        // the same reporter are handled one at a time.
        let mut active = self.bar.lock().unwrap();

        match event {
            ProgressEvent::StageStart { name, total } => {
                let step = 1 + self.step.fetch_add(1, Relaxed);
                let prefix = format!("[{step}/{TOTAL_STEPS}] {name}");
                *self.stage_prefix.lock().unwrap() = prefix.clone();
                self.stage_total.store(total, Relaxed);

                let replacement = if total == 0 {
                    // Unknown amount of work: show a spinner with the bare prefix.
                    let spinner = ProgressBar::new_spinner();
                    spinner.set_style(ProgressStyle::with_template("{msg} {spinner}").unwrap());
                    spinner.set_message(prefix);
                    spinner
                } else {
                    // Known amount of work: show a bounded bar with an ETA.
                    let bounded = ProgressBar::new(total);
                    let style = ProgressStyle::with_template("{msg} [{bar:40}] ({eta})")
                        .unwrap()
                        .progress_chars("=> ");
                    bounded.set_style(style);
                    bounded.set_message(format!("{prefix} 0/{}", human_count(total)));
                    bounded
                };
                *active = Some(replacement);
            }
            ProgressEvent::StageProgress { done } => {
                if let Some(current) = active.as_ref() {
                    current.set_position(done);
                    let total = self.stage_total.load(Relaxed);
                    let prefix = self.stage_prefix.lock().unwrap().clone();
                    let message = format!(
                        "{prefix} {}/{}",
                        human_count(done),
                        human_count(total)
                    );
                    current.set_message(message);
                }
            }
            ProgressEvent::StageDone => {
                if let Some(finished) = active.take() {
                    let prefix = self.stage_prefix.lock().unwrap().clone();
                    // Remove the bar first so the summary line is not overdrawn.
                    finished.finish_and_clear();
                    eprintln!("{prefix} done");
                }
            }
        }
    }
}
fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
)
.with_target(false)
.init();
let args = Args::parse();
let input_files = collect_input_files(args.input)?;
let raw_limit = parse_memory_limit(&args.memory_limit)?;
let memory_budget = (raw_limit as f64 * MEMORY_SAFETY_FACTOR) as u64;
let progress = std::sync::Arc::new(CliProgress::new());
let config = PipelineConfig {
memory_budget,
temp_dir: args.temp_dir,
temporal_index: args.temporal_index,
temporal_stride: args.temporal_stride,
progress: Some(progress),
};
Pipeline::scan(&input_files, config)?
.validate()?
.distribute()?
.build()?
.write(&args.output)?;
Ok(())
}