Skip to main content

gapsmith_align/
batch.rs

1//! Batch-cluster alignment across N genomes.
2//!
3//! When annotating many genomes against the same gapseq reference seqdb,
4//! the per-genome loop is dominated by redundant alignments — closely
5//! related genomes share a lot of proteins. The batch-cluster path
6//! amortizes that cost:
7//!
8//! 1. [`concat_genomes`] merges every genome's proteome into one FASTA,
9//!    rewriting each header as `<GENOMEID>|<orig_header>` so we can trace
10//!    any downstream hit back to its genome of origin.
11//! 2. [`cluster_with_mmseqs`] calls `mmseqs easy-cluster` to produce
12//!    (a) a representative FASTA and (b) a two-column `<rep>\t<member>`
13//!    TSV.
14//! 3. [`BatchClusterAligner::align_genomes`] then runs a single alignment
15//!    of the gapseq query FASTA against the representatives and expands
16//!    each rep-hit into its cluster members, bucketing the results by
17//!    genome.
18//!
19//! The expansion is an approximation: a member inherits its
20//! representative's bitscore/identity. Users who need per-member precision
21//! can re-align the affected members with any of the standard aligners —
22//! typically a tiny fraction of the original N-genome cost.
23
24use crate::error::{io_err, AlignError};
25use crate::hit::Hit;
26use crate::{AlignOpts, Aligner};
27use std::collections::{BTreeMap, HashMap};
28use std::fs::File;
29use std::io::{BufRead, BufReader, BufWriter, Write};
30use std::path::{Path, PathBuf};
31use std::process::Command;
32
33/// Separator between the genome ID and the original FASTA header. A pipe
34/// `|` matches the convention used in the R `prepare_batch_alignments.R`
35/// pipeline; the same character also appears in UniProt-style `sp|P12345|NAME`
36/// accessions, so parsers must split on the *first* `|` only.
37pub const GENOME_SEP: char = '|';
38
39/// A genome's protein FASTA plus a short identifier used to prefix every
40/// header on concatenation.
41#[derive(Debug, Clone)]
42pub struct GenomeInput {
43    pub id: String,
44    pub fasta: PathBuf,
45}
46
47/// Hits for one genome after batch-cluster expansion. Emitted in original
48/// input order so downstream code can pair them up with `GenomeInput` by
49/// index without extra lookups.
50#[derive(Debug, Clone)]
51pub struct GenomeHitSet {
52    pub genome_id: String,
53    pub hits: Vec<Hit>,
54}
55
56/// Driver for the batch-cluster mode.
57///
58/// `inner` is any per-pair [`Aligner`] — typically [`crate::DiamondAligner`]
59/// or [`crate::Mmseqs2Aligner`]. It aligns query-vs-representative exactly
60/// once; the expansion layer then rebuilds per-genome hit sets.
61pub struct BatchClusterAligner {
62    pub inner: Box<dyn Aligner>,
63    pub cluster_identity: f32,
64    pub cluster_coverage: f32,
65}
66
67impl BatchClusterAligner {
68    /// Sensible defaults: 0.5 identity / 0.8 coverage mirror mmseqs2's own
69    /// default `easy-cluster` parameters.
70    pub fn new(inner: Box<dyn Aligner>) -> Self {
71        Self { inner, cluster_identity: 0.5, cluster_coverage: 0.8 }
72    }
73
74    /// Run the full batch-cluster pipeline. `workdir` is where intermediate
75    /// files live (concatenated FASTA, cluster outputs, alignment TSV);
76    /// pass a `tempfile::TempDir::path()` if you don't need to persist them.
77    pub fn align_genomes(
78        &self,
79        query_fasta: &Path,
80        genomes: &[GenomeInput],
81        workdir: &Path,
82        opts: &AlignOpts,
83    ) -> Result<Vec<GenomeHitSet>, AlignError> {
84        std::fs::create_dir_all(workdir).map_err(|e| io_err(workdir, e))?;
85
86        // 1) Concatenate.
87        let merged = workdir.join("all_genomes.faa");
88        concat_genomes(genomes, &merged)?;
89
90        // 2) Cluster.
91        let cluster = cluster_with_mmseqs(
92            &merged,
93            workdir,
94            self.cluster_identity,
95            self.cluster_coverage,
96            opts,
97        )?;
98
99        tracing::info!(
100            genomes = genomes.len(),
101            members = cluster.total_members,
102            representatives = cluster.rep_to_members.len(),
103            "clustering complete"
104        );
105
106        // 3) Single alignment vs representatives.
107        let rep_hits = self.inner.align(query_fasta, &cluster.representatives_fasta, opts)?;
108
109        // 4) Expand per-genome.
110        let per_genome = expand_hits(&rep_hits, &cluster, genomes);
111        Ok(per_genome)
112    }
113}
114
115// -- Concatenation --
116
117/// Produce a single FASTA at `out` whose headers are rewritten to
118/// `>GENOMEID|ORIGHEADER`. Fails if any genome ID contains `|` (because
119/// the downstream split would be ambiguous).
120pub fn concat_genomes(genomes: &[GenomeInput], out: &Path) -> Result<(), AlignError> {
121    for g in genomes {
122        if g.id.contains(GENOME_SEP) || g.id.is_empty() {
123            return Err(AlignError::BadArg(format!(
124                "genome id `{}` must be non-empty and must not contain `{GENOME_SEP}`",
125                g.id
126            )));
127        }
128    }
129    let f = File::create(out).map_err(|e| io_err(out, e))?;
130    let mut w = BufWriter::new(f);
131    for g in genomes {
132        let r = File::open(&g.fasta).map_err(|e| io_err(&g.fasta, e))?;
133        let rdr = BufReader::new(r);
134        for line in rdr.lines() {
135            let line = line.map_err(|e| io_err(&g.fasta, e))?;
136            if let Some(rest) = line.strip_prefix('>') {
137                writeln!(w, ">{}{}{}", g.id, GENOME_SEP, rest).map_err(|e| io_err(out, e))?;
138            } else {
139                writeln!(w, "{line}").map_err(|e| io_err(out, e))?;
140            }
141        }
142    }
143    w.flush().map_err(|e| io_err(out, e))?;
144    Ok(())
145}
146
147/// Pull `GENOMEID` out of a concatenated header (first `|`-separated token).
148/// Returns `None` on a malformed header.
149pub fn split_genome_prefix(header: &str) -> Option<(&str, &str)> {
150    header.split_once(GENOME_SEP)
151}
152
153// -- Clustering --
154
155/// Result of an `mmseqs easy-cluster` run.
156pub struct ClusterResult {
157    pub representatives_fasta: PathBuf,
158    /// Map: representative-header → list of member headers (including the
159    /// representative itself, which mmseqs includes as one of the members).
160    pub rep_to_members: HashMap<String, Vec<String>>,
161    pub total_members: usize,
162}
163
164pub fn cluster_with_mmseqs(
165    merged_fasta: &Path,
166    workdir: &Path,
167    min_identity: f32,
168    min_coverage: f32,
169    opts: &AlignOpts,
170) -> Result<ClusterResult, AlignError> {
171    require_mmseqs()?;
172
173    let prefix = workdir.join("cluster");
174    let scratch = workdir.join("cluster_tmp");
175    std::fs::create_dir_all(&scratch).map_err(|e| io_err(&scratch, e))?;
176
177    let verbosity = if opts.quiet { "1" } else { "3" };
178    let mut cmd = Command::new("mmseqs");
179    cmd.arg("easy-cluster")
180        .arg(merged_fasta)
181        .arg(&prefix)
182        .arg(&scratch)
183        .arg("--min-seq-id")
184        .arg(format!("{min_identity:.2}"))
185        .arg("-c")
186        .arg(format!("{min_coverage:.2}"))
187        .arg("--threads")
188        .arg(opts.threads.to_string())
189        .arg("-v")
190        .arg(verbosity);
191
192    tracing::debug!(?cmd, "running mmseqs easy-cluster");
193    let out = cmd.output().map_err(|e| io_err(Path::new("mmseqs"), e))?;
194    if !out.status.success() {
195        return Err(AlignError::ToolFailed {
196            tool: "mmseqs2",
197            status: out.status,
198            stderr: String::from_utf8_lossy(&out.stderr).to_string(),
199        });
200    }
201
202    // easy-cluster writes:
203    //   <prefix>_rep_seq.fasta       (representatives)
204    //   <prefix>_cluster.tsv         (two cols: rep\tmember)
205    //   <prefix>_all_seqs.fasta      (full clustered fasta, ignored here)
206    let rep_fa: PathBuf = append_ext(&prefix, "_rep_seq.fasta");
207    let cluster_tsv: PathBuf = append_ext(&prefix, "_cluster.tsv");
208
209    let rep_to_members = parse_cluster_tsv(&cluster_tsv)?;
210    let total_members: usize = rep_to_members.values().map(|v| v.len()).sum();
211
212    Ok(ClusterResult { representatives_fasta: rep_fa, rep_to_members, total_members })
213}
214
215fn append_ext(prefix: &Path, suffix: &str) -> PathBuf {
216    let mut s = prefix.as_os_str().to_owned();
217    s.push(suffix);
218    PathBuf::from(s)
219}
220
221pub fn parse_cluster_tsv(path: &Path) -> Result<HashMap<String, Vec<String>>, AlignError> {
222    let f = File::open(path).map_err(|e| io_err(path, e))?;
223    let rdr = BufReader::new(f);
224    let mut out: HashMap<String, Vec<String>> = HashMap::new();
225    for (i, line) in rdr.lines().enumerate() {
226        let line = line.map_err(|e| io_err(path, e))?;
227        if line.is_empty() {
228            continue;
229        }
230        let (rep, member) = line.split_once('\t').ok_or_else(|| AlignError::TsvParse {
231            line: (i + 1) as u64,
232            msg: "cluster TSV row has no tab".into(),
233        })?;
234        out.entry(rep.to_string()).or_default().push(member.to_string());
235    }
236    Ok(out)
237}
238
239// -- Expansion --
240
241fn expand_hits(
242    rep_hits: &[Hit],
243    cluster: &ClusterResult,
244    genomes: &[GenomeInput],
245) -> Vec<GenomeHitSet> {
246    // Preserve genome order for stable output.
247    let mut per_genome: BTreeMap<usize, Vec<Hit>> = BTreeMap::new();
248    let index: HashMap<&str, usize> =
249        genomes.iter().enumerate().map(|(i, g)| (g.id.as_str(), i)).collect();
250
251    // mmseqs's cluster TSV keys are the representative's *first*
252    // whitespace-delimited token (accession only); diamond/blastp's
253    // `stitle` field carries the full header. Normalize both sides to
254    // the accession form before the lookup.
255    let to_accession = |h: &str| h.split_whitespace().next().unwrap_or("").to_string();
256    let key_by_acc: HashMap<String, &Vec<String>> = cluster
257        .rep_to_members
258        .iter()
259        .map(|(k, v)| (to_accession(k), v))
260        .collect();
261
262    for rep_hit in rep_hits {
263        let rep_key = to_accession(&rep_hit.stitle);
264        let members = match key_by_acc.get(rep_key.as_str()) {
265            Some(m) => m.as_slice(),
266            None => {
267                tracing::warn!(
268                    rep = %rep_key,
269                    "rep-hit has no matching cluster — dropping"
270                );
271                continue;
272            }
273        };
274        for m in members {
275            let (genome_id, orig_header) = match split_genome_prefix(m) {
276                Some((g, o)) => (g, o),
277                None => continue,
278            };
279            let Some(&gidx) = index.get(genome_id) else { continue };
280            let mut h = rep_hit.clone();
281            h.stitle = orig_header.to_string();
282            per_genome.entry(gidx).or_default().push(h);
283        }
284    }
285    drop(key_by_acc);
286    (0..genomes.len())
287        .map(|i| GenomeHitSet {
288            genome_id: genomes[i].id.clone(),
289            hits: per_genome.remove(&i).unwrap_or_default(),
290        })
291        .collect()
292}
293
294fn require_mmseqs() -> Result<(), AlignError> {
295    if crate::blast::which("mmseqs").is_some() {
296        Ok(())
297    } else {
298        Err(AlignError::ToolMissing { tool: "mmseqs" })
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use std::io::Read;
306
307    #[test]
308    fn concat_rewrites_headers() {
309        let dir = tempfile::tempdir().unwrap();
310        let a = dir.path().join("A.faa");
311        let b = dir.path().join("B.faa");
312        std::fs::write(&a, ">seq1 descA\nMKLMA\n>seq2\nEEPS\n").unwrap();
313        std::fs::write(&b, ">x\nAAA\n").unwrap();
314        let out = dir.path().join("all.faa");
315        concat_genomes(
316            &[
317                GenomeInput { id: "g1".into(), fasta: a },
318                GenomeInput { id: "g2".into(), fasta: b },
319            ],
320            &out,
321        )
322        .unwrap();
323        let mut s = String::new();
324        File::open(&out).unwrap().read_to_string(&mut s).unwrap();
325        assert!(s.contains(">g1|seq1 descA\n"));
326        assert!(s.contains(">g1|seq2\n"));
327        assert!(s.contains(">g2|x\n"));
328        assert!(s.contains("MKLMA"));
329    }
330
331    #[test]
332    fn concat_rejects_bad_genome_id() {
333        let dir = tempfile::tempdir().unwrap();
334        let a = dir.path().join("A.faa");
335        std::fs::write(&a, ">s\nA\n").unwrap();
336        let err = concat_genomes(
337            &[GenomeInput { id: "g|1".into(), fasta: a }],
338            &dir.path().join("out.faa"),
339        )
340        .unwrap_err();
341        assert!(matches!(err, AlignError::BadArg(_)));
342    }
343
344    #[test]
345    fn parses_cluster_tsv() {
346        let dir = tempfile::tempdir().unwrap();
347        let p = dir.path().join("c.tsv");
348        std::fs::write(&p, "rep1\trep1\nrep1\tmemA\nrep2\trep2\n").unwrap();
349        let m = parse_cluster_tsv(&p).unwrap();
350        assert_eq!(m.get("rep1").unwrap(), &vec!["rep1".to_string(), "memA".to_string()]);
351        assert_eq!(m.get("rep2").unwrap(), &vec!["rep2".to_string()]);
352    }
353
354    #[test]
355    fn split_prefix_first_pipe_only() {
356        assert_eq!(split_genome_prefix("g1|sp|P12345|X"), Some(("g1", "sp|P12345|X")));
357        assert_eq!(split_genome_prefix("unpre"), None);
358    }
359}