1
2use anyhow::{Context, Result};
3use rustc_hash::{FxHashMap, FxHashSet};
4use std::fs::File;
5use std::io::{BufRead, BufReader, BufWriter, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::Command;
8use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
9use std::sync::{Arc, Mutex, mpsc};
10use std::time::Duration;
11use std::thread;
12
// Remote data sources used by the flanking-database build pipeline.

/// Base URL of the NCBI genomes FTP mirror (assembly summaries live under it).
const NCBI_FTP_BASE: &str = "https://ftp.ncbi.nlm.nih.gov/genomes";
/// NCBI taxonomy dump archive containing names.dmp / nodes.dmp.
const NCBI_TAXDUMP_URL: &str = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz";
/// Root of the NCBI Datasets REST API, v2.
const NCBI_DATASETS_API: &str = "https://api.ncbi.nlm.nih.gov/datasets/v2";
/// PLSDB 2025 plasmid metadata archive (tarred CSVs).
const PLSDB_META_URL: &str = "https://ccb-microbe.cs.uni-saarland.de/plsdb2025/download_meta.tar.gz";
/// PLSDB 2025 plasmid FASTA download endpoint (~7 GB).
const PLSDB_FASTA_URL: &str = "https://ccb-microbe.cs.uni-saarland.de/plsdb2025/download_fasta";

/// Accessions per NCBI Datasets API batch request.
/// NOTE(review): inferred from the name — confirm against the API call site.
const API_BATCH_SIZE: usize = 1000;
20
/// Options controlling how PLSDB plasmid data is obtained.
#[derive(Clone, Debug, Default)]
pub struct PlsdbOptions {
    // Pre-downloaded PLSDB directory; when None the data is downloaded
    // into `<output_dir>/plsdb`.
    pub dir: Option<PathBuf>,
    // Skip PLSDB entirely (the --skip-plsdb flag).
    pub skip: bool,
}
28
/// Tunables for the flanking-database build.
#[derive(Clone, Debug)]
pub struct FlankBuildConfig {
    // Number of bases extracted on each side of an AMR gene hit.
    pub flanking_length: usize,
    // Size (GB) of the producer-consumer download queue buffer.
    pub queue_buffer_gb: u32,
    // How PLSDB plasmid data is sourced (or skipped).
    pub plsdb: PlsdbOptions,
}
38
impl Default for FlankBuildConfig {
    /// Defaults: 1050 bp flanks, 30 GB queue buffer, PLSDB enabled.
    fn default() -> Self {
        Self {
            flanking_length: 1050,
            queue_buffer_gb: 30,
            plsdb: PlsdbOptions::default(),
        }
    }
}
48
/// One qualifying row parsed from an NCBI `assembly_summary.txt`.
#[derive(Clone, Debug)]
pub struct AssemblyInfo {
    // Assembly accession, version included (e.g. "GCA_000005845.2").
    pub accession: String,
    // NCBI taxid of the sequenced organism (kept as the raw string field).
    pub taxid: String,
    // Taxid of the species-level ancestor.
    pub species_taxid: String,
    // Free-text organism name column.
    pub organism_name: String,
}
56
/// One standalone plasmid record taken from PLSDB metadata.
#[derive(Clone, Debug)]
pub struct PlasmidInfo {
    // Nucleotide accession; may carry an "NZ_" prefix and ".N" version.
    pub accession: String,
    // PLSDB taxonomy UID linking this record into taxonomy.csv.
    pub taxonomy_uid: String,
    pub genus: String,
    pub species: String,
}
64
/// A single minimap2 PAF record of one AMR-gene hit on a genome contig.
#[derive(Clone, Debug)]
struct PafHit {
    query_name: String,
    query_start: usize,
    query_end: usize,
    gene_name: String,
    gene_length: usize,
    score: i64,
    mapq: u8,
    divergence: f32,
    gap_count: usize,
    raw_line: String,
}

impl PafHit {
    /// Parse one tab-separated PAF line.
    ///
    /// Returns `None` when fewer than the 12 mandatory columns are present
    /// or when a coordinate/length column fails to parse. Optional tags are
    /// scanned for `AS:i:` (alignment score, default 0) and `de:f:`
    /// (per-base divergence, default 1.0).
    fn from_paf_line(line: &str) -> Option<Self> {
        let cols: Vec<&str> = line.split('\t').collect();
        if cols.len() < 12 {
            return None;
        }

        // Optional SAM-style tags follow the 12 fixed columns.
        let mut score: i64 = 0;
        let mut divergence: f32 = 1.0;
        for tag in &cols[12..] {
            if let Some(v) = tag.strip_prefix("AS:i:") {
                score = v.parse().unwrap_or(0);
            } else if let Some(v) = tag.strip_prefix("de:f:") {
                divergence = v.parse().unwrap_or(1.0);
            }
        }

        let matches: usize = cols[9].parse().unwrap_or(0);
        let aligned: usize = cols[10].parse().unwrap_or(0);

        Some(PafHit {
            query_name: cols[0].to_string(),
            query_start: cols[2].parse().ok()?,
            query_end: cols[3].parse().ok()?,
            gene_name: cols[5].to_string(),
            gene_length: cols[6].parse().ok()?,
            score,
            mapq: cols[11].parse().unwrap_or(0),
            divergence,
            // Non-matching positions in the alignment block (mismatches + gaps).
            gap_count: aligned.saturating_sub(matches),
            raw_line: line.to_string(),
        })
    }

    /// True when both hits lie on the same query and their intervals share
    /// more than 50% of the shorter interval's length.
    fn overlaps(&self, other: &PafHit) -> bool {
        if self.query_name != other.query_name {
            return false;
        }
        let lo = self.query_start.max(other.query_start);
        let hi = self.query_end.min(other.query_end);
        if hi <= lo {
            return false;
        }
        let shared = hi - lo;
        let self_span = self.query_end - self.query_start;
        let other_span = other.query_end - other.query_start;
        // Equivalent to: shared > half of either span.
        2 * shared > self_span.min(other_span)
    }
}
138
139fn compare_paf_hits(a: &PafHit, b: &PafHit) -> std::cmp::Ordering {
140 use std::cmp::Ordering;
141 b.score.cmp(&a.score)
142 .then_with(|| b.gene_length.cmp(&a.gene_length))
143 .then_with(|| b.mapq.cmp(&a.mapq))
144 .then_with(|| a.divergence.partial_cmp(&b.divergence).unwrap_or(Ordering::Equal))
145 .then_with(|| a.gap_count.cmp(&b.gap_count))
146 .then_with(|| a.gene_name.cmp(&b.gene_name))
147}
148
149fn deduplicate_paf_hits(hits: Vec<PafHit>) -> Vec<PafHit> {
150 if hits.is_empty() {
151 return hits;
152 }
153
154 let mut groups: Vec<Vec<PafHit>> = Vec::new();
155 for hit in hits {
156 let mut found = false;
157 for group in &mut groups {
158 if group.iter().any(|h| h.overlaps(&hit)) {
159 group.push(hit.clone());
160 found = true;
161 break;
162 }
163 }
164 if !found {
165 groups.push(vec![hit]);
166 }
167 }
168
169 groups
170 .into_iter()
171 .map(|mut g| {
172 g.sort_by(compare_paf_hits);
173 g.into_iter().next().unwrap()
174 })
175 .collect()
176}
177
/// Size class of a genome file, used to pick per-genome thread budgets.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum GenomeSizeCategory {
    /// Under 5 MiB on disk.
    Small,
    /// 5 MiB to just under 20 MiB.
    Medium,
    /// 20 MiB and above.
    Large,
}

/// Upper bound (exclusive) for the Small class: 5 MiB.
const SMALL_GENOME_THRESHOLD: u64 = 5 * 1024 * 1024;
/// Lower bound (inclusive) for the Large class: 20 MiB.
const LARGE_GENOME_THRESHOLD: u64 = 20 * 1024 * 1024;

impl GenomeSizeCategory {
    /// Classify a file by its size in bytes.
    pub fn from_size(size_bytes: u64) -> Self {
        match size_bytes {
            s if s < SMALL_GENOME_THRESHOLD => Self::Small,
            s if s < LARGE_GENOME_THRESHOLD => Self::Medium,
            _ => Self::Large,
        }
    }

    /// Recommended worker threads for this class, capped at `max_threads`.
    pub fn thread_count(&self, max_threads: usize) -> usize {
        let want = match self {
            Self::Small => 4,
            Self::Medium => 6,
            Self::Large => 8,
        };
        usize::min(want, max_threads)
    }

    /// Human-readable label for progress output.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Small => "small (<5MB)",
            Self::Medium => "medium (5-20MB)",
            Self::Large => "large (>20MB)",
        }
    }
}
221
222fn bucket_genomes_by_size(genome_files: &[PathBuf]) -> FxHashMap<GenomeSizeCategory, Vec<PathBuf>> {
223 let mut buckets: FxHashMap<GenomeSizeCategory, Vec<PathBuf>> = FxHashMap::default();
224
225 for path in genome_files {
226 let size = std::fs::metadata(path)
227 .map(|m| m.len())
228 .unwrap_or(0);
229 let category = GenomeSizeCategory::from_size(size);
230 buckets.entry(category).or_default().push(path.clone());
231 }
232
233 buckets
234}
235
/// One row of the unified genome catalog TSV.
#[derive(Clone, Debug)]
pub struct CatalogEntry {
    pub accession: String,
    pub taxid: String,
    // Empty for PLSDB plasmids (PLSDB has no species-level taxid column).
    pub species_taxid: String,
    pub genus: String,
    pub species: String,
    pub organism_name: String,
    // Provenance tag, e.g. "genbank" or "plsdb".
    pub source: String,
}
246
/// Accession -> taxonomy catalog covering assemblies and plasmids.
///
/// Several alias keys (version-stripped, "NZ_"-stripped, underscore
/// variants) may map to clones of the same logical entry.
pub struct GenomeCatalog {
    // Keyed by accession plus its alias spellings.
    entries: FxHashMap<String, CatalogEntry>,
}
250
impl Default for GenomeCatalog {
    /// Same as [`GenomeCatalog::new`]: an empty catalog.
    fn default() -> Self {
        Self::new()
    }
}
256
impl GenomeCatalog {
    /// Empty catalog.
    pub fn new() -> Self {
        Self {
            entries: FxHashMap::default(),
        }
    }

    /// Register a GenBank assembly.
    ///
    /// The entry is inserted under the full accession and, as an alias,
    /// under the version-stripped accession ("GCA_123.1" -> "GCA_123") so
    /// lookups succeed with either spelling.
    pub fn add_assembly_with_taxonomy(
        &mut self,
        asm: &AssemblyInfo,
        source: &str,
        taxonomy: Option<&TaxonomyDB>,
    ) {
        let (parsed_genus, species) = parse_organism_name(&asm.organism_name);

        // Taxonomy-derived genus (when available) wins over the genus token
        // parsed out of the free-text organism name.
        let genus = resolve_genus(&asm.organism_name, &asm.taxid, &parsed_genus, taxonomy);

        let entry = CatalogEntry {
            accession: asm.accession.clone(),
            taxid: asm.taxid.clone(),
            species_taxid: asm.species_taxid.clone(),
            genus,
            species,
            organism_name: asm.organism_name.clone(),
            source: source.to_string(),
        };

        self.entries.insert(asm.accession.clone(), entry.clone());
        // split('.') always yields at least one part, so this alias is
        // always inserted; it drops the trailing ".N" version suffix.
        if let Some(base) = asm.accession.split('.').next() {
            self.entries.insert(base.to_string(), entry);
        }
    }

    /// Register a standalone PLSDB plasmid.
    ///
    /// Aliases inserted: the raw accession, the "NZ_"- and version-stripped
    /// form, and a variant with '.' replaced by '_' (matches sequence names
    /// produced by some FASTA tooling).
    pub fn add_plasmid_with_taxonomy(
        &mut self,
        plasmid: &PlasmidInfo,
        taxonomy: Option<&TaxonomyDB>,
    ) {
        // PLSDB has no single organism-name field; synthesize "Genus species".
        let organism_name = format!("{} {}", plasmid.genus, plasmid.species);

        let genus = resolve_genus(&organism_name, &plasmid.taxonomy_uid, &plasmid.genus, taxonomy);

        let entry = CatalogEntry {
            accession: plasmid.accession.clone(),
            taxid: plasmid.taxonomy_uid.clone(),
            // PLSDB metadata carries no species-level taxid.
            species_taxid: String::new(),
            genus,
            species: plasmid.species.clone(),
            organism_name,
            source: "plsdb".to_string(),
        };

        self.entries.insert(plasmid.accession.clone(), entry.clone());

        // Alias: drop the RefSeq "NZ_" prefix and the version suffix.
        let base = plasmid.accession
            .strip_prefix("NZ_")
            .unwrap_or(&plasmid.accession);
        let no_ver = base.split('.').next().unwrap_or(base);
        self.entries.insert(no_ver.to_string(), entry.clone());

        // Alias: "ACC.1" -> "ACC_1" for tools that mangle dots.
        let underscore_key = plasmid.accession.replace('.', "_");
        self.entries.insert(underscore_key, entry);
    }

    /// Resolve an accession (in any of several spellings) to its genus.
    ///
    /// Lookup order: exact key; version-stripped; "NZ_"-stripped (with and
    /// without version); finally an accession whose trailing "_N" looks like
    /// an underscore-mangled version suffix is retried as ".N".
    pub fn get_genus(&self, accession: &str) -> Option<&str> {
        // 1. Exact match.
        if let Some(entry) = self.entries.get(accession) {
            return Some(&entry.genus);
        }

        // 2. Without the ".N" version suffix.
        if let Some(base) = accession.split('.').next() {
            if let Some(entry) = self.entries.get(base) {
                return Some(&entry.genus);
            }
        }

        // 3. Without the RefSeq "NZ_" prefix (then also versionless).
        let stripped = accession.strip_prefix("NZ_").unwrap_or(accession);
        if let Some(entry) = self.entries.get(stripped) {
            return Some(&entry.genus);
        }
        let stripped_base = stripped.split('.').next().unwrap_or(stripped);
        if let Some(entry) = self.entries.get(stripped_base) {
            return Some(&entry.genus);
        }

        // 4. "ACC_1" -> "ACC.1": undo an underscore-mangled version suffix.
        if let Some(last_underscore) = accession.rfind('_') {
            let suffix = &accession[last_underscore + 1..];
            // Only an all-digit suffix can be a version number.
            if !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit()) {
                let with_dot = format!("{}.{}", &accession[..last_underscore], suffix);
                if let Some(entry) = self.entries.get(&with_dot) {
                    return Some(&entry.genus);
                }

                let stripped_with_dot = with_dot.strip_prefix("NZ_").unwrap_or(&with_dot);
                if let Some(entry) = self.entries.get(stripped_with_dot) {
                    return Some(&entry.genus);
                }
            }
        }

        None
    }

    /// Write the catalog as TSV, one row per unique accession.
    ///
    /// Alias keys share the same `accession` value, so `seen` deduplicates
    /// the rows.
    pub fn save(&self, path: &Path) -> Result<()> {
        let file = File::create(path)?;
        let mut writer = BufWriter::new(file);

        writeln!(writer, "accession\ttaxid\tspecies_taxid\tgenus\tspecies\torganism_name\tsource")?;

        let mut seen: FxHashSet<&str> = FxHashSet::default();
        for entry in self.entries.values() {
            if seen.contains(entry.accession.as_str()) {
                continue;
            }
            seen.insert(&entry.accession);

            writeln!(
                writer,
                "{}\t{}\t{}\t{}\t{}\t{}\t{}",
                entry.accession,
                entry.taxid,
                entry.species_taxid,
                entry.genus,
                entry.species,
                // Organism names may contain tabs; keep the TSV rectangular.
                entry.organism_name.replace('\t', " "),
                entry.source
            )?;
        }

        Ok(())
    }

    /// Number of unique accessions (alias keys collapse to one).
    pub fn len(&self) -> usize {
        let mut seen: FxHashSet<&str> = FxHashSet::default();
        for entry in self.entries.values() {
            seen.insert(&entry.accession);
        }
        seen.len()
    }

    /// True when the catalog holds no entries at all (aliases included);
    /// consistent with `len() == 0` since aliases only exist alongside
    /// their primary entry.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
404
/// Split an organism name into (cleaned genus, species remainder).
///
/// The first whitespace token is the genus; everything after it joins into
/// the species string. Missing pieces become "Unknown".
fn parse_organism_name(organism: &str) -> (String, String) {
    let mut tokens = organism.split_whitespace();

    let genus = match tokens.next() {
        Some(first) => first.to_string(),
        None => "Unknown".to_string(),
    };

    let rest: Vec<&str> = tokens.collect();
    let species = if rest.is_empty() {
        "Unknown".to_string()
    } else {
        rest.join(" ")
    };

    (clean_genus(&genus), species)
}

/// Normalize a genus token: drop any parenthesized tail and trim whitespace.
fn clean_genus(genus: &str) -> String {
    let head = match genus.split_once('(') {
        Some((before, _)) => before,
        None => genus,
    };
    head.trim().to_string()
}
432
433fn resolve_genus(
434 organism_name: &str,
435 taxid_str: &str,
436 _parsed_genus: &str,
437 taxonomy: Option<&TaxonomyDB>,
438) -> String {
439
440 let organism_lower = organism_name.to_lowercase();
441 if organism_lower.contains("uncultured") {
442 return "uncultured".to_string();
443 }
444
445 if let Some(tax_db) = taxonomy {
446 if let Ok(taxid) = taxid_str.parse::<u32>() {
447
448 if let Some(genus) = tax_db.get_genus(taxid) {
449 return clean_genus(&genus);
450 }
451
452 if let Some((name, _rank)) = tax_db.get_genus_or_higher(taxid) {
453 return clean_genus(&name);
454 }
455 }
456 }
457
458 let cleaned = clean_genus(organism_name);
459 if cleaned.is_empty() {
460 "unknown".to_string()
461 } else {
462 cleaned
463 }
464}
465
/// Minimal in-memory view of the NCBI taxonomy dump.
pub struct TaxonomyDB {
    // taxid -> scientific name (from names.dmp).
    names: FxHashMap<u32, String>,
    // taxid -> (parent taxid, rank string) (from nodes.dmp).
    nodes: FxHashMap<u32, (u32, String)>,
}
472
impl Default for TaxonomyDB {
    /// Same as [`TaxonomyDB::new`]: an empty database.
    fn default() -> Self {
        Self::new()
    }
}
478
479impl TaxonomyDB {
480 pub fn new() -> Self {
481 Self {
482 names: FxHashMap::default(),
483 nodes: FxHashMap::default(),
484 }
485 }
486
487 pub fn load(taxdump_dir: &Path) -> Result<Self> {
488 let mut db = Self::new();
489
490 let names_path = taxdump_dir.join("names.dmp");
491 let nodes_path = taxdump_dir.join("nodes.dmp");
492
493 if !names_path.exists() || !nodes_path.exists() {
494 anyhow::bail!("Taxonomy files not found in {}", taxdump_dir.display());
495 }
496
497 eprintln!(" Loading names.dmp...");
498 let file = File::open(&names_path)?;
499 let reader = BufReader::new(file);
500
501 for line in reader.lines() {
502 let line = line?;
503 let fields: Vec<&str> = line.split("\t|\t").collect();
504 if fields.len() < 4 {
505 continue;
506 }
507
508 let name_class = fields[3].trim_end_matches("\t|");
509 if name_class != "scientific name" {
510 continue;
511 }
512
513 let taxid: u32 = match fields[0].parse() {
514 Ok(id) => id,
515 Err(_) => continue,
516 };
517 let name = fields[1].to_string();
518
519 db.names.insert(taxid, name);
520 }
521
522 eprintln!(" Loading nodes.dmp...");
523 let file = File::open(&nodes_path)?;
524 let reader = BufReader::new(file);
525
526 for line in reader.lines() {
527 let line = line?;
528 let fields: Vec<&str> = line.split("\t|\t").collect();
529 if fields.len() < 3 {
530 continue;
531 }
532
533 let taxid: u32 = match fields[0].parse() {
534 Ok(id) => id,
535 Err(_) => continue,
536 };
537 let parent_taxid: u32 = match fields[1].parse() {
538 Ok(id) => id,
539 Err(_) => continue,
540 };
541 let rank = fields[2].to_string();
542
543 db.nodes.insert(taxid, (parent_taxid, rank));
544 }
545
546 eprintln!(" Loaded {} taxa, {} nodes", db.names.len(), db.nodes.len());
547 Ok(db)
548 }
549
550 pub fn get_genus(&self, taxid: u32) -> Option<String> {
551 let mut current = taxid;
552 let mut visited = 0;
553
554 while visited < 50 {
555 if let Some((parent, rank)) = self.nodes.get(¤t) {
556 if rank == "genus" {
557 return self.names.get(¤t).cloned();
558 }
559
560 if *parent == current {
561 return None;
562 }
563
564 current = *parent;
565 visited += 1;
566 } else {
567 return None;
568 }
569 }
570
571 None
572 }
573
574 pub fn get_genus_or_higher(&self, taxid: u32) -> Option<(String, String)> {
575 let mut current = taxid;
576 let mut visited = 0;
577
578 let target_ranks = ["genus", "family", "order", "class", "phylum"];
579 let mut best_match: Option<(String, String, usize)> = None;
580
581 while visited < 50 {
582 if let Some((parent, rank)) = self.nodes.get(¤t) {
583
584 if let Some(priority) = target_ranks.iter().position(|r| r == rank) {
585 if let Some(name) = self.names.get(¤t) {
586
587 if rank == "genus" {
588 return Some((name.clone(), rank.clone()));
589 }
590
591 if best_match.is_none() || priority < best_match.as_ref().unwrap().2 {
592 best_match = Some((name.clone(), rank.clone(), priority));
593 }
594 }
595 }
596
597 if *parent == current {
598 break;
599 }
600
601 current = *parent;
602 visited += 1;
603 } else {
604 break;
605 }
606 }
607
608 best_match.map(|(name, rank, _)| (name, rank))
609 }
610}
611
/// Checkpoint persisted to `build_state.json` after every pipeline step so
/// an interrupted build can resume where it left off.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct BuildState {
    // Local wall-clock time the build first started ("%Y-%m-%d %H:%M:%S").
    pub started_at: String,
    // Time of the most recent state change.
    pub last_updated: String,
    // Step currently in progress (may have been mid-flight when resumed).
    pub current_step: String,
    // Names of steps that finished successfully.
    pub completed_steps: Vec<String>,
    // Genomes processed by the download+align stage.
    pub genomes_downloaded: usize,
    // PLSDB sequences processed.
    pub plsdb_extracted: usize,
    // PAF results have been merged.
    pub alignments_done: bool,
    // Flanking-sequence TSV has been written.
    pub flanking_extracted: bool,
}
623
impl Default for BuildState {
    /// Same as [`BuildState::new`]: a fresh state stamped with "now".
    fn default() -> Self {
        Self::new()
    }
}
629
630impl BuildState {
631 pub fn new() -> Self {
632 let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
633 Self {
634 started_at: now.clone(),
635 last_updated: now,
636 current_step: String::new(),
637 completed_steps: Vec::new(),
638 genomes_downloaded: 0,
639 plsdb_extracted: 0,
640 alignments_done: false,
641 flanking_extracted: false,
642 }
643 }
644
645 pub fn load(path: &Path) -> Option<Self> {
646 std::fs::read_to_string(path)
647 .ok()
648 .and_then(|s| serde_json::from_str(&s).ok())
649 }
650
651 pub fn save(&self, path: &Path) -> Result<()> {
652 let json = serde_json::to_string_pretty(self)?;
653 std::fs::write(path, json)?;
654 Ok(())
655 }
656
657 pub fn update_step(&mut self, step: &str) {
658 self.current_step = step.to_string();
659 self.last_updated = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
660 }
661
662 pub fn complete_step(&mut self, step: &str) {
663 if !self.completed_steps.contains(&step.to_string()) {
664 self.completed_steps.push(step.to_string());
665 }
666 self.last_updated = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
667 }
668
669 pub fn is_completed(&self, step: &str) -> bool {
670 self.completed_steps.contains(&step.to_string())
671 }
672}
673
/// Orchestrates the end-to-end flanking-sequence database build.
pub struct FlankingDbBuilder {
    // Root directory for downloads, intermediates, and final outputs.
    output_dir: PathBuf,
    // Worker-thread budget passed to alignment/packing stages.
    threads: usize,
    // Path to the AMR gene reference (minimap2 target database).
    amr_db_path: PathBuf,
    // Contact e-mail shown in the build banner; presumably also sent to
    // NCBI services — confirm at the API call sites.
    email: String,
    // Tunables: flank length, queue buffer, PLSDB handling.
    config: FlankBuildConfig,
}
683
684impl FlankingDbBuilder {
    /// Create a builder. `amr_db` is the AMR gene reference to align
    /// against; all outputs are written under `output_dir`.
    pub fn new(amr_db: &Path, output_dir: &Path, threads: usize, email: &str, config: FlankBuildConfig) -> Self {
        Self {
            output_dir: output_dir.to_path_buf(),
            threads,
            amr_db_path: amr_db.to_path_buf(),
            email: email.to_string(),
            config,
        }
    }
694
    /// Run the full build pipeline. Every step is checkpointed in
    /// `build_state.json`, so an interrupted build resumes from the last
    /// completed step.
    ///
    /// Steps: [1] taxdump download, [2] taxonomy load, [3] assembly
    /// summaries, [4]/[5] PLSDB acquisition + metadata, [6] catalog save,
    /// [7] download+align genomes, [8] align PLSDB, [9] merge PAF +
    /// extract flanks, [10] pack the TSV into the FDB file.
    pub fn build(&self) -> Result<()> {
        eprintln!("\n============================================================");
        eprintln!(" ARGenus Flanking Database Builder (Rust)");
        eprintln!("============================================================");
        eprintln!("Output directory: {}", self.output_dir.display());
        eprintln!("NCBI Email: {}", self.email);
        eprintln!("Download method: NCBI Datasets API (batch)");
        eprintln!("Threads: {}", self.threads);
        eprintln!("Flanking length: {} bp", self.config.flanking_length);
        eprintln!("Queue buffer: {} GB", self.config.queue_buffer_gb);
        eprintln!("Alignment: minimap2 (AMR indexed)");
        eprintln!();

        // Working directory layout.
        std::fs::create_dir_all(&self.output_dir)?;
        let genomes_dir = self.output_dir.join("genomes");
        std::fs::create_dir_all(&genomes_dir)?;

        // PLSDB lives either in the user-supplied directory or under output.
        let plsdb_dir = self.config.plsdb.dir.clone().unwrap_or_else(|| self.output_dir.join("plsdb"));
        if !self.config.plsdb.skip {
            std::fs::create_dir_all(&plsdb_dir)?;
        }
        let taxonomy_dir = self.output_dir.join("taxonomy");
        let temp_dir = self.output_dir.join("temp");
        std::fs::create_dir_all(&temp_dir)?;
        let state_path = self.output_dir.join("build_state.json");

        // Resume from checkpoint when present; otherwise start fresh.
        let mut state = BuildState::load(&state_path).unwrap_or_else(|| {
            eprintln!("Starting fresh build...");
            BuildState::new()
        });

        if !state.completed_steps.is_empty() {
            eprintln!("Resuming from previous state:");
            eprintln!(" Started: {}", state.started_at);
            eprintln!(" Last updated: {}", state.last_updated);
            eprintln!(" Completed steps: {}", state.completed_steps.join(", "));
            eprintln!(" Genomes downloaded: {}", state.genomes_downloaded);
            eprintln!(" PLSDB extracted: {}", state.plsdb_extracted);
            eprintln!();
        }

        let mut catalog = GenomeCatalog::new();
        let catalog_path = self.output_dir.join("genome_catalog.tsv");

        // [1] NCBI taxonomy dump (names.dmp / nodes.dmp).
        if !state.is_completed("taxonomy_download") {
            state.update_step("taxonomy_download");
            state.save(&state_path)?;
            eprintln!("[1/9] Downloading NCBI taxonomy database...");
            self.download_taxdump(&taxonomy_dir)?;
            state.complete_step("taxonomy_download");
            state.save(&state_path)?;
        } else {
            eprintln!("[1/9] Taxonomy database already downloaded, skipping...");
        }

        // [2] A taxonomy load failure is non-fatal: genus resolution falls
        // back to organism-name parsing.
        eprintln!("[2/9] Loading taxonomy database...");
        let taxonomy = match TaxonomyDB::load(&taxonomy_dir) {
            Ok(db) => {
                eprintln!(" Taxonomy database loaded successfully");
                Some(db)
            }
            Err(e) => {
                eprintln!(" Warning: Failed to load taxonomy: {}", e);
                eprintln!(" Genus will be parsed from organism names instead");
                None
            }
        };

        // [3] Assembly summaries feed the unified catalog.
        eprintln!("[3/9] Downloading NCBI assembly summaries (GenBank)...");
        let assemblies = self.download_assembly_summaries()?;
        eprintln!(" GenBank assemblies: {} (Complete Genome + Chromosome)", assemblies.len());

        for asm in &assemblies {
            catalog.add_assembly_with_taxonomy(asm, "genbank", taxonomy.as_ref());
        }

        // [4]/[5] PLSDB: skipped, pre-downloaded, or freshly downloaded.
        let standalone_plasmids = if self.config.plsdb.skip {
            eprintln!("[4/9] PLSDB skipped (--skip-plsdb)");
            eprintln!("[5/9] PLSDB metadata skipped");
            Vec::new()
        } else if self.config.plsdb.dir.is_some() {
            // User supplied a PLSDB directory; verify/extract its contents.
            eprintln!("[4/9] Using pre-downloaded PLSDB: {}", plsdb_dir.display());

            let nuccore_csv = plsdb_dir.join("nuccore.csv");
            let fasta_path = plsdb_dir.join("sequences.fasta");
            if !nuccore_csv.exists() {
                // Metadata may still be packed; try extracting it in place.
                let meta_tar = plsdb_dir.join("meta.tar.gz");
                if meta_tar.exists() {
                    eprintln!(" Extracting meta.tar.gz...");
                    std::process::Command::new("tar")
                        .args(["-xzf", meta_tar.to_str().unwrap(), "-C", plsdb_dir.to_str().unwrap()])
                        .status()
                        .with_context(|| "Failed to extract PLSDB metadata")?;
                } else {
                    anyhow::bail!("PLSDB directory missing nuccore.csv and meta.tar.gz: {}", plsdb_dir.display());
                }
            }
            if !fasta_path.exists() {
                anyhow::bail!("PLSDB directory missing sequences.fasta: {}", plsdb_dir.display());
            }
            state.complete_step("plsdb_download");
            state.save(&state_path)?;

            eprintln!("[5/9] Loading PLSDB metadata...");
            let plasmids = self.load_plsdb_plasmids(&plsdb_dir)?;
            eprintln!(" Standalone circular+complete plasmids: {} (not in any assembly)",
                plasmids.len());
            plasmids
        } else {
            // Default path: download PLSDB into the output tree.
            if !state.is_completed("plsdb_download") {
                state.update_step("plsdb_download");
                state.save(&state_path)?;
                eprintln!("[4/9] Downloading PLSDB database...");
                self.download_plsdb(&plsdb_dir)?;
                state.complete_step("plsdb_download");
                state.save(&state_path)?;
            } else {
                eprintln!("[4/9] PLSDB database already downloaded, skipping...");
            }

            eprintln!("[5/9] Loading PLSDB metadata...");
            let plasmids = self.load_plsdb_plasmids(&plsdb_dir)?;
            eprintln!(" Standalone circular+complete plasmids: {} (not in any assembly)",
                plasmids.len());
            plasmids
        };

        for plasmid in &standalone_plasmids {
            catalog.add_plasmid_with_taxonomy(plasmid, taxonomy.as_ref());
        }

        // [6] Persist the unified accession -> taxonomy catalog.
        eprintln!("[6/9] Saving unified genome catalog...");
        catalog.save(&catalog_path)?;
        eprintln!(" Catalog entries: {}", catalog.len());
        eprintln!(" Saved to: {}", catalog_path.display());

        let paf_output = self.output_dir.join("all_alignments.paf");
        let merged_hits = self.output_dir.join("merged_alignment_hits.tsv");
        let output_tsv = self.output_dir.join("all_flanking_sequences.tsv");

        // [7]-[9] Download, align, and extract flanks. All three phases are
        // checkpointed under a single "alignment_flanking" step.
        if !state.is_completed("alignment_flanking") {

            state.update_step("genome_download");
            state.save(&state_path)?;

            eprintln!("[7/9] Batch processing: Download + Align NCBI genomes...");
            eprintln!(" Producer-consumer pattern with {} GB queue buffer", self.config.queue_buffer_gb);

            let downloaded = self.download_and_align_batches(
                &assemblies,
                &genomes_dir,
                &temp_dir,
                &paf_output,
                &catalog,
            )?;
            state.genomes_downloaded = downloaded;
            state.complete_step("genome_download");
            state.save(&state_path)?;
            eprintln!(" Processed {} genomes", downloaded);

            if self.config.plsdb.skip {
                eprintln!("[8/9] PLSDB processing skipped (--skip-plsdb)");
                state.plsdb_extracted = 0;
            } else {
                state.update_step("plsdb_extract");
                state.save(&state_path)?;
                eprintln!("[8/9] Processing PLSDB sequences...");
                let plsdb_count = self.align_plsdb(
                    &standalone_plasmids,
                    &plsdb_dir,
                    &temp_dir,
                    &paf_output,
                )?;
                state.plsdb_extracted = plsdb_count;
                state.complete_step("plsdb_extract");
                state.save(&state_path)?;
            }

            state.update_step("alignment_flanking");
            state.save(&state_path)?;

            eprintln!("[9/9] Processing alignment results...");
            self.convert_paf_to_merged(&paf_output, &merged_hits)?;
            state.alignments_done = true;
            state.save(&state_path)?;

            eprintln!(" Extracting flanking sequences (flanking_length: {} bp)...",
                self.config.flanking_length);
            self.extract_flanking_sequences(
                &merged_hits,
                &genomes_dir,
                &output_tsv,
                &catalog,
            )?;

            if !self.config.plsdb.skip {
                self.extract_flanking_from_plsdb(
                    &merged_hits,
                    &plsdb_dir,
                    &output_tsv,
                    &catalog,
                )?;
            }
            state.flanking_extracted = true;
            state.complete_step("alignment_flanking");
            state.save(&state_path)?;

            // The raw PAF can be huge and is no longer needed.
            eprintln!(" Cleaning up PAF file...");
            std::fs::remove_file(&paf_output).ok();
        } else {
            eprintln!("[7-9] Batch pipeline already completed, skipping...");
        }

        if temp_dir.exists() {
            eprintln!("\nCleaning up temp files...");
            std::fs::remove_dir_all(&temp_dir).ok();
        }

        // [10] Pack the flanking-sequence TSV into the compressed FDB.
        let fdb_path = self.output_dir.join("flanking.fdb");
        if !state.is_completed("fdb_build") && output_tsv.exists() {
            state.update_step("fdb_build");
            state.save(&state_path)?;
            eprintln!("\n[10/10] Building FDB from TSV (external sort + zstd)...");

            let buffer_size_mb = 1024;
            crate::fdb::build(&output_tsv, &fdb_path, buffer_size_mb, self.threads)?;

            state.complete_step("fdb_build");
            state.save(&state_path)?;
        } else if fdb_path.exists() {
            eprintln!("[10/10] FDB already built, skipping...");
        }

        eprintln!("\n============================================================");
        eprintln!(" Build Complete");
        eprintln!("============================================================");
        eprintln!("Output files:");
        eprintln!(" - {}", output_tsv.display());
        if fdb_path.exists() {
            let fdb_size = std::fs::metadata(&fdb_path).map(|m| m.len()).unwrap_or(0);
            eprintln!(" - {} ({:.1} MB)", fdb_path.display(), fdb_size as f64 / 1024.0 / 1024.0);
        }
        eprintln!(" - {}", catalog_path.display());

        Ok(())
    }
944
    /// Fetch GenBank assembly summaries for bacteria and archaea.
    ///
    /// Each kingdom's `assembly_summary.txt` is filtered down to
    /// "Complete Genome" / "Chromosome" assemblies by the parser. A failed
    /// kingdom download is logged and skipped, not fatal.
    fn download_assembly_summaries(&self) -> Result<Vec<AssemblyInfo>> {
        let mut assemblies = Vec::new();

        for kingdom in ["bacteria", "archaea"] {
            let url = format!(
                "{}/genbank/{}/assembly_summary.txt",
                NCBI_FTP_BASE, kingdom
            );
            eprintln!(" Downloading genbank/{}...", kingdom);

            // No exclude set: GenBank is the primary (only) source here.
            match self.download_and_parse_assembly_summary(&url, None) {
                Ok((mut asm, _)) => {
                    eprintln!(" Found {} assemblies (Complete Genome + Chromosome)", asm.len());
                    assemblies.append(&mut asm);
                }
                Err(e) => {
                    eprintln!(" Warning: Failed to download {}: {}", url, e);
                }
            }
        }

        eprintln!(" Total assemblies: {}", assemblies.len());
        Ok(assemblies)
    }
969
    /// Download PLSDB metadata and sequences into `plsdb_dir`, skipping
    /// whatever already exists (resume-friendly).
    fn download_plsdb(&self, plsdb_dir: &Path) -> Result<()> {
        let meta_tar = plsdb_dir.join("meta.tar.gz");
        let fasta_path = plsdb_dir.join("sequences.fasta");

        // nuccore.csv marks that the metadata was already extracted.
        if !plsdb_dir.join("nuccore.csv").exists() {
            eprintln!(" Downloading PLSDB metadata...");
            self.download_file(PLSDB_META_URL, &meta_tar)?;

            eprintln!(" Extracting metadata...");
            // NOTE(review): to_str().unwrap() panics on non-UTF-8 paths.
            let status = Command::new("tar")
                .args(["-xzf", meta_tar.to_str().unwrap(), "-C", plsdb_dir.to_str().unwrap()])
                .status()
                .with_context(|| "Failed to extract PLSDB metadata")?;

            if !status.success() {
                anyhow::bail!("tar extraction failed");
            }

            // Archive no longer needed once extracted; ignore removal errors.
            std::fs::remove_file(&meta_tar).ok();
        } else {
            eprintln!(" PLSDB metadata already exists, skipping download...");
        }

        if !fasta_path.exists() {
            eprintln!(" Downloading PLSDB sequences (~7GB)...");
            self.download_file(PLSDB_FASTA_URL, &fasta_path)?;
        } else {
            eprintln!(" PLSDB sequences already exist, skipping download...");
        }

        Ok(())
    }
1002
    /// Download and extract the NCBI taxdump archive into `taxonomy_dir`.
    /// Skips everything if `names.dmp` is already present.
    fn download_taxdump(&self, taxonomy_dir: &Path) -> Result<()> {
        let tar_path = taxonomy_dir.join("taxdump.tar.gz");
        let names_path = taxonomy_dir.join("names.dmp");

        // names.dmp marks a previously extracted dump.
        if names_path.exists() {
            eprintln!(" Taxdump already exists, skipping download...");
            return Ok(());
        }

        std::fs::create_dir_all(taxonomy_dir)?;

        eprintln!(" Downloading NCBI taxdump (~60MB)...");
        self.download_file(NCBI_TAXDUMP_URL, &tar_path)?;

        eprintln!(" Extracting taxdump...");
        // NOTE(review): to_str().unwrap() panics on non-UTF-8 paths.
        let status = Command::new("tar")
            .args(["-xzf", tar_path.to_str().unwrap(), "-C", taxonomy_dir.to_str().unwrap()])
            .status()
            .with_context(|| "Failed to extract taxdump")?;

        if !status.success() {
            anyhow::bail!("tar extraction failed");
        }

        // Remove the archive; ignore errors (extraction already succeeded).
        std::fs::remove_file(&tar_path).ok();

        Ok(())
    }
1031
1032 fn download_file(&self, url: &str, output_path: &Path) -> Result<()> {
1033 for attempt in 0..3 {
1034 match self.download_file_once(url, output_path) {
1035 Ok(_) => return Ok(()),
1036 Err(e) if attempt < 2 => {
1037 eprintln!(" Download failed (attempt {}): {}", attempt + 1, e);
1038 eprintln!(" Retrying in 5 seconds...");
1039 std::thread::sleep(Duration::from_secs(5));
1040 continue;
1041 }
1042 Err(e) => return Err(e),
1043 }
1044 }
1045 Ok(())
1046 }
1047
1048 fn download_file_once(&self, url: &str, output_path: &Path) -> Result<()> {
1049 let response = ureq::get(url)
1050 .set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
1051 .timeout(Duration::from_secs(7200))
1052 .call()
1053 .with_context(|| format!("Failed to download {}", url))?;
1054
1055 let mut file = File::create(output_path)?;
1056 let mut reader = response.into_reader();
1057
1058 let mut buffer = [0u8; 65536];
1059 let mut total = 0usize;
1060 loop {
1061 match reader.read(&mut buffer) {
1062 Ok(0) => break,
1063 Ok(n) => {
1064 file.write_all(&buffer[..n])?;
1065 total += n;
1066 if total.is_multiple_of(100 * 1024 * 1024) {
1067 eprintln!(" Downloaded {} MB...", total / (1024 * 1024));
1068 }
1069 }
1070 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
1071 Err(e) => return Err(e.into()),
1072 }
1073 }
1074 eprintln!(" Total: {} MB", total / (1024 * 1024));
1075
1076 Ok(())
1077 }
1078
    /// Stream an NCBI `assembly_summary.txt` and parse qualifying rows.
    ///
    /// Keeps "Complete Genome" / "Chromosome" assemblies that have a usable
    /// FTP path and are not in `exclude_set` (matched with and without the
    /// version suffix). Also returns the GCA accessions declared "identical"
    /// to a kept GCF assembly, so a later GenBank pass can exclude those
    /// duplicates.
    ///
    /// Column indices follow NCBI's documented summary layout: 0 accession,
    /// 5 taxid, 6 species_taxid, 7 organism_name, 11 assembly_level,
    /// 17 paired accession, 18 paired comparison, 19 ftp_path — confirm
    /// against README_assembly_summary.txt if NCBI revises the format.
    fn download_and_parse_assembly_summary(
        &self,
        url: &str,
        exclude_set: Option<&FxHashSet<String>>,
    ) -> Result<(Vec<AssemblyInfo>, Vec<String>)> {
        let response = ureq::get(url)
            .timeout(Duration::from_secs(600))
            .call()
            .with_context(|| format!("Failed to download {}", url))?;

        let reader = BufReader::new(response.into_reader());
        let mut assemblies = Vec::new();
        let mut paired_gca = Vec::new();
        let mut skipped_duplicates = 0usize;

        for line in reader.lines() {
            let line = line?;
            // Header/comment lines start with '#'.
            if line.starts_with('#') {
                continue;
            }

            let fields: Vec<&str> = line.split('\t').collect();
            if fields.len() < 20 {
                continue;
            }

            let accession = fields[0];
            let assembly_level = fields[11];

            // Only finished assemblies are worth aligning.
            if assembly_level != "Complete Genome" && assembly_level != "Chromosome" {
                continue;
            }

            // "na" or empty means the assembly is not downloadable.
            let ftp_path = fields[19];
            if ftp_path == "na" || ftp_path.is_empty() {
                continue;
            }

            if let Some(exclude) = exclude_set {
                // Match both the versioned and versionless accession forms.
                let acc_base = accession.split('.').next().unwrap_or(accession);
                if exclude.contains(accession) || exclude.contains(acc_base) {
                    skipped_duplicates += 1;
                    continue;
                }
            }

            // Record the GenBank twin of a RefSeq assembly when NCBI marks
            // the pair "identical" — callers use this to avoid duplicates.
            if fields.len() > 18 && accession.starts_with("GCF_") {
                let paired_asm = fields[17];
                let paired_comp = fields[18];
                if paired_asm != "na" && !paired_asm.is_empty() && paired_comp == "identical" {
                    paired_gca.push(paired_asm.to_string());
                    // Versionless alias too.
                    if let Some(base) = paired_asm.split('.').next() {
                        paired_gca.push(base.to_string());
                    }
                }
            }

            assemblies.push(AssemblyInfo {
                accession: accession.to_string(),
                taxid: fields[5].to_string(),
                species_taxid: fields[6].to_string(),
                organism_name: fields[7].to_string(),
            });
        }

        if skipped_duplicates > 0 {
            eprintln!(" Skipped {} duplicates (have identical RefSeq)", skipped_duplicates);
        }

        Ok((assemblies, paired_gca))
    }
1152
1153 fn load_plsdb_plasmids(&self, plsdb_dir: &Path) -> Result<Vec<PlasmidInfo>> {
1154 let nuccore_path = plsdb_dir.join("nuccore.csv");
1155 let taxonomy_path = plsdb_dir.join("taxonomy.csv");
1156
1157 let mut taxonomy: FxHashMap<String, (String, String)> = FxHashMap::default();
1158 if taxonomy_path.exists() {
1159 let tax_file = File::open(&taxonomy_path)?;
1160 let tax_reader = BufReader::new(tax_file);
1161
1162 for (idx, line) in tax_reader.lines().enumerate() {
1163 let line = line?;
1164 if idx == 0 {
1165 continue;
1166 }
1167
1168 let fields: Vec<&str> = line.split(',').collect();
1169 if fields.len() >= 10 {
1170 let uid = fields[0].to_string();
1171 let genus = fields[8].to_string();
1172 let species = fields[9].to_string();
1173 taxonomy.insert(uid, (genus, species));
1174 }
1175 }
1176 }
1177
1178 let mut plasmids = Vec::new();
1179 let nuc_file = File::open(&nuccore_path)?;
1180 let nuc_reader = BufReader::new(nuc_file);
1181
1182 for (idx, line) in nuc_reader.lines().enumerate() {
1183 let line = line?;
1184 if idx == 0 {
1185 continue;
1186 }
1187
1188 let fields = parse_csv_line(&line);
1189 if fields.len() < 15 {
1190 continue;
1191 }
1192
1193 let completeness = &fields[4];
1194 let topology = &fields[14];
1195 let assembly_uid = &fields[9];
1196
1197 if completeness != "complete" {
1198 continue;
1199 }
1200 if topology != "circular" {
1201 continue;
1202 }
1203
1204 if assembly_uid != "-1" {
1205 continue;
1206 }
1207
1208 let taxonomy_uid = &fields[12];
1209 let (genus, species) = taxonomy.get(taxonomy_uid)
1210 .cloned()
1211 .unwrap_or_else(|| ("Unknown".to_string(), "Unknown".to_string()));
1212
1213 plasmids.push(PlasmidInfo {
1214 accession: fields[1].clone(),
1215 taxonomy_uid: taxonomy_uid.clone(),
1216 genus,
1217 species,
1218 });
1219 }
1220
1221 Ok(plasmids)
1222 }
1223
    /// Download genomes in batches from the NCBI Datasets API and align each
    /// batch against the AMR gene database with minimap2, appending the
    /// deduplicated hits to `paf_output`.
    ///
    /// Runs as a producer/consumer pipeline: a producer thread downloads and
    /// unpacks zip batches, sending the extracted genome paths over a bounded
    /// channel; this (consumer) thread concatenates each batch into
    /// per-size-bucket FASTA files, runs minimap2 on them, and records
    /// progress. Processed accessions are appended to `accessions_done.txt`
    /// so interrupted runs resume where they stopped (after sanity-checking
    /// that the PAF file and the genome files still exist).
    ///
    /// Returns the total number of genomes processed, including those skipped
    /// on resume.
    ///
    /// NOTE(review): a producer download failure only sets a flag and stops
    /// the pipeline; the function still returns Ok with a partial count, so
    /// callers cannot distinguish a complete run from an aborted one.
    fn download_and_align_batches(
        &self,
        assemblies: &[AssemblyInfo],
        genomes_dir: &Path,
        temp_dir: &Path,
        paf_output: &Path,
        _catalog: &GenomeCatalog,
    ) -> Result<usize> {
        let all_accessions_path = temp_dir.join("accessions_all.txt");
        let done_accessions_path = temp_dir.join("accessions_done.txt");

        let accessions: Vec<String> = assemblies.iter()
            .map(|a| a.accession.clone())
            .collect();

        // Persist the full accession list once, for inspection/debugging.
        if !all_accessions_path.exists() {
            eprintln!(" Saving {} accessions to temp file...", accessions.len());
            let mut file = File::create(&all_accessions_path)?;
            for acc in &accessions {
                writeln!(file, "{}", acc)?;
            }
        }

        // Load resume state: accessions already downloaded and aligned.
        let mut done_set: FxHashSet<String> = FxHashSet::default();
        if done_accessions_path.exists() {
            let file = File::open(&done_accessions_path)?;
            let reader = BufReader::new(file);
            for acc in reader.lines().map_while(Result::ok) {
                done_set.insert(acc.trim().to_string());
            }

            if !done_set.is_empty() {
                if !paf_output.exists() {
                    // A done list without a PAF file is inconsistent state:
                    // safest is to start over.
                    eprintln!(" WARNING: PAF file missing but {} genomes marked as done", done_set.len());
                    eprintln!(" Clearing done list and reprocessing all genomes...");
                    done_set.clear();
                    std::fs::remove_file(&done_accessions_path).ok();
                } else {
                    // Same inconsistency check against an empty PAF file.
                    let paf_size = std::fs::metadata(paf_output)?.len();
                    if paf_size == 0 {
                        eprintln!(" WARNING: PAF file is empty but {} genomes marked as done", done_set.len());
                        eprintln!(" Clearing done list and reprocessing all genomes...");
                        done_set.clear();
                        std::fs::remove_file(&done_accessions_path).ok();
                    } else {
                        // Genome FASTAs are needed later for flank extraction;
                        // re-download any that disappeared since the last run.
                        let mut missing_genomes = Vec::new();
                        for acc in &done_set {
                            let genome_path = genomes_dir.join(format!("{}.fna", acc));
                            if !genome_path.exists() {
                                missing_genomes.push(acc.clone());
                            }
                        }
                        if !missing_genomes.is_empty() {
                            eprintln!(" WARNING: {} genome files missing for done accessions", missing_genomes.len());
                            for acc in &missing_genomes {
                                done_set.remove(acc);
                            }
                            eprintln!(" Removed missing genomes from done list, will re-download");
                        }

                        if !done_set.is_empty() {
                            eprintln!(" Resume: {} already processed (PAF: {} MB)",
                                done_set.len(), paf_size / 1024 / 1024);
                        }
                    }
                }
            }
        }

        // Everything not in the done list still needs download + alignment.
        let remaining: Vec<&String> = accessions.iter()
            .filter(|a| !done_set.contains(*a))
            .collect();

        if remaining.is_empty() {
            eprintln!(" All {} genomes already processed", accessions.len());
            return Ok(accessions.len());
        }

        eprintln!(" Remaining: {}/{} genomes to process", remaining.len(), accessions.len());

        // Bound the download-ahead queue so extracted batches on disk stay
        // within roughly queue_buffer_gb (assumes ~3 MB per genome).
        let bytes_per_batch = API_BATCH_SIZE * 3 * 1024 * 1024;
        let queue_capacity = std::cmp::max(
            1,
            (self.config.queue_buffer_gb as usize * 1024 * 1024 * 1024) / bytes_per_batch
        );
        eprintln!(" Queue capacity: {} batches (based on {} GB buffer)", queue_capacity, self.config.queue_buffer_gb);

        let batches: Vec<Vec<String>> = remaining.chunks(API_BATCH_SIZE)
            .map(|chunk| chunk.iter().map(|s| (*s).clone()).collect())
            .collect();
        let total_batches = batches.len();

        // Bounded channel: applies backpressure to the download thread.
        let (tx, rx) = mpsc::sync_channel::<(usize, Vec<PathBuf>)>(queue_capacity);

        // Counters start at done_set.len() so progress output shows totals.
        let downloaded = Arc::new(AtomicUsize::new(done_set.len()));
        let aligned = Arc::new(AtomicUsize::new(done_set.len()));
        let download_error = Arc::new(AtomicBool::new(false));

        let genomes_dir = genomes_dir.to_path_buf();
        let temp_dir_producer = temp_dir.to_path_buf();
        let temp_dir_consumer = temp_dir.to_path_buf();
        let email = self.email.clone();
        let downloaded_clone = Arc::clone(&downloaded);
        let download_error_clone = Arc::clone(&download_error);
        let total_accessions = accessions.len();

        // Producer: download each batch as a zip from the Datasets API,
        // extract the genome FASTAs, and hand the paths to the consumer.
        let producer = thread::spawn(move || -> Result<()> {
            for (batch_idx, batch) in batches.into_iter().enumerate() {
                if download_error_clone.load(Ordering::Relaxed) {
                    break;
                }

                let zip_path = temp_dir_producer.join(format!("batch_{:04}.zip", batch_idx));

                let url = format!("{}/genome/download", NCBI_DATASETS_API);
                let acc_list: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
                let request_body = serde_json::json!({
                    "accessions": acc_list,
                    "include_annotation_type": ["GENOME_FASTA"]
                });

                let response = ureq::post(&url)
                    .set("Content-Type", "application/json")
                    .set("ncbi-client-id", &email)
                    .timeout(Duration::from_secs(3600))
                    .send_json(&request_body);

                match response {
                    Ok(resp) => {
                        // Stream the zip to disk, unpack it, then delete it.
                        let mut zip_file = File::create(&zip_path)?;
                        let mut reader = resp.into_reader();
                        std::io::copy(&mut reader, &mut zip_file)?;
                        drop(zip_file);

                        let genome_files = Self::extract_batch_to_files_static(&zip_path, &genomes_dir)?;

                        std::fs::remove_file(&zip_path).ok();

                        downloaded_clone.fetch_add(batch.len(), Ordering::Relaxed);
                        eprintln!(" [Download] Batch {}/{}: {} genomes ({}/{})",
                            batch_idx + 1, total_batches, batch.len(),
                            downloaded_clone.load(Ordering::Relaxed), total_accessions);

                        // send() only fails when the consumer hung up: stop.
                        if tx.send((batch_idx, genome_files)).is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!(" [Download] Batch {} failed: {}", batch_idx + 1, e);
                        download_error_clone.store(true, Ordering::Relaxed);
                        break;
                    }
                }

                // Light rate limiting between API requests.
                std::thread::sleep(Duration::from_millis(200));
            }
            Ok(())
        });

        // Append-mode outputs so resumed runs keep earlier results.
        let paf_file = Arc::new(Mutex::new(
            std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(paf_output)?
        ));
        let done_file = Arc::new(Mutex::new(
            std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(&done_accessions_path)?
        ));

        let amr_db = self.amr_db_path.clone();
        let max_threads = self.threads;

        // Consumer loop: ends when the producer drops its end of the channel.
        while let Ok((batch_idx, genome_files)) = rx.recv() {
            if genome_files.is_empty() {
                continue;
            }

            // Group genomes by size so minimap2 thread counts fit the input.
            let buckets = bucket_genomes_by_size(&genome_files);
            let mut batch_hits: Vec<PafHit> = Vec::new();

            for (category, bucket_files) in &buckets {
                if bucket_files.is_empty() {
                    continue;
                }

                let bucket_threads = category.thread_count(max_threads);
                let bucket_suffix = format!("batch_{:04}_{:?}", batch_idx, category);
                let bucket_fasta = temp_dir_consumer.join(format!("{}.fas", bucket_suffix));

                // Concatenate the bucket's genomes into one FASTA, rewriting
                // each header as "<contig>|<filename>" so hits can be traced
                // back to their source genome file.
                {
                    let mut fasta_writer = BufWriter::new(File::create(&bucket_fasta)?);
                    for genome_path in bucket_files {
                        let filename = genome_path.file_name()
                            .and_then(|n| n.to_str())
                            .unwrap_or("unknown.fna");

                        let file = File::open(genome_path)?;
                        let reader = BufReader::new(file);
                        for line in reader.lines() {
                            let line = line?;
                            if let Some(stripped) = line.strip_prefix('>') {
                                let contig_id = stripped.split_whitespace().next().unwrap_or(stripped);
                                writeln!(fasta_writer, ">{}|{}", contig_id, filename)?;
                            } else {
                                writeln!(fasta_writer, "{}", line)?;
                            }
                        }
                    }
                    fasta_writer.flush()?;
                }

                let bucket_paf = temp_dir_consumer.join(format!("{}.paf", bucket_suffix));
                let status = Command::new("minimap2")
                    .args([
                        "-cx", "asm20",
                        "-t", &bucket_threads.to_string(),
                        amr_db.to_str().unwrap(),
                        bucket_fasta.to_str().unwrap(),
                        "-o", bucket_paf.to_str().unwrap(),
                    ])
                    .stdout(std::process::Stdio::null())
                    .stderr(std::process::Stdio::null())
                    .status();

                // NOTE(review): minimap2 failures are silently skipped here —
                // the batch is still marked done below with no hits recorded.
                if let Ok(s) = status {
                    if s.success() && bucket_paf.exists() {
                        // Parse the bucket's PAF output into structured hits.
                        let paf_content = std::fs::read_to_string(&bucket_paf)?;
                        let hits: Vec<PafHit> = paf_content
                            .lines()
                            .filter_map(PafHit::from_paf_line)
                            .collect();
                        batch_hits.extend(hits);
                    }
                }

                std::fs::remove_file(&bucket_fasta).ok();
                std::fs::remove_file(&bucket_paf).ok();
            }

            let dedup_hits = deduplicate_paf_hits(batch_hits);

            // Append this batch's hits BEFORE marking its genomes done, so a
            // crash in between re-processes the batch rather than losing it.
            {
                let mut paf = paf_file.lock().unwrap();
                for hit in &dedup_hits {
                    writeln!(paf, "{}", hit.raw_line)?;
                }
                paf.flush()?;
            }

            {
                let mut done = done_file.lock().unwrap();
                for genome_path in &genome_files {
                    if let Some(stem) = genome_path.file_stem().and_then(|s| s.to_str()) {
                        writeln!(done, "{}", stem)?;
                    }
                }
                done.flush()?;
            }

            let count = genome_files.len();
            aligned.fetch_add(count, Ordering::Relaxed);

            let bucket_info: Vec<String> = buckets.iter()
                .map(|(cat, files)| format!("{}={}", cat.name().split_whitespace().next().unwrap_or("?"), files.len()))
                .collect();
            eprintln!(" [Align] Batch {}/{}: {} genomes [{}] ({}/{})",
                batch_idx + 1, total_batches, count,
                bucket_info.join(", "),
                aligned.load(Ordering::Relaxed), total_accessions);
        }

        // NOTE(review): the producer's Result (and the download_error flag)
        // is discarded here; a partially-failed run still returns Ok.
        let _ = producer.join();

        Ok(aligned.load(Ordering::Relaxed))
    }
1506
1507 fn extract_batch_to_files_static(zip_path: &Path, genomes_dir: &Path) -> Result<Vec<PathBuf>> {
1508 let zip_file = File::open(zip_path)?;
1509 let mut archive = zip::ZipArchive::new(zip_file)?;
1510 let mut genome_files = Vec::new();
1511
1512 for i in 0..archive.len() {
1513 let mut file = archive.by_index(i)?;
1514 let name = file.name().to_string();
1515
1516 if name.ends_with("_genomic.fna") || name.ends_with("_genomic.fasta") {
1517 let parts: Vec<&str> = name.split('/').collect();
1518 if let Some(acc_dir) = parts.iter().find(|p| p.starts_with("GCA_") || p.starts_with("GCF_")) {
1519 let filename = format!("{}.fna", acc_dir);
1520 let output_path = genomes_dir.join(&filename);
1521
1522 let mut content = Vec::new();
1523 file.read_to_end(&mut content)?;
1524 std::fs::write(&output_path, &content)?;
1525
1526 genome_files.push(output_path);
1527 }
1528 }
1529 }
1530
1531 Ok(genome_files)
1532 }
1533
    /// Align PLSDB plasmid sequences against the AMR gene database.
    ///
    /// Loads `sequences.fasta` from `plsdb_dir`, keeps only the sequences for
    /// `plasmids`, aligns them in batches with minimap2 (bucketed by sequence
    /// size, mirroring `download_and_align_batches`), and appends the
    /// deduplicated PAF hits to `paf_output`. Each kept sequence is also
    /// written to `plsdb_dir` as `<accession with '.' replaced by '_'>.fna`
    /// so flank extraction can find it later. Returns the number of plasmid
    /// accessions processed.
    fn align_plsdb(
        &self,
        plasmids: &[PlasmidInfo],
        plsdb_dir: &Path,
        temp_dir: &Path,
        paf_output: &Path,
    ) -> Result<usize> {
        let fasta_path = plsdb_dir.join("sequences.fasta");

        if !fasta_path.exists() {
            eprintln!(" Warning: sequences.fasta not found in PLSDB directory");
            return Ok(0);
        }

        let target_accs: FxHashSet<_> = plasmids.iter()
            .map(|p| p.accession.clone())
            .collect();

        if target_accs.is_empty() {
            eprintln!(" No PLSDB plasmids to process");
            return Ok(0);
        }

        let acc_list: Vec<_> = target_accs.iter().collect();
        let batches: Vec<_> = acc_list.chunks(API_BATCH_SIZE).collect();
        let total_batches = batches.len();
        let mut total_processed = 0usize;

        eprintln!(" Processing {} PLSDB plasmids in {} batches...",
            target_accs.len(), total_batches);

        let file = File::open(&fasta_path)?;
        let reader = BufReader::new(file);

        // Single pass over the multi-FASTA: keep only target accessions,
        // keyed by the first whitespace-delimited token of each header.
        let mut seq_map: FxHashMap<String, String> = FxHashMap::default();
        let mut current_acc: Option<String> = None;
        let mut current_seq = String::new();

        for line in reader.lines() {
            let line = line?;
            if line.starts_with('>') {
                // Flush the previous record before starting a new one.
                if let Some(acc) = current_acc.take() {
                    if target_accs.contains(&acc) {
                        seq_map.insert(acc, std::mem::take(&mut current_seq));
                    }
                }
                let header = line.trim_start_matches('>');
                let acc = header.split_whitespace().next().unwrap_or("").to_string();
                current_acc = Some(acc);
                current_seq.clear();
            } else {
                current_seq.push_str(line.trim());
            }
        }
        // Flush the final record as well.
        if let Some(acc) = current_acc {
            if target_accs.contains(&acc) {
                seq_map.insert(acc, current_seq);
            }
        }

        eprintln!(" Loaded {} target sequences from PLSDB", seq_map.len());

        for (batch_idx, batch) in batches.iter().enumerate() {
            // Bucket by sequence size so minimap2 thread counts fit the input.
            let mut size_buckets: FxHashMap<GenomeSizeCategory, Vec<(&str, &str)>> = FxHashMap::default();

            for acc in *batch {
                if let Some(seq) = seq_map.get(*acc) {

                    let category = GenomeSizeCategory::from_size(seq.len() as u64);
                    size_buckets.entry(category)
                        .or_default()
                        .push((acc.as_str(), seq.as_str()));
                }
            }

            let mut batch_hits: Vec<PafHit> = Vec::new();

            for (category, bucket_seqs) in &size_buckets {
                if bucket_seqs.is_empty() {
                    continue;
                }

                let bucket_threads = category.thread_count(self.threads);
                let bucket_suffix = format!("plsdb_batch_{:04}_{:?}", batch_idx, category);
                let bucket_fasta = temp_dir.join(format!("{}.fas", bucket_suffix));

                // Write the bucket FASTA with "<acc>|<filename>" headers — the
                // same traceable format used for genome batches.
                {
                    let mut fasta_writer = BufWriter::new(File::create(&bucket_fasta)?);
                    for (acc, seq) in bucket_seqs {
                        let filename = format!("{}.fna", acc.replace('.', "_"));
                        writeln!(fasta_writer, ">{}|{}", acc, filename)?;
                        writeln!(fasta_writer, "{}", seq)?;
                    }
                    fasta_writer.flush()?;
                }

                let bucket_paf = temp_dir.join(format!("{}.paf", bucket_suffix));
                let status = Command::new("minimap2")
                    .args([
                        "-cx", "asm20",
                        "-t", &bucket_threads.to_string(),
                        self.amr_db_path.to_str().unwrap(),
                        bucket_fasta.to_str().unwrap(),
                        "-o", bucket_paf.to_str().unwrap(),
                    ])
                    .stdout(std::process::Stdio::null())
                    .stderr(std::process::Stdio::null())
                    .status();

                // NOTE(review): minimap2 failures are silently ignored; the
                // batch still counts as processed with zero hits recorded.
                if let Ok(s) = status {
                    if s.success() && bucket_paf.exists() {
                        // Parse the bucket's PAF output into structured hits.
                        let paf_content = std::fs::read_to_string(&bucket_paf)?;
                        let hits: Vec<PafHit> = paf_content
                            .lines()
                            .filter_map(PafHit::from_paf_line)
                            .collect();
                        batch_hits.extend(hits);
                    }
                }

                std::fs::remove_file(&bucket_fasta).ok();
                std::fs::remove_file(&bucket_paf).ok();
            }

            let dedup_hits = deduplicate_paf_hits(batch_hits);

            // Append this batch's deduplicated hits to the shared PAF output.
            {
                let mut paf_file = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(paf_output)?;
                for hit in &dedup_hits {
                    writeln!(paf_file, "{}", hit.raw_line)?;
                }
            }

            total_processed += batch.len();

            let bucket_info: Vec<String> = size_buckets.iter()
                .map(|(cat, seqs)| format!("{}={}", cat.name().split_whitespace().next().unwrap_or("?"), seqs.len()))
                .collect();
            eprintln!(" [PLSDB] Batch {}/{}: {} plasmids [{}] ({}/{})",
                batch_idx + 1, total_batches, batch.len(),
                bucket_info.join(", "),
                total_processed, target_accs.len());
        }

        // Persist each kept sequence as an individual FASTA so
        // extract_flanking_from_plsdb can load it by filename later.
        for (acc, seq) in &seq_map {
            let out_path = plsdb_dir.join(format!("{}.fna", acc.replace('.', "_")));
            if !out_path.exists() {
                let mut out_file = File::create(&out_path)?;
                writeln!(out_file, ">{}", acc)?;
                writeln!(out_file, "{}", seq)?;
            }
        }

        eprintln!(" Aligned {} PLSDB sequences", total_processed);
        Ok(total_processed)
    }
1695
    /// Extract flanking sequences for AMR hits that landed on PLSDB plasmids.
    ///
    /// Reads the merged hits table (`gene<TAB>contig|file<TAB>start<TAB>end`),
    /// keeps rows whose source file name looks like a nucleotide accession
    /// (NZ_/CP/AP/NC_ prefixes, as opposed to GCA_/GCF_ assemblies), loads
    /// each plasmid FASTA from `plsdb_dir`, and appends up to
    /// `flanking_length` bases of upstream/downstream context per hit to
    /// `output_path` — same TSV layout as `extract_flanking_sequences`, which
    /// is responsible for writing the header row.
    fn extract_flanking_from_plsdb(
        &self,
        hits_path: &Path,
        plsdb_dir: &Path,
        output_path: &Path,
        catalog: &GenomeCatalog,
    ) -> Result<()> {
        let hits_file = File::open(hits_path)?;
        let reader = BufReader::new(hits_file);

        // genome file name -> [(gene, contig_id, start, end)]
        let mut plsdb_hits: FxHashMap<String, Vec<(String, String, usize, usize)>> = FxHashMap::default();

        for (i, line) in reader.lines().enumerate() {
            let line = line?;

            // Skip an optional header row.
            if i == 0 && line.starts_with("gene") {
                continue;
            }

            let fields: Vec<&str> = line.split('\t').collect();
            if fields.len() < 4 {
                continue;
            }

            let gene = fields[0];
            let contig_file = fields[1];
            let start: usize = fields[2].parse().unwrap_or(0);
            let end: usize = fields[3].parse().unwrap_or(0);

            // Column 2 is "<contig_id>|<genome_file>"; rows without the pipe
            // cannot be attributed to a PLSDB file and are skipped.
            let (contig_id, genome_file) = if let Some(pipe_pos) = contig_file.rfind('|') {
                (contig_file[..pipe_pos].to_string(), contig_file[pipe_pos + 1..].to_string())
            } else {
                continue;
            };

            // Heuristic: PLSDB files are named after INSDC/RefSeq nucleotide
            // accessions; assembly files (GCA_/GCF_) are handled elsewhere.
            if genome_file.starts_with("NZ_") || genome_file.starts_with("CP") ||
               genome_file.starts_with("AP") || genome_file.starts_with("NC_") {
                plsdb_hits.entry(genome_file)
                    .or_default()
                    .push((gene.to_string(), contig_id, start, end));
            }
        }

        if plsdb_hits.is_empty() {
            return Ok(());
        }

        // Append to the flanking output produced by the genome pass.
        let mut writer = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(output_path)?;

        let mut extracted = 0usize;

        for (genome_file, hits) in &plsdb_hits {
            let genome_path = plsdb_dir.join(genome_file);
            if !genome_path.exists() {
                continue;
            }

            let sequences = match self.load_genome_sequences(&genome_path) {
                Ok(seqs) => seqs,
                Err(_) => continue,
            };

            // Index contigs by the first header token for O(1) lookup.
            let seq_map: FxHashMap<&str, &str> = sequences.iter()
                .map(|(header, seq)| {
                    let key = header.split_whitespace().next().unwrap_or(header.as_str());
                    (key, seq.as_str())
                })
                .collect();

            let genome_acc = genome_file.trim_end_matches(".fna");
            let base_genus = catalog.get_genus(genome_acc).map(|s| s.to_string());

            for (gene, contig_id, start, end) in hits {
                let contig_seq = match seq_map.get(contig_id.as_str()) {
                    Some(seq) => *seq,
                    None => continue,
                };

                let contig_len = contig_seq.len();
                // Reject coordinates outside the contig or inverted ranges.
                if *start >= contig_len || *end > contig_len || start >= end {
                    continue;
                }

                let genus = base_genus.clone().unwrap_or_else(|| "Unknown".to_string());

                // Clamp both flanks to the contig boundaries.
                let upstream_start = start.saturating_sub(self.config.flanking_length);
                let upstream = &contig_seq[upstream_start..*start];

                let downstream_end = std::cmp::min(*end + self.config.flanking_length, contig_len);
                let downstream = &contig_seq[*end..downstream_end];

                writeln!(writer, "{}\t{}\t{}\t{}\t{}\t{}\t{}",
                    gene, contig_id, genus, start, end, upstream, downstream)?;
                extracted += 1;
            }
        }

        if extracted > 0 {
            eprintln!(" Extracted {} PLSDB flanking sequences", extracted);
        }
        Ok(())
    }
1801
1802 fn convert_paf_to_merged(
1803 &self,
1804 paf_path: &Path,
1805 output_path: &Path,
1806 ) -> Result<()> {
1807 let mut hits: FxHashSet<(String, String, usize, usize)> = FxHashSet::default();
1808
1809 if paf_path.exists() {
1810 let file = File::open(paf_path)?;
1811 let reader = BufReader::new(file);
1812 for line in reader.lines() {
1813 let line = line?;
1814 let fields: Vec<&str> = line.split('\t').collect();
1815 if fields.len() < 12 {
1816 continue;
1817 }
1818 let contig_file = fields[0].to_string();
1819 let gene = fields[5].to_string();
1820 let start: usize = fields[2].parse().unwrap_or(0);
1821 let end: usize = fields[3].parse().unwrap_or(0);
1822 hits.insert((gene, contig_file, start, end));
1823 }
1824 }
1825
1826 let mut writer = BufWriter::new(File::create(output_path)?);
1827 for (gene, contig_file, start, end) in &hits {
1828 writeln!(writer, "{}\t{}\t{}\t{}", gene, contig_file, start, end)?;
1829 }
1830
1831 eprintln!(" Found {} unique hits", hits.len());
1832 Ok(())
1833 }
1834
    /// Extract flanking sequences around every AMR hit in downloaded genomes.
    ///
    /// Reads the merged hits table, groups hits per genome file, loads each
    /// genome FASTA from `genomes_dir`, and writes one TSV row per hit
    /// (header: Gene/Contig/Genus/Start/End/Upstream/Downstream) with up to
    /// `flanking_length` bases of context on each side. Genus is resolved via
    /// `catalog` by assembly accession, falling back to the contig id. If
    /// `output_path` already exists non-empty the step is skipped entirely,
    /// making it resumable.
    fn extract_flanking_sequences(
        &self,
        hits_path: &Path,
        genomes_dir: &Path,
        output_path: &Path,
        catalog: &GenomeCatalog,
    ) -> Result<()> {

        // Resume support: a non-empty output means this step already ran.
        if output_path.exists() {
            let metadata = std::fs::metadata(output_path)?;
            if metadata.len() > 0 {
                eprintln!(" Flanking sequences file already exists ({} MB), skipping extraction...",
                    metadata.len() / (1024 * 1024));
                return Ok(());
            }
        }

        let hits_file = File::open(hits_path)?;
        let reader = BufReader::new(hits_file);

        // genome file name -> [(gene, contig_id, start, end)]
        let mut genome_hits: FxHashMap<String, Vec<(String, String, usize, usize)>> = FxHashMap::default();

        for (i, line) in reader.lines().enumerate() {
            let line = line?;

            // Skip an optional header row.
            if i == 0 && line.starts_with("gene") {
                continue;
            }

            let fields: Vec<&str> = line.split('\t').collect();
            if fields.len() < 4 {
                continue;
            }

            let gene = fields[0];
            let contig_file = fields[1];
            let start: usize = fields[2].parse().unwrap_or(0);
            let end: usize = fields[3].parse().unwrap_or(0);

            // Column 2 is "<contig_id>|<genome_file>"; without the pipe, fall
            // back to guessing the genome file name from the contig id.
            let (contig_id, genome_file) = if let Some(pipe_pos) = contig_file.rfind('|') {
                (contig_file[..pipe_pos].to_string(), contig_file[pipe_pos + 1..].to_string())
            } else {

                (contig_file.to_string(), extract_genome_file(contig_file))
            };

            genome_hits.entry(genome_file)
                .or_default()
                .push((gene.to_string(), contig_id, start, end));
        }

        let output_file = File::create(output_path)?;
        let mut writer = BufWriter::new(output_file);

        writeln!(writer, "Gene\tContig\tGenus\tStart\tEnd\tUpstream\tDownstream")?;

        let total_genomes = genome_hits.len();
        let mut processed = 0;
        let mut genus_found = 0usize;
        let mut genus_unknown = 0usize;
        let mut sequences_extracted = 0usize;

        for (genome_file, hits) in &genome_hits {
            let genome_path = genomes_dir.join(genome_file);
            if !genome_path.exists() {
                continue;
            }

            let sequences = match self.load_genome_sequences(&genome_path) {
                Ok(seqs) => seqs,
                Err(_) => continue,
            };

            // Index contigs by the first header token for O(1) lookup.
            let seq_map: FxHashMap<&str, &str> = sequences.iter()
                .map(|(header, seq)| {
                    let key = header.split_whitespace().next().unwrap_or(header.as_str());
                    (key, seq.as_str())
                })
                .collect();

            let genome_acc = genome_file.trim_end_matches(".fna");
            let base_genus = catalog.get_genus(genome_acc).map(|s| s.to_string());

            for (gene, contig_id, start, end) in hits {

                let contig_seq = match seq_map.get(contig_id.as_str()) {
                    Some(seq) => *seq,
                    None => continue,
                };

                let contig_len = contig_seq.len();

                // Reject coordinates outside the contig or inverted ranges.
                if *start >= contig_len || *end > contig_len || start >= end {
                    continue;
                }

                // Clamp both flanks to the contig boundaries; a hit at the
                // very edge yields an empty flank rather than a panic.
                let upstream_start = start.saturating_sub(self.config.flanking_length);
                let downstream_end = std::cmp::min(end + self.config.flanking_length, contig_len);

                let upstream = if *start > 0 && upstream_start < *start {
                    &contig_seq[upstream_start..*start]
                } else {
                    ""
                };

                let downstream = if *end < contig_len && *end < downstream_end {
                    &contig_seq[*end..downstream_end]
                } else {
                    ""
                };

                // Prefer genus by assembly accession; fall back to contig id.
                let genus = if let Some(ref g) = base_genus {
                    genus_found += 1;
                    g.clone()
                } else if let Some(g) = catalog.get_genus(contig_id) {
                    genus_found += 1;
                    g.to_string()
                } else {
                    genus_unknown += 1;
                    "Unknown".to_string()
                };

                writeln!(
                    writer,
                    "{}\t{}\t{}\t{}\t{}\t{}\t{}",
                    gene, contig_id, genus, start, end, upstream, downstream
                )?;
                sequences_extracted += 1;
            }

            processed += 1;
            // Periodic progress output for long runs.
            if processed % 10000 == 0 || processed == total_genomes {
                eprintln!(" Processed {}/{} genomes ({} sequences)",
                    processed, total_genomes, sequences_extracted);
            }
        }

        eprintln!(" Extracted {} flanking sequences from {} genomes",
            sequences_extracted, processed);
        eprintln!(" Genus resolved: {}, Unknown: {}", genus_found, genus_unknown);
        Ok(())
    }
1977
1978 fn load_genome_sequences(&self, genome_path: &Path) -> Result<Vec<(String, String)>> {
1979 let file = File::open(genome_path)?;
1980 let reader = BufReader::new(file);
1981
1982 let mut sequences = Vec::new();
1983 let mut current_name: Option<String> = None;
1984 let mut current_seq = String::new();
1985
1986 for line in reader.lines() {
1987 let line = line?;
1988
1989 if let Some(stripped) = line.strip_prefix('>') {
1990 if let Some(name) = current_name.take() {
1991 sequences.push((name, std::mem::take(&mut current_seq)));
1992 }
1993 current_name = Some(stripped.to_string());
1994 current_seq.clear();
1995 } else {
1996 current_seq.push_str(line.trim());
1997 }
1998 }
1999
2000 if let Some(name) = current_name {
2001 sequences.push((name, current_seq));
2002 }
2003
2004 Ok(sequences)
2005 }
2006}
2007
/// Split one CSV line into fields, honoring double-quoted fields.
///
/// Commas inside quotes do not split; quote characters themselves are not
/// included in the output. An RFC 4180 escaped quote (`""` inside a quoted
/// field) is decoded to a single `"` — the previous version dropped it
/// entirely (`"a""b"` parsed as `ab` instead of `a"b`). Always returns at
/// least one field (an empty line yields `[""]`).
fn parse_csv_line(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        match ch {
            // Escaped quote inside a quoted field: emit one '"' literally.
            '"' if in_quotes && chars.peek() == Some(&'"') => {
                current.push('"');
                chars.next();
            }
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                fields.push(std::mem::take(&mut current));
            }
            _ => current.push(ch),
        }
    }
    // The final field has no trailing comma.
    fields.push(current);
    fields
}
2025
/// Guess the genome file name (`<accession>.fna`) from a bare contig id that
/// lacks the "contig|file" annotation.
///
/// Looks for an embedded GCA_/GCF_ assembly accession and keeps it up to the
/// first character that cannot belong to one. The previous version searched
/// for the bare substring "GC", which false-matched sequence-like contig
/// names (e.g. "ATGCAT" -> "GCAT.fna"); matching the full GCA_/GCF_ prefixes
/// fixes that while behaving identically on real accessions. Without an
/// assembly accession, the first whitespace token is used with '.' replaced
/// by '_' (matching how PLSDB per-accession files are named).
fn extract_genome_file(contig: &str) -> String {
    // Earliest of the two assembly-accession prefixes, mirroring the old
    // first-match behavior for strings containing both.
    let acc_start = match (contig.find("GCA_"), contig.find("GCF_")) {
        (Some(a), Some(f)) => Some(a.min(f)),
        (a, f) => a.or(f),
    };

    if let Some(start) = acc_start {
        let remainder = &contig[start..];
        // Accession characters: alphanumerics, '_' and '.'.
        let end = remainder
            .find(|c: char| !c.is_alphanumeric() && c != '_' && c != '.')
            .unwrap_or(remainder.len());
        return format!("{}.fna", &remainder[..end]);
    }

    let acc = contig.split_whitespace().next().unwrap_or(contig);
    format!("{}.fna", acc.replace('.', "_"))
}
2039
2040pub fn build(output_dir: &Path, arg_db: &Path, threads: usize, email: &str, config: FlankBuildConfig) -> Result<()> {
2041 let builder = FlankingDbBuilder::new(arg_db, output_dir, threads, email, config);
2042 builder.build()
2043}