use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
use console::Term;
use rayon::prelude::*;
use seqtable::output::OutputFormat;
use seqtable::{
DualSeqCounts, FASTQ_EXTENSIONS, calculate_chunk_size, count_sequences,
count_sequences_from_reader, format_count, prepare_records, validate_fastq,
};
use std::path::{Path, PathBuf};
use std::time::Instant;
// Command-line interface. NOTE: plain `//` comments are used deliberately —
// `///` doc comments on clap-derive fields become `--help` text and would
// change the program's user-visible output.
#[derive(Parser, Debug)]
#[command(name = "seqtable")]
#[command(version)]
#[command(about = "Count sequences in FASTQ files")]
struct Args {
// One or more FASTQ paths; "-" selects stdin (must then be the sole input).
#[arg(required = true)]
input: Vec<PathBuf>,
// Directory for output tables; created if missing.
#[arg(short, long, default_value = ".")]
output_dir: PathBuf,
// Output table format (see seqtable::output::OutputFormat for values).
#[arg(short = 'f', long, default_value = "parquet")]
format: OutputFormat,
// Worker thread count; 0 leaves rayon's default global pool untouched.
#[arg(short, long, default_value = "0")]
threads: usize,
// Suppress all progress/status output on stderr.
#[arg(short, long)]
quiet: bool,
// Parquet compression codec (ignored by non-parquet formats — TODO confirm
// against seqtable::output::save_output).
#[arg(long, default_value = "zstd")]
compression: ParquetCompression,
// Forwarded to save_output; presumably adds a reads-per-million column —
// verify against the output module.
#[arg(long)]
rpm: bool,
}
// Compression codecs selectable via `--compression`. `//` comments only:
// ValueEnum turns `///` docs into CLI help text, which would alter output.
#[derive(Debug, Clone, ValueEnum)]
enum ParquetCompression {
None,
Snappy,
Gzip,
Brotli,
Zstd,
}
impl ParquetCompression {
    /// Translate the CLI codec choice into the corresponding
    /// `parquet::basic::Compression` value, using each codec's default
    /// level where the parquet API requires one.
    fn to_parquet(&self) -> parquet::basic::Compression {
        use parquet::basic::Compression as C;
        match self {
            Self::None => C::UNCOMPRESSED,
            Self::Snappy => C::SNAPPY,
            Self::Gzip => C::GZIP(parquet::basic::GzipLevel::default()),
            Self::Brotli => C::BROTLI(parquet::basic::BrotliLevel::default()),
            Self::Zstd => C::ZSTD(parquet::basic::ZstdLevel::default()),
        }
    }
}
/// Entry point: parse arguments, configure the rayon pool, then process
/// stdin or each input file (files are processed in parallel when there
/// is more than one).
fn main() -> Result<()> {
    let args = Args::parse();
    let quiet = args.quiet;
    let is_tty = Term::stderr().is_term();
    // threads == 0 means "leave rayon's default global pool alone".
    if args.threads > 0 {
        rayon::ThreadPoolBuilder::new()
            .num_threads(args.threads)
            .build_global()
            .context("Failed to initialize thread pool")?;
    }
    std::fs::create_dir_all(&args.output_dir).context("Failed to create output directory")?;
    // "-" selects stdin; mixing it with file arguments is rejected.
    // (Previously bound twice as `has_stdin` then `is_stdin` — collapsed.)
    let is_stdin = args.input.iter().any(|p| p.as_os_str() == "-");
    anyhow::ensure!(
        !is_stdin || args.input.len() == 1,
        "stdin (\"-\") cannot be combined with file arguments"
    );
    let source = if is_stdin {
        "stdin".to_string()
    } else {
        format!(
            "{} file{}",
            args.input.len(),
            if args.input.len() > 1 { "s" } else { "" }
        )
    };
    if !quiet {
        eprintln!(
            "seqtable {} | {} | {} threads | {:?}",
            env!("CARGO_PKG_VERSION"),
            source,
            rayon::current_num_threads(),
            args.format
        );
        eprintln!();
    }
    let total_start = Instant::now();
    if is_stdin {
        process_stdin(&args, quiet, is_tty)?;
    } else {
        let n_files = args.input.len();
        if n_files == 1 {
            // Single file: allow a live progress display on a TTY.
            process_file(&args.input[0], &args, 1, n_files, quiet, is_tty)?;
        } else {
            // Multiple files run in parallel; pass is_tty = false so
            // concurrent progress displays don't interleave on stderr.
            args.input
                .par_iter()
                .enumerate()
                .try_for_each(|(idx, input_file)| {
                    process_file(input_file, &args, idx + 1, n_files, quiet, false)
                })?;
        }
    }
    if !quiet {
        eprintln!(
            "completed {} in {:.2}s",
            source,
            total_start.elapsed().as_secs_f64()
        );
    }
    Ok(())
}
/// Count sequences streamed on stdin and write the resulting table.
/// The output file is always named `stdin.<ext>` inside `args.output_dir`.
fn process_stdin(args: &Args, quiet: bool, is_tty: bool) -> Result<()> {
    let started = Instant::now();
    let filename = format!("stdin.{}", args.format.extension());
    let path = args.output_dir.join(&filename);
    if !quiet {
        eprintln!("[1/1] <stdin>");
    }
    // Progress output only makes sense on an interactive terminal.
    let progress = !quiet && is_tty;
    let (counts, total_reads) = count_sequences_from_reader(std::io::stdin(), progress)?;
    write_output(args, counts, total_reads, &path, &filename, quiet, started)
}
fn process_file(
input_path: &Path,
args: &Args,
file_num: usize,
total_files: usize,
quiet: bool,
is_tty: bool,
) -> Result<()> {
validate_fastq(input_path)?;
let start_time = Instant::now();
let base_name = {
let name = input_path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("output");
let mut base = name;
for suffix in FASTQ_EXTENSIONS {
if let Some(stripped) = base.strip_suffix(suffix) {
base = stripped;
break;
}
}
base.to_string()
};
let output_filename = format!("{base_name}.{}", args.format.extension());
let output_path = args.output_dir.join(&output_filename);
if !quiet {
let input_display = input_path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("?");
eprintln!("[{}/{}] {}", file_num, total_files, input_display);
}
let file_size = std::fs::metadata(input_path)?.len();
let chunk_size = calculate_chunk_size(file_size);
let show_progress = !quiet && is_tty;
let (counts, total_reads) = count_sequences(input_path, chunk_size, show_progress)?;
write_output(
args,
counts,
total_reads,
&output_path,
&output_filename,
quiet,
start_time,
)
}
/// Serialize the counted records to `output_path` and, unless `quiet`,
/// print a "writing ... done" status line plus a result summary to stderr.
fn write_output(
    args: &Args,
    counts: DualSeqCounts,
    total_reads: u64,
    output_path: &Path,
    output_filename: &str,
    quiet: bool,
    start_time: Instant,
) -> Result<()> {
    let n_unique = counts.len() as u64;
    let records = prepare_records(counts);
    if !quiet {
        // No trailing newline yet — flush so the status shows during the write.
        eprint!(" {:<10} {} ... ", "writing", output_filename);
        std::io::Write::flush(&mut std::io::stderr()).ok();
    }
    seqtable::output::save_output(
        &records,
        output_path,
        &args.format,
        args.compression.to_parquet(),
        total_reads,
        args.rpm,
    )?;
    if !quiet {
        eprintln!("done");
        eprintln!(
            " {:<10} {} unique | {} total -> {} [{:.2}s]",
            "result",
            format_count(n_unique),
            format_count(total_reads),
            output_filename,
            start_time.elapsed().as_secs_f64()
        );
        eprintln!();
    }
    Ok(())
}