use clap::Parser;
use kun_peng::compact_hash::{HashConfig, Slot};
use kun_peng::utils::{
create_partition_files, create_partition_writers, create_sample_file, get_file_limit,
get_lastest_file_index, set_fd_limit,
};
use kun_peng::IndexOptions;
use seqkmer::{read_parallel, FastxReader, Meros, MinimizerIterator, OptionPair, Reader};
use std::fs;
use std::io::{BufWriter, Write};
use std::io::{Error, ErrorKind, Result};
use std::path::PathBuf;
use std::time::Instant;
use std::fs::File;
use std::io::{BufRead, BufReader};
#[derive(Parser, Debug, Clone)]
#[clap(
version,
about = "Split fast(q/a) file into ranges",
long_about = "Split fast(q/a) file into ranges"
)]
pub struct Args {
#[arg(long = "db", required = true)]
pub database: PathBuf,
#[clap(short = 'P', long = "paired-end-processing", action)]
pub paired_end_processing: bool,
#[clap(short = 'S', long = "single-file-pairs", action)]
pub single_file_pairs: bool,
#[clap(
short = 'Q',
long = "minimum-quality-score",
value_parser,
default_value_t = 0
)]
pub minimum_quality_score: i32,
#[clap(short = 'p', long = "num-threads", value_parser, default_value_t = num_cpus::get())]
pub num_threads: usize,
#[clap(long)]
pub chunk_dir: PathBuf,
#[clap(required = true)]
pub input_files: Vec<PathBuf>,
}
impl Args {
pub fn process_input_files(mut self) -> Result<Self> {
if self.input_files.is_empty() {
return Err(Error::new(ErrorKind::InvalidInput, "No input files provided."));
}
if self.input_files.len() == 1 {
let file_path = &self.input_files[0];
if file_path.is_file() && file_path.extension().map_or(false, |ext| ext == "txt") {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let mut new_input_files = Vec::new();
for line in reader.lines() {
let line = line?;
let path = PathBuf::from(line.trim());
new_input_files.push(path);
}
if !new_input_files.is_empty() {
self.input_files = new_input_files;
} else {
return Err(Error::new(ErrorKind::InvalidInput, "No input files found in the provided list."));
}
}
}
let mut missing_files = Vec::new();
for file in &self.input_files {
if !file.exists() {
missing_files.push(file.clone());
}
}
if !missing_files.is_empty() {
let error_msg = format!("The following input files do not exist:\n{}",
missing_files.iter().map(|f| f.display().to_string()).collect::<Vec<_>>().join("\n"));
return Err(Error::new(ErrorKind::NotFound, error_msg));
}
println!("Input files:");
for (index, file) in self.input_files.iter().enumerate() {
println!(" {}: {}", index + 1, file.display());
}
Ok(self)
}
}
fn init_chunk_writers(
args: &Args,
partition: usize,
chunk_size: usize,
) -> Vec<BufWriter<fs::File>> {
let chunk_files = create_partition_files(partition, &args.chunk_dir, "sample");
let mut writers = create_partition_writers(&chunk_files);
writers.iter_mut().enumerate().for_each(|(index, writer)| {
let file_size = writer
.get_ref()
.metadata()
.expect("Failed to get file metadata")
.len();
if file_size == 0 {
writer
.write_all(&index.to_le_bytes())
.expect("Failed to write partition");
let chunk_size_bytes = chunk_size.to_le_bytes();
writer
.write_all(&chunk_size_bytes)
.expect("Failed to write chunk size");
writer.flush().expect("Failed to flush writer");
}
});
writers
}
fn process_record(
k2_slot_list: &mut Vec<(usize, Slot<u64>)>,
marker: &mut MinimizerIterator,
hash_config: &HashConfig,
chunk_size: usize,
seq_id: u64,
idx_bits: usize,
) {
let offset = k2_slot_list.len();
for (sort, hash_key) in marker {
let mut slot = hash_config.slot_u64(hash_key, seq_id);
let seq_sort = sort + offset;
let partition_index = slot.idx / chunk_size;
slot.idx = seq_sort << idx_bits | (slot.idx % chunk_size);
k2_slot_list.push((partition_index, slot));
}
}
fn write_data_to_file(
k2_map: String,
k2_slot_list: Vec<(usize, Slot<u64>)>,
writers: &mut Vec<BufWriter<fs::File>>,
slot_size: usize,
sample_writer: &mut BufWriter<fs::File>,
) {
for slot in k2_slot_list {
let partition_index = slot.0;
if let Some(writer) = writers.get_mut(partition_index) {
writer.write_all(slot.1.as_slice(slot_size)).unwrap();
}
}
sample_writer.write_all(k2_map.as_bytes()).unwrap();
}
fn process_fastx_file<R>(
args: &Args,
meros: Meros,
hash_config: HashConfig,
file_index: usize,
reader: &mut R,
writers: &mut Vec<BufWriter<fs::File>>,
sample_writer: &mut BufWriter<fs::File>,
) -> Result<()>
where
R: Reader,
{
let chunk_size = hash_config.hash_capacity;
let idx_bits = ((chunk_size as f64).log2().ceil() as usize).max(1);
let slot_size = std::mem::size_of::<Slot<u64>>();
read_parallel(
reader,
args.num_threads as usize,
&meros,
|seqs| {
let mut buffer = String::new();
let mut k2_slot_list = Vec::new();
for seq in seqs {
let mut init: Vec<(usize, Slot<u64>)> = Vec::new();
let header = &seq.header;
let index = header.reads_index;
let dna_id = header.id.trim();
let seq_id = (file_index << 32 | index) as u64;
seq.body.apply_mut(|m_iter| {
process_record(
&mut init,
m_iter,
&hash_config,
chunk_size,
seq_id,
idx_bits,
);
});
k2_slot_list.extend_from_slice(&init);
let size_str = seq.fmt_size();
let seq_size_str = seq.fmt_seq_size();
buffer.push_str(
format!("{}\t{}\t{}\t{}\n", index, dna_id, seq_size_str, size_str).as_str(),
);
}
(buffer, k2_slot_list)
},
|dataset| {
while let Some(data) = dataset.next() {
let (buffer, k2_slot_list) = data.unwrap();
write_data_to_file(buffer, k2_slot_list, writers, slot_size, sample_writer);
}
},
)
.expect("failed");
Ok(())
}
fn process_files<F>(args: &Args, hash_config: HashConfig, mut action: F) -> Result<()>
where
F: FnMut(usize, OptionPair<PathBuf>) -> Result<()>,
{
let file_path = args.chunk_dir.join("sample_file.map");
let mut file_writer = create_sample_file(&file_path);
let mut file_index = get_lastest_file_index(&file_path)?;
let chunk_size = if args.paired_end_processing && !args.single_file_pairs {
2
} else {
1
};
let files = args.input_files.chunks(chunk_size).collect::<Vec<_>>();
let file_bits = (((files.len() + file_index) as f64).log2().ceil() as usize).max(1);
if file_bits > hash_config.value_bits {
panic!("The number of files is too large to process.");
}
for file_pair in files {
file_index += 1;
let path_pair = OptionPair::from_slice(file_pair);
writeln!(
file_writer,
"{}\t{}",
file_index,
path_pair.reduce_str(",", |a| a.to_str().unwrap().to_string())
)?;
file_writer.flush().unwrap();
action(file_index, path_pair)?;
}
Ok(())
}
pub fn run(args: Args) -> Result<()> {
let args = args.process_input_files()?;
let options_filename = &args.database.join("opts.k2d");
let idx_opts = IndexOptions::read_index_options(options_filename)?;
if args.paired_end_processing && !args.single_file_pairs && args.input_files.len() % 2 != 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"Paired-end processing requires an even number of input files.",
));
}
let hash_config = HashConfig::from_hash_header(&args.database.join("hash_config.k2d"))?;
println!("{:?}", hash_config);
if hash_config.hash_capacity == 0 {
panic!("`hash_capacity` can't be zero!");
}
println!("splitr start...");
let file_num_limit = get_file_limit();
if hash_config.partition >= file_num_limit {
eprintln!(
"file num limit {:?}, need: {:?}",
file_num_limit, hash_config.partition
);
set_fd_limit(hash_config.partition as u64 + 1)
.expect("Failed to set file descriptor limit, please run this operation with administrative/root privileges.");
}
let meros = idx_opts.as_meros();
let start = Instant::now();
let partition = hash_config.partition;
let mut writers: Vec<BufWriter<fs::File>> =
init_chunk_writers(&args, partition, hash_config.hash_capacity);
process_files(&args, hash_config, |file_index, path_pair| {
let mut sample_writer =
create_sample_file(args.chunk_dir.join(format!("sample_id_{}.map", file_index)));
let score = args.minimum_quality_score;
let mut reader = FastxReader::from_paths(path_pair, file_index, score)?;
process_fastx_file(
&args,
meros,
hash_config,
file_index,
&mut reader,
&mut writers,
&mut sample_writer,
)
.expect("process fastx file error");
Ok(())
})?;
let duration = start.elapsed();
println!("splitr took: {:?}", duration);
Ok(())
}
#[allow(dead_code)]
fn main() {
let args = Args::parse();
if let Err(e) = run(args) {
eprintln!("Application error: {}", e);
}
}