#![warn(missing_docs)]
#![allow(clippy::too_many_arguments)]
use std::io::Write;
use std::sync::mpsc;
use std::time::Instant;
#[macro_use]
extern crate arrayref;
extern crate num_cpus;
use anyhow::Error;
use indicatif::ParallelProgressIterator;
use rayon::prelude::*;
pub mod cli;
use crate::cli::*;
use crate::hashing::HashType;
pub mod sketch;
use crate::sketch::multisketch::MultiSketch;
use crate::sketch::sketch_datafile::SketchArrayReader;
use crate::sketch::{num_bins, sketch_files};
pub mod inverted;
use crate::inverted::Inverted;
pub mod distances;
use crate::distances::*;
pub mod io;
use crate::io::{get_input_list, parse_kmers, read_subset_names, reorder_input_files, set_ostream};
pub mod structures;
pub mod hashing;
pub mod utils;
use crate::utils::{get_progress_bar, strip_sketch_extension};
use std::fs::{File, OpenOptions};
use std::io::copy;
use std::io::BufRead;
use std::path::Path;
pub const DEFAULT_KMER: usize = 21;
#[doc(hidden)]
pub fn main() -> Result<(), Error> {
let args = cli_args();
if args.quiet {
simple_logger::init_with_level(log::Level::Error).unwrap();
} else if args.verbose {
simple_logger::init_with_level(log::Level::Info).unwrap();
} else {
simple_logger::init_with_level(log::Level::Warn).unwrap();
}
let mut print_success = true;
let start = Instant::now();
let result = match &args.command {
Commands::Sketch {
seq_files,
file_list,
concat_fasta,
#[cfg(feature = "3di")]
convert_pdb,
output,
kmers,
sketch_size,
seq_type,
level,
single_strand,
min_count,
min_qual,
threads,
} => {
if *concat_fasta && matches!(*seq_type, HashType::DNA | HashType::PDB) {
panic!("--concat-fasta currently only supported with --seq-type aa");
}
check_and_set_threads(*threads + 1);
log::info!("Getting input files");
let input_files = get_input_list(file_list, seq_files);
log::info!("Parsed {} samples in input list", input_files.len());
let kmers = parse_kmers(kmers);
let rc = !*single_strand;
let seq_type = if let HashType::AA(_) = seq_type {
HashType::AA(level.clone())
} else {
seq_type.clone()
};
let (_, sketch_bins, _) = num_bins(*sketch_size);
log::info!(
"Running sketching: k:{kmers:?}; sketch_size:{sketch_bins}; seq:{seq_type:?}; threads:{threads}"
);
let mut sketches = sketch_files(
output,
&input_files,
*concat_fasta,
#[cfg(feature = "3di")]
*convert_pdb,
&kmers,
sketch_bins,
&seq_type,
rc,
*min_count,
*min_qual,
args.quiet,
);
let sketch_vec = MultiSketch::new(&mut sketches, sketch_bins, &kmers, seq_type);
sketch_vec
.save_metadata(output)
.expect("Error saving metadata");
Ok(())
}
Commands::Dist {
ref_db,
query_db,
output,
mut knn,
subset,
kmer,
ani,
threads,
} => {
check_and_set_threads(*threads);
let mut output_file = set_ostream(output);
let ref_db_name = utils::strip_sketch_extension(ref_db);
let mut references = MultiSketch::load_metadata(ref_db_name)
.unwrap_or_else(|_| panic!("Could not read sketch metadata from {ref_db}.skm"));
log::info!("Loading sketch data from {ref_db_name}.skd");
if let Some(subset_file) = subset {
let subset_names = read_subset_names(subset_file);
references.read_sketch_data_block(ref_db_name, &subset_names);
} else {
references.read_sketch_data(ref_db_name);
}
log::info!("Read reference sketches:\n{references:?}");
let n = references.number_samples_loaded();
if let Some(nn) = knn {
if nn >= n {
log::warn!("knn={nn} is higher than number of samples={n}");
knn = Some(n - 1);
}
}
let dist_type = set_k(&references, *kmer, *ani).unwrap_or_else(|e| {
panic!("Error setting k size: {e}");
});
let queries = if let Some(query_db_name) = query_db {
let mut queries = MultiSketch::load_metadata(query_db_name).unwrap_or_else(|_| {
panic!("Could not read sketch metadata from {query_db_name}.skm")
});
log::info!("Loading query sketch data from {query_db_name}.skd");
queries.read_sketch_data(query_db_name);
log::info!("Read query sketches:\n{queries:?}");
Some(queries)
} else {
None
};
match queries {
None => {
match knn {
None => {
log::info!("Calculating all ref vs ref distances");
let distances = self_dists_all(&references, n, dist_type, args.quiet);
log::info!("Writing out in long matrix form");
write!(output_file, "{distances}")
.expect("Error writing output distances");
}
Some(nn) => {
log::info!("Calculating sparse ref vs ref distances with {nn} nearest neighbours");
let distances =
self_dists_knn(&references, n, nn, dist_type, args.quiet);
log::info!("Writing out in sparse matrix form");
write!(output_file, "{distances}")
.expect("Error writing output distances");
}
}
}
Some(query_db) => {
log::info!("Calculating all ref vs query distances");
let nq = query_db.number_samples_loaded();
let distances =
self_query_dists_all(&references, &query_db, n, nq, dist_type, args.quiet);
log::info!("Writing out in long matrix form");
write!(output_file, "{distances}").expect("Error writing output distances");
}
}
Ok(())
}
Commands::Merge { db1, db2, output } => {
let ref_db_name1 = utils::strip_sketch_extension(db1);
let ref_db_name2 = utils::strip_sketch_extension(db2);
log::info!("Reading input metadata");
let mut sketches1: MultiSketch = MultiSketch::load_metadata(ref_db_name1)
.unwrap_or_else(|_| {
panic!("Could not read sketch metadata from {ref_db_name1}.skm")
});
let sketches2: MultiSketch =
MultiSketch::load_metadata(ref_db_name2).unwrap_or_else(|_| {
panic!("Could not read sketch metadata from {ref_db_name2}.skm")
});
if !sketches1.is_compatible_with(&sketches2) {
panic!("Databases are not compatible for merging.")
}
log::info!("Merging metadata to {output}.skm");
let merged_sketch = sketches1.merge_sketches(&sketches2);
merged_sketch
.save_metadata(output)
.unwrap_or_else(|_| panic!("Couldn't save metadata to {output}"));
log::info!("Merging and saving sketch data to {output}.skd");
utils::save_sketch_data(ref_db_name1, ref_db_name2, output)
}
Commands::Inverted { command } => match command {
InvertedCommands::Build {
seq_files,
file_list,
output,
write_skq,
species_names,
single_strand,
min_count,
min_qual,
threads,
sketch_size,
kmer_length,
} => {
check_and_set_threads(*threads + 1);
log::info!("Getting input files");
let input_files: Vec<(String, String, Option<String>)> =
get_input_list(file_list, seq_files);
log::info!("Parsed {} samples in input list", input_files.len());
let file_order = if let Some(species_name_file) = species_names {
reorder_input_files(&input_files, species_name_file)
} else {
(0..input_files.len()).collect()
};
let skq_file = if *write_skq {
Some(format!("{output}.skq"))
} else {
None
};
let rc = !*single_strand;
let seq_type = &HashType::DNA;
let inverted = Inverted::new(
&input_files,
skq_file,
&file_order,
*kmer_length,
*sketch_size, seq_type,
rc,
*min_count,
*min_qual,
args.quiet,
);
inverted.save(output)?;
log::info!("Index info:\n{inverted:?}");
Ok(())
}
InvertedCommands::Query {
ski,
seq_files,
file_list,
output,
query_type,
min_count,
min_qual,
threads,
} => {
let mut output_file = set_ostream(output);
let inverted_index = Inverted::load(strip_sketch_extension(ski))?;
log::info!("Read inverted index:\n{inverted_index:?}");
log::info!("Getting input queries");
let input_files: Vec<(String, String, Option<String>)> =
get_input_list(file_list, seq_files);
log::info!("Parsed {} samples in input query list", input_files.len());
log::info!("Sketching input queries");
check_and_set_threads(*threads + 1); let (queries, query_names) =
inverted_index.sketch_queries(&input_files, *min_count, *min_qual, args.quiet);
log::info!("Running queries in mode: {query_type}");
write!(output_file, "Query")?;
if *query_type == InvertedQueryType::MatchCount {
for name in inverted_index.sample_names() {
write!(output_file, "\t{name}")?;
}
writeln!(output_file)?;
} else {
writeln!(output_file, "\tMatches")?;
}
let (tx, rx) = mpsc::channel();
let percent = false;
let progress_bar = get_progress_bar(queries.len(), percent, args.quiet);
rayon::scope(|s| {
s.spawn(|_| {
queries
.par_iter()
.progress_with(progress_bar)
.zip(query_names)
.map(|(q, q_name)| match query_type {
InvertedQueryType::MatchCount => {
(q_name, inverted_index.query_against_inverted_index(q))
}
InvertedQueryType::AllBins => {
(q_name, inverted_index.all_shared_bins(q))
}
InvertedQueryType::AnyBin => {
(q_name, inverted_index.any_shared_bins(q))
}
})
.for_each_with(tx, |tx, dists| {
let _ = tx.send(dists);
});
});
});
for (q_name, dist) in rx {
write!(output_file, "{q_name}")?;
if *query_type == InvertedQueryType::MatchCount {
for distance in dist {
write!(output_file, "\t{distance}")?;
}
} else {
write!(
output_file,
"\t{}",
inverted_index.sample_at(dist[0] as usize)
)?;
for r_name in dist
.iter()
.skip(1)
.map(|idx| inverted_index.sample_at(*idx as usize))
{
write!(output_file, ",{r_name}")?;
}
}
writeln!(output_file)?;
}
Ok(())
}
InvertedCommands::Precluster {
ski,
skd,
count,
output,
mut knn,
ani,
threads,
} => {
check_and_set_threads(*threads);
let input_prefix = strip_sketch_extension(ski);
let inverted_index = Inverted::load(input_prefix)?;
if *count {
let prefilter_pairs = inverted_index.any_shared_bin_list(args.quiet);
println!(
"Identified {} prefilter pairs from a max of {}",
prefilter_pairs.len(),
inverted_index.sample_names().len()
* (inverted_index.sample_names().len() - 1)
/ 2
);
} else if let Some(ref_db_input) = skd {
let mut output_file = set_ostream(output);
let skq_filename = &format!("{input_prefix}.skq");
log::info!("Loading queries from {skq_filename}");
let (mmap, bin_stride, kmer_stride, sample_stride) =
(false, 1, 1, inverted_index.sketch_size());
let mut skq_reader = SketchArrayReader::open(
skq_filename,
mmap,
bin_stride,
kmer_stride,
sample_stride,
);
let skq_bins =
skq_reader.read_all_from_skq(sample_stride * inverted_index.sketch_size());
let ref_db_name = utils::strip_sketch_extension(ref_db_input);
let mut references =
MultiSketch::load_metadata(ref_db_name).unwrap_or_else(|_| {
panic!("Could not read sketch metadata from {ref_db_name}.skm")
});
log::info!("Loading sketch data from {ref_db_name}.skd");
references.read_sketch_data(ref_db_name);
log::info!("Read reference sketches:\n{references:?}");
let n = references.number_samples_loaded();
if knn >= n {
log::warn!("knn={knn} is higher than number of samples={n}");
knn = n - 1;
}
let kmer = inverted_index.kmer();
let dist_type = set_k(&references, Some(kmer), *ani).unwrap_or_else(|e| {
panic!("K-mer size {kmer} used for .ski not found in .skd: {e}");
});
log::info!(
"Calculating sparse ref vs ref distances with {knn} nearest neighbours"
);
log::info!(
"Preclustering with k={} and s={}",
kmer,
inverted_index.sketch_size()
);
let distances = self_dists_knn_precluster(
&references,
&inverted_index,
&skq_bins,
skq_reader.sample_stride,
n,
knn,
dist_type,
args.quiet,
);
log::info!("Writing out in sparse matrix form");
write!(output_file, "{distances}").expect("Error writing output distances");
}
Ok(())
}
},
Commands::Append {
db,
seq_files,
file_list,
output,
single_strand,
min_count,
min_qual,
concat_fasta,
threads,
level,
} => {
check_and_set_threads(*threads + 1);
log::info!("Getting input files");
let input_files: Vec<(String, String, Option<String>)> =
get_input_list(file_list, seq_files);
log::info!("Parsed {} samples in input list", input_files.len());
let db_metadata: MultiSketch = MultiSketch::load_metadata(db)?;
if !db_metadata.append_compatibility(&input_files) {
panic!("Databases are not compatible for merging.")
}
log::info!("Passed concat check");
let kmers = db_metadata.kmer_lengths();
let rc = !*single_strand;
let sketch_size = db_metadata.sketch_size;
let seq_type = db_metadata.get_hash_type();
if *concat_fasta && matches!(*seq_type, HashType::DNA | HashType::PDB) {
panic!("--concat-fasta currently only supported with --seq-type aa");
}
log::info!(
"Running sketching: k:{:?}; sketch_size:{}; seq:{:?}; threads:{}",
kmers,
sketch_size * u64::BITS as u64,
seq_type,
threads,
);
let seq_type = if let HashType::AA(_) = seq_type {
HashType::AA(level.clone())
} else {
seq_type.clone()
};
let mut db2_sketches = sketch_files(
output,
&input_files,
*concat_fasta,
#[cfg(feature = "3di")]
false,
kmers,
sketch_size,
&seq_type,
rc,
*min_count,
*min_qual,
args.quiet,
);
let mut db2_metadata =
MultiSketch::new(&mut db2_sketches, sketch_size, kmers, seq_type);
log::info!("Merging and saving sketch data to {output}.skd");
let mut output_file = OpenOptions::new()
.create(true)
.append(true)
.open(format!("{output}.skd"))?;
let mut db_sketch = File::open(format!("{db}.skd"))?;
copy(&mut db_sketch, &mut output_file)?;
let concat_metadata = db2_metadata.merge_sketches(&db_metadata);
concat_metadata
.save_metadata(output)
.unwrap_or_else(|_| panic!("Could not save metadata to {output}"));
Ok(())
}
Commands::Delete {
db,
samples,
output_file,
} => {
let ref_db = utils::strip_sketch_extension(db);
log::info!("Reading input genomes");
let path = Path::new(samples);
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let ids: Vec<String> = reader.lines().map_while(Result::ok).collect();
log::info!("Reading input metadata");
let mut sketches: MultiSketch = MultiSketch::load_metadata(ref_db)
.unwrap_or_else(|_| panic!("Could not read sketch metadata from {ref_db}.skm"));
sketches.remove_metadata(output_file, &ids)?;
log::info!("Remove genomes and writing output");
sketches.remove_genomes(ref_db, output_file, &ids)?;
log::info!("Finished writing filtered sketch data to {output_file}");
Ok(())
}
Commands::Info {
skm_file,
sample_info,
} => {
if skm_file.ends_with(".ski") {
let ski_file = &skm_file[0..skm_file.len() - 4];
let index = Inverted::load(ski_file).unwrap_or_else(|err| {
println!("Read error: {err}");
panic!("Could not read inverted index from {ski_file}.ski")
});
if *sample_info {
log::info!("Printing sample info");
println!("{index}");
} else {
log::info!("Printing inverted index info");
println!("{index:?}");
}
} else {
let ref_db_name = if skm_file.ends_with(".skm") || skm_file.ends_with(".skd") {
&skm_file[0..skm_file.len() - 4]
} else {
skm_file.as_str()
};
let sketches = MultiSketch::load_metadata(ref_db_name).unwrap_or_else(|_| {
panic!("Could not read sketch metadata from {ref_db_name}.skm")
});
if *sample_info {
log::info!("Printing sample info");
println!("{sketches}");
} else {
log::info!("Printing database info");
println!("{sketches:?}");
}
}
print_success = false; Ok(())
}
};
let end = Instant::now();
log::info!("Complete");
if print_success && !args.quiet {
eprintln!(
"🧬🖋️ sketchlib done in {}s",
end.duration_since(start).as_secs()
);
}
result
}