1use crate::error::{io_err, AlignError};
25use crate::hit::Hit;
26use crate::{AlignOpts, Aligner};
27use std::collections::{BTreeMap, HashMap};
28use std::fs::File;
29use std::io::{BufRead, BufReader, BufWriter, Write};
30use std::path::{Path, PathBuf};
31use std::process::Command;
32
33pub const GENOME_SEP: char = '|';
38
39#[derive(Debug, Clone)]
42pub struct GenomeInput {
43 pub id: String,
44 pub fasta: PathBuf,
45}
46
47#[derive(Debug, Clone)]
51pub struct GenomeHitSet {
52 pub genome_id: String,
53 pub hits: Vec<Hit>,
54}
55
56pub struct BatchClusterAligner {
62 pub inner: Box<dyn Aligner>,
63 pub cluster_identity: f32,
64 pub cluster_coverage: f32,
65}
66
67impl BatchClusterAligner {
68 pub fn new(inner: Box<dyn Aligner>) -> Self {
71 Self { inner, cluster_identity: 0.5, cluster_coverage: 0.8 }
72 }
73
74 pub fn align_genomes(
78 &self,
79 query_fasta: &Path,
80 genomes: &[GenomeInput],
81 workdir: &Path,
82 opts: &AlignOpts,
83 ) -> Result<Vec<GenomeHitSet>, AlignError> {
84 std::fs::create_dir_all(workdir).map_err(|e| io_err(workdir, e))?;
85
86 let merged = workdir.join("all_genomes.faa");
88 concat_genomes(genomes, &merged)?;
89
90 let cluster = cluster_with_mmseqs(
92 &merged,
93 workdir,
94 self.cluster_identity,
95 self.cluster_coverage,
96 opts,
97 )?;
98
99 tracing::info!(
100 genomes = genomes.len(),
101 members = cluster.total_members,
102 representatives = cluster.rep_to_members.len(),
103 "clustering complete"
104 );
105
106 let rep_hits = self.inner.align(query_fasta, &cluster.representatives_fasta, opts)?;
108
109 let per_genome = expand_hits(&rep_hits, &cluster, genomes);
111 Ok(per_genome)
112 }
113}
114
115pub fn concat_genomes(genomes: &[GenomeInput], out: &Path) -> Result<(), AlignError> {
121 for g in genomes {
122 if g.id.contains(GENOME_SEP) || g.id.is_empty() {
123 return Err(AlignError::BadArg(format!(
124 "genome id `{}` must be non-empty and must not contain `{GENOME_SEP}`",
125 g.id
126 )));
127 }
128 }
129 let f = File::create(out).map_err(|e| io_err(out, e))?;
130 let mut w = BufWriter::new(f);
131 for g in genomes {
132 let r = File::open(&g.fasta).map_err(|e| io_err(&g.fasta, e))?;
133 let rdr = BufReader::new(r);
134 for line in rdr.lines() {
135 let line = line.map_err(|e| io_err(&g.fasta, e))?;
136 if let Some(rest) = line.strip_prefix('>') {
137 writeln!(w, ">{}{}{}", g.id, GENOME_SEP, rest).map_err(|e| io_err(out, e))?;
138 } else {
139 writeln!(w, "{line}").map_err(|e| io_err(out, e))?;
140 }
141 }
142 }
143 w.flush().map_err(|e| io_err(out, e))?;
144 Ok(())
145}
146
147pub fn split_genome_prefix(header: &str) -> Option<(&str, &str)> {
150 header.split_once(GENOME_SEP)
151}
152
153pub struct ClusterResult {
157 pub representatives_fasta: PathBuf,
158 pub rep_to_members: HashMap<String, Vec<String>>,
161 pub total_members: usize,
162}
163
164pub fn cluster_with_mmseqs(
165 merged_fasta: &Path,
166 workdir: &Path,
167 min_identity: f32,
168 min_coverage: f32,
169 opts: &AlignOpts,
170) -> Result<ClusterResult, AlignError> {
171 require_mmseqs()?;
172
173 let prefix = workdir.join("cluster");
174 let scratch = workdir.join("cluster_tmp");
175 std::fs::create_dir_all(&scratch).map_err(|e| io_err(&scratch, e))?;
176
177 let verbosity = if opts.quiet { "1" } else { "3" };
178 let mut cmd = Command::new("mmseqs");
179 cmd.arg("easy-cluster")
180 .arg(merged_fasta)
181 .arg(&prefix)
182 .arg(&scratch)
183 .arg("--min-seq-id")
184 .arg(format!("{min_identity:.2}"))
185 .arg("-c")
186 .arg(format!("{min_coverage:.2}"))
187 .arg("--threads")
188 .arg(opts.threads.to_string())
189 .arg("-v")
190 .arg(verbosity);
191
192 tracing::debug!(?cmd, "running mmseqs easy-cluster");
193 let out = cmd.output().map_err(|e| io_err(Path::new("mmseqs"), e))?;
194 if !out.status.success() {
195 return Err(AlignError::ToolFailed {
196 tool: "mmseqs2",
197 status: out.status,
198 stderr: String::from_utf8_lossy(&out.stderr).to_string(),
199 });
200 }
201
202 let rep_fa: PathBuf = append_ext(&prefix, "_rep_seq.fasta");
207 let cluster_tsv: PathBuf = append_ext(&prefix, "_cluster.tsv");
208
209 let rep_to_members = parse_cluster_tsv(&cluster_tsv)?;
210 let total_members: usize = rep_to_members.values().map(|v| v.len()).sum();
211
212 Ok(ClusterResult { representatives_fasta: rep_fa, rep_to_members, total_members })
213}
214
215fn append_ext(prefix: &Path, suffix: &str) -> PathBuf {
216 let mut s = prefix.as_os_str().to_owned();
217 s.push(suffix);
218 PathBuf::from(s)
219}
220
221pub fn parse_cluster_tsv(path: &Path) -> Result<HashMap<String, Vec<String>>, AlignError> {
222 let f = File::open(path).map_err(|e| io_err(path, e))?;
223 let rdr = BufReader::new(f);
224 let mut out: HashMap<String, Vec<String>> = HashMap::new();
225 for (i, line) in rdr.lines().enumerate() {
226 let line = line.map_err(|e| io_err(path, e))?;
227 if line.is_empty() {
228 continue;
229 }
230 let (rep, member) = line.split_once('\t').ok_or_else(|| AlignError::TsvParse {
231 line: (i + 1) as u64,
232 msg: "cluster TSV row has no tab".into(),
233 })?;
234 out.entry(rep.to_string()).or_default().push(member.to_string());
235 }
236 Ok(out)
237}
238
239fn expand_hits(
242 rep_hits: &[Hit],
243 cluster: &ClusterResult,
244 genomes: &[GenomeInput],
245) -> Vec<GenomeHitSet> {
246 let mut per_genome: BTreeMap<usize, Vec<Hit>> = BTreeMap::new();
248 let index: HashMap<&str, usize> =
249 genomes.iter().enumerate().map(|(i, g)| (g.id.as_str(), i)).collect();
250
251 let to_accession = |h: &str| h.split_whitespace().next().unwrap_or("").to_string();
256 let key_by_acc: HashMap<String, &Vec<String>> = cluster
257 .rep_to_members
258 .iter()
259 .map(|(k, v)| (to_accession(k), v))
260 .collect();
261
262 for rep_hit in rep_hits {
263 let rep_key = to_accession(&rep_hit.stitle);
264 let members = match key_by_acc.get(rep_key.as_str()) {
265 Some(m) => m.as_slice(),
266 None => {
267 tracing::warn!(
268 rep = %rep_key,
269 "rep-hit has no matching cluster — dropping"
270 );
271 continue;
272 }
273 };
274 for m in members {
275 let (genome_id, orig_header) = match split_genome_prefix(m) {
276 Some((g, o)) => (g, o),
277 None => continue,
278 };
279 let Some(&gidx) = index.get(genome_id) else { continue };
280 let mut h = rep_hit.clone();
281 h.stitle = orig_header.to_string();
282 per_genome.entry(gidx).or_default().push(h);
283 }
284 }
285 drop(key_by_acc);
286 (0..genomes.len())
287 .map(|i| GenomeHitSet {
288 genome_id: genomes[i].id.clone(),
289 hits: per_genome.remove(&i).unwrap_or_default(),
290 })
291 .collect()
292}
293
294fn require_mmseqs() -> Result<(), AlignError> {
295 if crate::blast::which("mmseqs").is_some() {
296 Ok(())
297 } else {
298 Err(AlignError::ToolMissing { tool: "mmseqs" })
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use super::*;
305 use std::io::Read;
306
307 #[test]
308 fn concat_rewrites_headers() {
309 let dir = tempfile::tempdir().unwrap();
310 let a = dir.path().join("A.faa");
311 let b = dir.path().join("B.faa");
312 std::fs::write(&a, ">seq1 descA\nMKLMA\n>seq2\nEEPS\n").unwrap();
313 std::fs::write(&b, ">x\nAAA\n").unwrap();
314 let out = dir.path().join("all.faa");
315 concat_genomes(
316 &[
317 GenomeInput { id: "g1".into(), fasta: a },
318 GenomeInput { id: "g2".into(), fasta: b },
319 ],
320 &out,
321 )
322 .unwrap();
323 let mut s = String::new();
324 File::open(&out).unwrap().read_to_string(&mut s).unwrap();
325 assert!(s.contains(">g1|seq1 descA\n"));
326 assert!(s.contains(">g1|seq2\n"));
327 assert!(s.contains(">g2|x\n"));
328 assert!(s.contains("MKLMA"));
329 }
330
331 #[test]
332 fn concat_rejects_bad_genome_id() {
333 let dir = tempfile::tempdir().unwrap();
334 let a = dir.path().join("A.faa");
335 std::fs::write(&a, ">s\nA\n").unwrap();
336 let err = concat_genomes(
337 &[GenomeInput { id: "g|1".into(), fasta: a }],
338 &dir.path().join("out.faa"),
339 )
340 .unwrap_err();
341 assert!(matches!(err, AlignError::BadArg(_)));
342 }
343
344 #[test]
345 fn parses_cluster_tsv() {
346 let dir = tempfile::tempdir().unwrap();
347 let p = dir.path().join("c.tsv");
348 std::fs::write(&p, "rep1\trep1\nrep1\tmemA\nrep2\trep2\n").unwrap();
349 let m = parse_cluster_tsv(&p).unwrap();
350 assert_eq!(m.get("rep1").unwrap(), &vec!["rep1".to_string(), "memA".to_string()]);
351 assert_eq!(m.get("rep2").unwrap(), &vec!["rep2".to_string()]);
352 }
353
354 #[test]
355 fn split_prefix_first_pipe_only() {
356 assert_eq!(split_genome_prefix("g1|sp|P12345|X"), Some(("g1", "sp|P12345|X")));
357 assert_eq!(split_genome_prefix("unpre"), None);
358 }
359}