use crate::error::{io_err, AlignError};
use crate::hit::Hit;
use crate::{AlignOpts, Aligner};
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::process::Command;
pub const GENOME_SEP: char = '|';
#[derive(Debug, Clone)]
pub struct GenomeInput {
pub id: String,
pub fasta: PathBuf,
}
#[derive(Debug, Clone)]
pub struct GenomeHitSet {
pub genome_id: String,
pub hits: Vec<Hit>,
}
pub struct BatchClusterAligner {
pub inner: Box<dyn Aligner>,
pub cluster_identity: f32,
pub cluster_coverage: f32,
}
impl BatchClusterAligner {
pub fn new(inner: Box<dyn Aligner>) -> Self {
Self { inner, cluster_identity: 0.5, cluster_coverage: 0.8 }
}
pub fn align_genomes(
&self,
query_fasta: &Path,
genomes: &[GenomeInput],
workdir: &Path,
opts: &AlignOpts,
) -> Result<Vec<GenomeHitSet>, AlignError> {
std::fs::create_dir_all(workdir).map_err(|e| io_err(workdir, e))?;
let merged = workdir.join("all_genomes.faa");
concat_genomes(genomes, &merged)?;
let cluster = cluster_with_mmseqs(
&merged,
workdir,
self.cluster_identity,
self.cluster_coverage,
opts,
)?;
tracing::info!(
genomes = genomes.len(),
members = cluster.total_members,
representatives = cluster.rep_to_members.len(),
"clustering complete"
);
let rep_hits = self.inner.align(query_fasta, &cluster.representatives_fasta, opts)?;
let per_genome = expand_hits(&rep_hits, &cluster, genomes);
Ok(per_genome)
}
}
pub fn concat_genomes(genomes: &[GenomeInput], out: &Path) -> Result<(), AlignError> {
for g in genomes {
if g.id.contains(GENOME_SEP) || g.id.is_empty() {
return Err(AlignError::BadArg(format!(
"genome id `{}` must be non-empty and must not contain `{GENOME_SEP}`",
g.id
)));
}
}
let f = File::create(out).map_err(|e| io_err(out, e))?;
let mut w = BufWriter::new(f);
for g in genomes {
let r = File::open(&g.fasta).map_err(|e| io_err(&g.fasta, e))?;
let rdr = BufReader::new(r);
for line in rdr.lines() {
let line = line.map_err(|e| io_err(&g.fasta, e))?;
if let Some(rest) = line.strip_prefix('>') {
writeln!(w, ">{}{}{}", g.id, GENOME_SEP, rest).map_err(|e| io_err(out, e))?;
} else {
writeln!(w, "{line}").map_err(|e| io_err(out, e))?;
}
}
}
w.flush().map_err(|e| io_err(out, e))?;
Ok(())
}
pub fn split_genome_prefix(header: &str) -> Option<(&str, &str)> {
header.split_once(GENOME_SEP)
}
pub struct ClusterResult {
pub representatives_fasta: PathBuf,
pub rep_to_members: HashMap<String, Vec<String>>,
pub total_members: usize,
}
pub fn cluster_with_mmseqs(
merged_fasta: &Path,
workdir: &Path,
min_identity: f32,
min_coverage: f32,
opts: &AlignOpts,
) -> Result<ClusterResult, AlignError> {
require_mmseqs()?;
let prefix = workdir.join("cluster");
let scratch = workdir.join("cluster_tmp");
std::fs::create_dir_all(&scratch).map_err(|e| io_err(&scratch, e))?;
let verbosity = if opts.quiet { "1" } else { "3" };
let mut cmd = Command::new("mmseqs");
cmd.arg("easy-cluster")
.arg(merged_fasta)
.arg(&prefix)
.arg(&scratch)
.arg("--min-seq-id")
.arg(format!("{min_identity:.2}"))
.arg("-c")
.arg(format!("{min_coverage:.2}"))
.arg("--threads")
.arg(opts.threads.to_string())
.arg("-v")
.arg(verbosity);
tracing::debug!(?cmd, "running mmseqs easy-cluster");
let out = cmd.output().map_err(|e| io_err(Path::new("mmseqs"), e))?;
if !out.status.success() {
return Err(AlignError::ToolFailed {
tool: "mmseqs2",
status: out.status,
stderr: String::from_utf8_lossy(&out.stderr).to_string(),
});
}
let rep_fa: PathBuf = append_ext(&prefix, "_rep_seq.fasta");
let cluster_tsv: PathBuf = append_ext(&prefix, "_cluster.tsv");
let rep_to_members = parse_cluster_tsv(&cluster_tsv)?;
let total_members: usize = rep_to_members.values().map(|v| v.len()).sum();
Ok(ClusterResult { representatives_fasta: rep_fa, rep_to_members, total_members })
}
fn append_ext(prefix: &Path, suffix: &str) -> PathBuf {
let mut s = prefix.as_os_str().to_owned();
s.push(suffix);
PathBuf::from(s)
}
pub fn parse_cluster_tsv(path: &Path) -> Result<HashMap<String, Vec<String>>, AlignError> {
let f = File::open(path).map_err(|e| io_err(path, e))?;
let rdr = BufReader::new(f);
let mut out: HashMap<String, Vec<String>> = HashMap::new();
for (i, line) in rdr.lines().enumerate() {
let line = line.map_err(|e| io_err(path, e))?;
if line.is_empty() {
continue;
}
let (rep, member) = line.split_once('\t').ok_or_else(|| AlignError::TsvParse {
line: (i + 1) as u64,
msg: "cluster TSV row has no tab".into(),
})?;
out.entry(rep.to_string()).or_default().push(member.to_string());
}
Ok(out)
}
fn expand_hits(
rep_hits: &[Hit],
cluster: &ClusterResult,
genomes: &[GenomeInput],
) -> Vec<GenomeHitSet> {
let mut per_genome: BTreeMap<usize, Vec<Hit>> = BTreeMap::new();
let index: HashMap<&str, usize> =
genomes.iter().enumerate().map(|(i, g)| (g.id.as_str(), i)).collect();
let to_accession = |h: &str| h.split_whitespace().next().unwrap_or("").to_string();
let key_by_acc: HashMap<String, &Vec<String>> = cluster
.rep_to_members
.iter()
.map(|(k, v)| (to_accession(k), v))
.collect();
for rep_hit in rep_hits {
let rep_key = to_accession(&rep_hit.stitle);
let members = match key_by_acc.get(rep_key.as_str()) {
Some(m) => m.as_slice(),
None => {
tracing::warn!(
rep = %rep_key,
"rep-hit has no matching cluster — dropping"
);
continue;
}
};
for m in members {
let (genome_id, orig_header) = match split_genome_prefix(m) {
Some((g, o)) => (g, o),
None => continue,
};
let Some(&gidx) = index.get(genome_id) else { continue };
let mut h = rep_hit.clone();
h.stitle = orig_header.to_string();
per_genome.entry(gidx).or_default().push(h);
}
}
drop(key_by_acc);
(0..genomes.len())
.map(|i| GenomeHitSet {
genome_id: genomes[i].id.clone(),
hits: per_genome.remove(&i).unwrap_or_default(),
})
.collect()
}
fn require_mmseqs() -> Result<(), AlignError> {
if crate::blast::which("mmseqs").is_some() {
Ok(())
} else {
Err(AlignError::ToolMissing { tool: "mmseqs" })
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Read;
#[test]
fn concat_rewrites_headers() {
let dir = tempfile::tempdir().unwrap();
let a = dir.path().join("A.faa");
let b = dir.path().join("B.faa");
std::fs::write(&a, ">seq1 descA\nMKLMA\n>seq2\nEEPS\n").unwrap();
std::fs::write(&b, ">x\nAAA\n").unwrap();
let out = dir.path().join("all.faa");
concat_genomes(
&[
GenomeInput { id: "g1".into(), fasta: a },
GenomeInput { id: "g2".into(), fasta: b },
],
&out,
)
.unwrap();
let mut s = String::new();
File::open(&out).unwrap().read_to_string(&mut s).unwrap();
assert!(s.contains(">g1|seq1 descA\n"));
assert!(s.contains(">g1|seq2\n"));
assert!(s.contains(">g2|x\n"));
assert!(s.contains("MKLMA"));
}
#[test]
fn concat_rejects_bad_genome_id() {
let dir = tempfile::tempdir().unwrap();
let a = dir.path().join("A.faa");
std::fs::write(&a, ">s\nA\n").unwrap();
let err = concat_genomes(
&[GenomeInput { id: "g|1".into(), fasta: a }],
&dir.path().join("out.faa"),
)
.unwrap_err();
assert!(matches!(err, AlignError::BadArg(_)));
}
#[test]
fn parses_cluster_tsv() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("c.tsv");
std::fs::write(&p, "rep1\trep1\nrep1\tmemA\nrep2\trep2\n").unwrap();
let m = parse_cluster_tsv(&p).unwrap();
assert_eq!(m.get("rep1").unwrap(), &vec!["rep1".to_string(), "memA".to_string()]);
assert_eq!(m.get("rep2").unwrap(), &vec!["rep2".to_string()]);
}
#[test]
fn split_prefix_first_pipe_only() {
assert_eq!(split_genome_prefix("g1|sp|P12345|X"), Some(("g1", "sp|P12345|X")));
assert_eq!(split_genome_prefix("unpre"), None);
}
}