Skip to main content

argenus/
flanking_db.rs

1
2use anyhow::{Context, Result};
3use rustc_hash::{FxHashMap, FxHashSet};
4use std::fs::File;
5use std::io::{BufRead, BufReader, BufWriter, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::Command;
8use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
9use std::sync::{Arc, Mutex, mpsc};
10use std::time::Duration;
11use std::thread;
12
13const NCBI_FTP_BASE: &str = "https://ftp.ncbi.nlm.nih.gov/genomes";
14const NCBI_TAXDUMP_URL: &str = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz";
15const NCBI_DATASETS_API: &str = "https://api.ncbi.nlm.nih.gov/datasets/v2";
16const PLSDB_META_URL: &str = "https://ccb-microbe.cs.uni-saarland.de/plsdb2025/download_meta.tar.gz";
17const PLSDB_FASTA_URL: &str = "https://ccb-microbe.cs.uni-saarland.de/plsdb2025/download_fasta";
18
19const API_BATCH_SIZE: usize = 1000;
20
21#[derive(Clone, Debug, Default)]
22pub struct PlsdbOptions {
23
24    pub dir: Option<PathBuf>,
25
26    pub skip: bool,
27}
28
29#[derive(Clone, Debug)]
30pub struct FlankBuildConfig {
31
32    pub flanking_length: usize,
33
34    pub queue_buffer_gb: u32,
35
36    pub plsdb: PlsdbOptions,
37}
38
39impl Default for FlankBuildConfig {
40    fn default() -> Self {
41        Self {
42            flanking_length: 1050,
43            queue_buffer_gb: 30,
44            plsdb: PlsdbOptions::default(),
45        }
46    }
47}
48
49#[derive(Clone, Debug)]
50pub struct AssemblyInfo {
51    pub accession: String,
52    pub taxid: String,
53    pub species_taxid: String,
54    pub organism_name: String,
55}
56
57#[derive(Clone, Debug)]
58pub struct PlasmidInfo {
59    pub accession: String,
60    pub taxonomy_uid: String,
61    pub genus: String,
62    pub species: String,
63}
64
65#[derive(Clone, Debug)]
66struct PafHit {
67    query_name: String,
68    query_start: usize,
69    query_end: usize,
70    gene_name: String,
71    gene_length: usize,
72    score: i64,
73    mapq: u8,
74    divergence: f32,
75    gap_count: usize,
76    raw_line: String,
77}
78
79impl PafHit {
80    fn from_paf_line(line: &str) -> Option<Self> {
81        let fields: Vec<&str> = line.split('\t').collect();
82        if fields.len() < 12 {
83            return None;
84        }
85
86        let query_name = fields[0].to_string();
87        let query_start: usize = fields[2].parse().ok()?;
88        let query_end: usize = fields[3].parse().ok()?;
89        let gene_name = fields[5].to_string();
90        let gene_length: usize = fields[6].parse().ok()?;
91        let mapq: u8 = fields[11].parse().unwrap_or(0);
92
93        let matches: usize = fields[9].parse().unwrap_or(0);
94        let block_len: usize = fields[10].parse().unwrap_or(0);
95
96        let mut score: i64 = 0;
97        let mut divergence: f32 = 1.0;
98
99        for field in &fields[12..] {
100            if let Some(val) = field.strip_prefix("AS:i:") {
101                score = val.parse().unwrap_or(0);
102            } else if let Some(val) = field.strip_prefix("de:f:") {
103                divergence = val.parse().unwrap_or(1.0);
104            }
105        }
106
107        let gap_count = block_len.saturating_sub(matches);
108
109        Some(PafHit {
110            query_name,
111            query_start,
112            query_end,
113            gene_name,
114            gene_length,
115            score,
116            mapq,
117            divergence,
118            gap_count,
119            raw_line: line.to_string(),
120        })
121    }
122
123    fn overlaps(&self, other: &PafHit) -> bool {
124        if self.query_name != other.query_name {
125            return false;
126        }
127        let start = self.query_start.max(other.query_start);
128        let end = self.query_end.min(other.query_end);
129        if start >= end {
130            return false;
131        }
132        let overlap = end - start;
133        let self_len = self.query_end - self.query_start;
134        let other_len = other.query_end - other.query_start;
135        overlap * 2 > self_len || overlap * 2 > other_len
136    }
137}
138
139fn compare_paf_hits(a: &PafHit, b: &PafHit) -> std::cmp::Ordering {
140    use std::cmp::Ordering;
141    b.score.cmp(&a.score)
142        .then_with(|| b.gene_length.cmp(&a.gene_length))
143        .then_with(|| b.mapq.cmp(&a.mapq))
144        .then_with(|| a.divergence.partial_cmp(&b.divergence).unwrap_or(Ordering::Equal))
145        .then_with(|| a.gap_count.cmp(&b.gap_count))
146        .then_with(|| a.gene_name.cmp(&b.gene_name))
147}
148
149fn deduplicate_paf_hits(hits: Vec<PafHit>) -> Vec<PafHit> {
150    if hits.is_empty() {
151        return hits;
152    }
153
154    let mut groups: Vec<Vec<PafHit>> = Vec::new();
155    for hit in hits {
156        let mut found = false;
157        for group in &mut groups {
158            if group.iter().any(|h| h.overlaps(&hit)) {
159                group.push(hit.clone());
160                found = true;
161                break;
162            }
163        }
164        if !found {
165            groups.push(vec![hit]);
166        }
167    }
168
169    groups
170        .into_iter()
171        .map(|mut g| {
172            g.sort_by(compare_paf_hits);
173            g.into_iter().next().unwrap()
174        })
175        .collect()
176}
177
178#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
179pub enum GenomeSizeCategory {
180
181    Small,
182
183    Medium,
184
185    Large,
186}
187
188const SMALL_GENOME_THRESHOLD: u64 = 5 * 1024 * 1024;
189const LARGE_GENOME_THRESHOLD: u64 = 20 * 1024 * 1024;
190
191impl GenomeSizeCategory {
192
193    pub fn from_size(size_bytes: u64) -> Self {
194        if size_bytes < SMALL_GENOME_THRESHOLD {
195            GenomeSizeCategory::Small
196        } else if size_bytes < LARGE_GENOME_THRESHOLD {
197            GenomeSizeCategory::Medium
198        } else {
199            GenomeSizeCategory::Large
200        }
201    }
202
203    pub fn thread_count(&self, max_threads: usize) -> usize {
204        let recommended = match self {
205            GenomeSizeCategory::Small => 4,
206            GenomeSizeCategory::Medium => 6,
207            GenomeSizeCategory::Large => 8,
208        };
209
210        recommended.min(max_threads)
211    }
212
213    pub fn name(&self) -> &'static str {
214        match self {
215            GenomeSizeCategory::Small => "small (<5MB)",
216            GenomeSizeCategory::Medium => "medium (5-20MB)",
217            GenomeSizeCategory::Large => "large (>20MB)",
218        }
219    }
220}
221
222fn bucket_genomes_by_size(genome_files: &[PathBuf]) -> FxHashMap<GenomeSizeCategory, Vec<PathBuf>> {
223    let mut buckets: FxHashMap<GenomeSizeCategory, Vec<PathBuf>> = FxHashMap::default();
224
225    for path in genome_files {
226        let size = std::fs::metadata(path)
227            .map(|m| m.len())
228            .unwrap_or(0);
229        let category = GenomeSizeCategory::from_size(size);
230        buckets.entry(category).or_default().push(path.clone());
231    }
232
233    buckets
234}
235
236#[derive(Clone, Debug)]
237pub struct CatalogEntry {
238    pub accession: String,
239    pub taxid: String,
240    pub species_taxid: String,
241    pub genus: String,
242    pub species: String,
243    pub organism_name: String,
244    pub source: String,
245}
246
247pub struct GenomeCatalog {
248    entries: FxHashMap<String, CatalogEntry>,
249}
250
251impl Default for GenomeCatalog {
252    fn default() -> Self {
253        Self::new()
254    }
255}
256
257impl GenomeCatalog {
258    pub fn new() -> Self {
259        Self {
260            entries: FxHashMap::default(),
261        }
262    }
263
264    pub fn add_assembly_with_taxonomy(
265        &mut self,
266        asm: &AssemblyInfo,
267        source: &str,
268        taxonomy: Option<&TaxonomyDB>,
269    ) {
270        let (parsed_genus, species) = parse_organism_name(&asm.organism_name);
271
272        let genus = resolve_genus(&asm.organism_name, &asm.taxid, &parsed_genus, taxonomy);
273
274        let entry = CatalogEntry {
275            accession: asm.accession.clone(),
276            taxid: asm.taxid.clone(),
277            species_taxid: asm.species_taxid.clone(),
278            genus,
279            species,
280            organism_name: asm.organism_name.clone(),
281            source: source.to_string(),
282        };
283
284        self.entries.insert(asm.accession.clone(), entry.clone());
285        if let Some(base) = asm.accession.split('.').next() {
286            self.entries.insert(base.to_string(), entry);
287        }
288    }
289
290    pub fn add_plasmid_with_taxonomy(
291        &mut self,
292        plasmid: &PlasmidInfo,
293        taxonomy: Option<&TaxonomyDB>,
294    ) {
295
296        let organism_name = format!("{} {}", plasmid.genus, plasmid.species);
297
298        let genus = resolve_genus(&organism_name, &plasmid.taxonomy_uid, &plasmid.genus, taxonomy);
299
300        let entry = CatalogEntry {
301            accession: plasmid.accession.clone(),
302            taxid: plasmid.taxonomy_uid.clone(),
303            species_taxid: String::new(),
304            genus,
305            species: plasmid.species.clone(),
306            organism_name,
307            source: "plsdb".to_string(),
308        };
309
310        self.entries.insert(plasmid.accession.clone(), entry.clone());
311
312        let base = plasmid.accession
313            .strip_prefix("NZ_")
314            .unwrap_or(&plasmid.accession);
315        let no_ver = base.split('.').next().unwrap_or(base);
316        self.entries.insert(no_ver.to_string(), entry.clone());
317
318        let underscore_key = plasmid.accession.replace('.', "_");
319        self.entries.insert(underscore_key, entry);
320    }
321
322    pub fn get_genus(&self, accession: &str) -> Option<&str> {
323
324        if let Some(entry) = self.entries.get(accession) {
325            return Some(&entry.genus);
326        }
327
328        if let Some(base) = accession.split('.').next() {
329            if let Some(entry) = self.entries.get(base) {
330                return Some(&entry.genus);
331            }
332        }
333
334        let stripped = accession.strip_prefix("NZ_").unwrap_or(accession);
335        if let Some(entry) = self.entries.get(stripped) {
336            return Some(&entry.genus);
337        }
338        let stripped_base = stripped.split('.').next().unwrap_or(stripped);
339        if let Some(entry) = self.entries.get(stripped_base) {
340            return Some(&entry.genus);
341        }
342
343        if let Some(last_underscore) = accession.rfind('_') {
344            let suffix = &accession[last_underscore + 1..];
345
346            if !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit()) {
347                let with_dot = format!("{}.{}", &accession[..last_underscore], suffix);
348                if let Some(entry) = self.entries.get(&with_dot) {
349                    return Some(&entry.genus);
350                }
351
352                let stripped_with_dot = with_dot.strip_prefix("NZ_").unwrap_or(&with_dot);
353                if let Some(entry) = self.entries.get(stripped_with_dot) {
354                    return Some(&entry.genus);
355                }
356            }
357        }
358
359        None
360    }
361
362    pub fn save(&self, path: &Path) -> Result<()> {
363        let file = File::create(path)?;
364        let mut writer = BufWriter::new(file);
365
366        writeln!(writer, "accession\ttaxid\tspecies_taxid\tgenus\tspecies\torganism_name\tsource")?;
367
368        let mut seen: FxHashSet<&str> = FxHashSet::default();
369        for entry in self.entries.values() {
370            if seen.contains(entry.accession.as_str()) {
371                continue;
372            }
373            seen.insert(&entry.accession);
374
375            writeln!(
376                writer,
377                "{}\t{}\t{}\t{}\t{}\t{}\t{}",
378                entry.accession,
379                entry.taxid,
380                entry.species_taxid,
381                entry.genus,
382                entry.species,
383                entry.organism_name.replace('\t', " "),
384                entry.source
385            )?;
386        }
387
388        Ok(())
389    }
390
391    pub fn len(&self) -> usize {
392        let mut seen: FxHashSet<&str> = FxHashSet::default();
393        for entry in self.entries.values() {
394            seen.insert(&entry.accession);
395        }
396        seen.len()
397    }
398
399    #[allow(dead_code)]
400    pub fn is_empty(&self) -> bool {
401        self.entries.is_empty()
402    }
403}
404
405fn parse_organism_name(organism: &str) -> (String, String) {
406    let parts: Vec<&str> = organism.split_whitespace().collect();
407
408    let genus = parts.first()
409        .map(|s| s.to_string())
410        .unwrap_or_else(|| "Unknown".to_string());
411
412    let species = if parts.len() >= 2 {
413
414        parts[1..].join(" ")
415    } else {
416        "Unknown".to_string()
417    };
418
419    (clean_genus(&genus), species)
420}
421
422fn clean_genus(genus: &str) -> String {
423
424    let cleaned = if let Some(idx) = genus.find('(') {
425        genus[..idx].trim()
426    } else {
427        genus.trim()
428    };
429
430    cleaned.to_string()
431}
432
433fn resolve_genus(
434    organism_name: &str,
435    taxid_str: &str,
436    _parsed_genus: &str,
437    taxonomy: Option<&TaxonomyDB>,
438) -> String {
439
440    let organism_lower = organism_name.to_lowercase();
441    if organism_lower.contains("uncultured") {
442        return "uncultured".to_string();
443    }
444
445    if let Some(tax_db) = taxonomy {
446        if let Ok(taxid) = taxid_str.parse::<u32>() {
447
448            if let Some(genus) = tax_db.get_genus(taxid) {
449                return clean_genus(&genus);
450            }
451
452            if let Some((name, _rank)) = tax_db.get_genus_or_higher(taxid) {
453                return clean_genus(&name);
454            }
455        }
456    }
457
458    let cleaned = clean_genus(organism_name);
459    if cleaned.is_empty() {
460        "unknown".to_string()
461    } else {
462        cleaned
463    }
464}
465
466pub struct TaxonomyDB {
467
468    names: FxHashMap<u32, String>,
469
470    nodes: FxHashMap<u32, (u32, String)>,
471}
472
473impl Default for TaxonomyDB {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479impl TaxonomyDB {
480    pub fn new() -> Self {
481        Self {
482            names: FxHashMap::default(),
483            nodes: FxHashMap::default(),
484        }
485    }
486
487    pub fn load(taxdump_dir: &Path) -> Result<Self> {
488        let mut db = Self::new();
489
490        let names_path = taxdump_dir.join("names.dmp");
491        let nodes_path = taxdump_dir.join("nodes.dmp");
492
493        if !names_path.exists() || !nodes_path.exists() {
494            anyhow::bail!("Taxonomy files not found in {}", taxdump_dir.display());
495        }
496
497        eprintln!("    Loading names.dmp...");
498        let file = File::open(&names_path)?;
499        let reader = BufReader::new(file);
500
501        for line in reader.lines() {
502            let line = line?;
503            let fields: Vec<&str> = line.split("\t|\t").collect();
504            if fields.len() < 4 {
505                continue;
506            }
507
508            let name_class = fields[3].trim_end_matches("\t|");
509            if name_class != "scientific name" {
510                continue;
511            }
512
513            let taxid: u32 = match fields[0].parse() {
514                Ok(id) => id,
515                Err(_) => continue,
516            };
517            let name = fields[1].to_string();
518
519            db.names.insert(taxid, name);
520        }
521
522        eprintln!("    Loading nodes.dmp...");
523        let file = File::open(&nodes_path)?;
524        let reader = BufReader::new(file);
525
526        for line in reader.lines() {
527            let line = line?;
528            let fields: Vec<&str> = line.split("\t|\t").collect();
529            if fields.len() < 3 {
530                continue;
531            }
532
533            let taxid: u32 = match fields[0].parse() {
534                Ok(id) => id,
535                Err(_) => continue,
536            };
537            let parent_taxid: u32 = match fields[1].parse() {
538                Ok(id) => id,
539                Err(_) => continue,
540            };
541            let rank = fields[2].to_string();
542
543            db.nodes.insert(taxid, (parent_taxid, rank));
544        }
545
546        eprintln!("    Loaded {} taxa, {} nodes", db.names.len(), db.nodes.len());
547        Ok(db)
548    }
549
550    pub fn get_genus(&self, taxid: u32) -> Option<String> {
551        let mut current = taxid;
552        let mut visited = 0;
553
554        while visited < 50 {
555            if let Some((parent, rank)) = self.nodes.get(&current) {
556                if rank == "genus" {
557                    return self.names.get(&current).cloned();
558                }
559
560                if *parent == current {
561                    return None;
562                }
563
564                current = *parent;
565                visited += 1;
566            } else {
567                return None;
568            }
569        }
570
571        None
572    }
573
574    pub fn get_genus_or_higher(&self, taxid: u32) -> Option<(String, String)> {
575        let mut current = taxid;
576        let mut visited = 0;
577
578        let target_ranks = ["genus", "family", "order", "class", "phylum"];
579        let mut best_match: Option<(String, String, usize)> = None;
580
581        while visited < 50 {
582            if let Some((parent, rank)) = self.nodes.get(&current) {
583
584                if let Some(priority) = target_ranks.iter().position(|r| r == rank) {
585                    if let Some(name) = self.names.get(&current) {
586
587                        if rank == "genus" {
588                            return Some((name.clone(), rank.clone()));
589                        }
590
591                        if best_match.is_none() || priority < best_match.as_ref().unwrap().2 {
592                            best_match = Some((name.clone(), rank.clone(), priority));
593                        }
594                    }
595                }
596
597                if *parent == current {
598                    break;
599                }
600
601                current = *parent;
602                visited += 1;
603            } else {
604                break;
605            }
606        }
607
608        best_match.map(|(name, rank, _)| (name, rank))
609    }
610}
611
612#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
613pub struct BuildState {
614    pub started_at: String,
615    pub last_updated: String,
616    pub current_step: String,
617    pub completed_steps: Vec<String>,
618    pub genomes_downloaded: usize,
619    pub plsdb_extracted: usize,
620    pub alignments_done: bool,
621    pub flanking_extracted: bool,
622}
623
624impl Default for BuildState {
625    fn default() -> Self {
626        Self::new()
627    }
628}
629
630impl BuildState {
631    pub fn new() -> Self {
632        let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
633        Self {
634            started_at: now.clone(),
635            last_updated: now,
636            current_step: String::new(),
637            completed_steps: Vec::new(),
638            genomes_downloaded: 0,
639            plsdb_extracted: 0,
640            alignments_done: false,
641            flanking_extracted: false,
642        }
643    }
644
645    pub fn load(path: &Path) -> Option<Self> {
646        std::fs::read_to_string(path)
647            .ok()
648            .and_then(|s| serde_json::from_str(&s).ok())
649    }
650
651    pub fn save(&self, path: &Path) -> Result<()> {
652        let json = serde_json::to_string_pretty(self)?;
653        std::fs::write(path, json)?;
654        Ok(())
655    }
656
657    pub fn update_step(&mut self, step: &str) {
658        self.current_step = step.to_string();
659        self.last_updated = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
660    }
661
662    pub fn complete_step(&mut self, step: &str) {
663        if !self.completed_steps.contains(&step.to_string()) {
664            self.completed_steps.push(step.to_string());
665        }
666        self.last_updated = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
667    }
668
669    pub fn is_completed(&self, step: &str) -> bool {
670        self.completed_steps.contains(&step.to_string())
671    }
672}
673
674pub struct FlankingDbBuilder {
675    output_dir: PathBuf,
676    threads: usize,
677    amr_db_path: PathBuf,
678
679    email: String,
680
681    config: FlankBuildConfig,
682}
683
684impl FlankingDbBuilder {
685    pub fn new(amr_db: &Path, output_dir: &Path, threads: usize, email: &str, config: FlankBuildConfig) -> Self {
686        Self {
687            output_dir: output_dir.to_path_buf(),
688            threads,
689            amr_db_path: amr_db.to_path_buf(),
690            email: email.to_string(),
691            config,
692        }
693    }
694
695    pub fn build(&self) -> Result<()> {
696        eprintln!("\n============================================================");
697        eprintln!(" ARGenus Flanking Database Builder (Rust)");
698        eprintln!("============================================================");
699        eprintln!("Output directory: {}", self.output_dir.display());
700        eprintln!("NCBI Email: {}", self.email);
701        eprintln!("Download method: NCBI Datasets API (batch)");
702        eprintln!("Threads: {}", self.threads);
703        eprintln!("Flanking length: {} bp", self.config.flanking_length);
704        eprintln!("Queue buffer: {} GB", self.config.queue_buffer_gb);
705        eprintln!("Alignment: minimap2 (AMR indexed)");
706        eprintln!();
707
708        std::fs::create_dir_all(&self.output_dir)?;
709        let genomes_dir = self.output_dir.join("genomes");
710        std::fs::create_dir_all(&genomes_dir)?;
711
712        let plsdb_dir = self.config.plsdb.dir.clone().unwrap_or_else(|| self.output_dir.join("plsdb"));
713        if !self.config.plsdb.skip {
714            std::fs::create_dir_all(&plsdb_dir)?;
715        }
716        let taxonomy_dir = self.output_dir.join("taxonomy");
717        let temp_dir = self.output_dir.join("temp");
718        std::fs::create_dir_all(&temp_dir)?;
719        let state_path = self.output_dir.join("build_state.json");
720
721        let mut state = BuildState::load(&state_path).unwrap_or_else(|| {
722            eprintln!("Starting fresh build...");
723            BuildState::new()
724        });
725
726        if !state.completed_steps.is_empty() {
727            eprintln!("Resuming from previous state:");
728            eprintln!("  Started: {}", state.started_at);
729            eprintln!("  Last updated: {}", state.last_updated);
730            eprintln!("  Completed steps: {}", state.completed_steps.join(", "));
731            eprintln!("  Genomes downloaded: {}", state.genomes_downloaded);
732            eprintln!("  PLSDB extracted: {}", state.plsdb_extracted);
733            eprintln!();
734        }
735
736        let mut catalog = GenomeCatalog::new();
737        let catalog_path = self.output_dir.join("genome_catalog.tsv");
738
739        if !state.is_completed("taxonomy_download") {
740            state.update_step("taxonomy_download");
741            state.save(&state_path)?;
742            eprintln!("[1/9] Downloading NCBI taxonomy database...");
743            self.download_taxdump(&taxonomy_dir)?;
744            state.complete_step("taxonomy_download");
745            state.save(&state_path)?;
746        } else {
747            eprintln!("[1/9] Taxonomy database already downloaded, skipping...");
748        }
749
750        eprintln!("[2/9] Loading taxonomy database...");
751        let taxonomy = match TaxonomyDB::load(&taxonomy_dir) {
752            Ok(db) => {
753                eprintln!("    Taxonomy database loaded successfully");
754                Some(db)
755            }
756            Err(e) => {
757                eprintln!("    Warning: Failed to load taxonomy: {}", e);
758                eprintln!("    Genus will be parsed from organism names instead");
759                None
760            }
761        };
762
763        eprintln!("[3/9] Downloading NCBI assembly summaries (GenBank)...");
764        let assemblies = self.download_assembly_summaries()?;
765        eprintln!("    GenBank assemblies: {} (Complete Genome + Chromosome)", assemblies.len());
766
767        for asm in &assemblies {
768            catalog.add_assembly_with_taxonomy(asm, "genbank", taxonomy.as_ref());
769        }
770
771        let standalone_plasmids = if self.config.plsdb.skip {
772            eprintln!("[4/9] PLSDB skipped (--skip-plsdb)");
773            eprintln!("[5/9] PLSDB metadata skipped");
774            Vec::new()
775        } else if self.config.plsdb.dir.is_some() {
776
777            eprintln!("[4/9] Using pre-downloaded PLSDB: {}", plsdb_dir.display());
778
779            let nuccore_csv = plsdb_dir.join("nuccore.csv");
780            let fasta_path = plsdb_dir.join("sequences.fasta");
781            if !nuccore_csv.exists() {
782
783                let meta_tar = plsdb_dir.join("meta.tar.gz");
784                if meta_tar.exists() {
785                    eprintln!("    Extracting meta.tar.gz...");
786                    std::process::Command::new("tar")
787                        .args(["-xzf", meta_tar.to_str().unwrap(), "-C", plsdb_dir.to_str().unwrap()])
788                        .status()
789                        .with_context(|| "Failed to extract PLSDB metadata")?;
790                } else {
791                    anyhow::bail!("PLSDB directory missing nuccore.csv and meta.tar.gz: {}", plsdb_dir.display());
792                }
793            }
794            if !fasta_path.exists() {
795                anyhow::bail!("PLSDB directory missing sequences.fasta: {}", plsdb_dir.display());
796            }
797            state.complete_step("plsdb_download");
798            state.save(&state_path)?;
799
800            eprintln!("[5/9] Loading PLSDB metadata...");
801            let plasmids = self.load_plsdb_plasmids(&plsdb_dir)?;
802            eprintln!("    Standalone circular+complete plasmids: {} (not in any assembly)",
803                     plasmids.len());
804            plasmids
805        } else {
806
807            if !state.is_completed("plsdb_download") {
808                state.update_step("plsdb_download");
809                state.save(&state_path)?;
810                eprintln!("[4/9] Downloading PLSDB database...");
811                self.download_plsdb(&plsdb_dir)?;
812                state.complete_step("plsdb_download");
813                state.save(&state_path)?;
814            } else {
815                eprintln!("[4/9] PLSDB database already downloaded, skipping...");
816            }
817
818            eprintln!("[5/9] Loading PLSDB metadata...");
819            let plasmids = self.load_plsdb_plasmids(&plsdb_dir)?;
820            eprintln!("    Standalone circular+complete plasmids: {} (not in any assembly)",
821                     plasmids.len());
822            plasmids
823        };
824
825        for plasmid in &standalone_plasmids {
826            catalog.add_plasmid_with_taxonomy(plasmid, taxonomy.as_ref());
827        }
828
829        eprintln!("[6/9] Saving unified genome catalog...");
830        catalog.save(&catalog_path)?;
831        eprintln!("    Catalog entries: {}", catalog.len());
832        eprintln!("    Saved to: {}", catalog_path.display());
833
834        let paf_output = self.output_dir.join("all_alignments.paf");
835        let merged_hits = self.output_dir.join("merged_alignment_hits.tsv");
836        let output_tsv = self.output_dir.join("all_flanking_sequences.tsv");
837
838        if !state.is_completed("alignment_flanking") {
839
840            state.update_step("genome_download");
841            state.save(&state_path)?;
842
843            eprintln!("[7/9] Batch processing: Download + Align NCBI genomes...");
844            eprintln!("    Producer-consumer pattern with {} GB queue buffer", self.config.queue_buffer_gb);
845
846            let downloaded = self.download_and_align_batches(
847                &assemblies,
848                &genomes_dir,
849                &temp_dir,
850                &paf_output,
851                &catalog,
852            )?;
853            state.genomes_downloaded = downloaded;
854            state.complete_step("genome_download");
855            state.save(&state_path)?;
856            eprintln!("    Processed {} genomes", downloaded);
857
858            if self.config.plsdb.skip {
859                eprintln!("[8/9] PLSDB processing skipped (--skip-plsdb)");
860                state.plsdb_extracted = 0;
861            } else {
862                state.update_step("plsdb_extract");
863                state.save(&state_path)?;
864                eprintln!("[8/9] Processing PLSDB sequences...");
865                let plsdb_count = self.align_plsdb(
866                    &standalone_plasmids,
867                    &plsdb_dir,
868                    &temp_dir,
869                    &paf_output,
870                )?;
871                state.plsdb_extracted = plsdb_count;
872                state.complete_step("plsdb_extract");
873                state.save(&state_path)?;
874            }
875
876            state.update_step("alignment_flanking");
877            state.save(&state_path)?;
878
879            eprintln!("[9/9] Processing alignment results...");
880            self.convert_paf_to_merged(&paf_output, &merged_hits)?;
881            state.alignments_done = true;
882            state.save(&state_path)?;
883
884            eprintln!("    Extracting flanking sequences (flanking_length: {} bp)...",
885                self.config.flanking_length);
886            self.extract_flanking_sequences(
887                &merged_hits,
888                &genomes_dir,
889                &output_tsv,
890                &catalog,
891            )?;
892
893            if !self.config.plsdb.skip {
894                self.extract_flanking_from_plsdb(
895                    &merged_hits,
896                    &plsdb_dir,
897                    &output_tsv,
898                    &catalog,
899                )?;
900            }
901            state.flanking_extracted = true;
902            state.complete_step("alignment_flanking");
903            state.save(&state_path)?;
904
905            eprintln!("    Cleaning up PAF file...");
906            std::fs::remove_file(&paf_output).ok();
907        } else {
908            eprintln!("[7-9] Batch pipeline already completed, skipping...");
909        }
910
911        if temp_dir.exists() {
912            eprintln!("\nCleaning up temp files...");
913            std::fs::remove_dir_all(&temp_dir).ok();
914        }
915
916        let fdb_path = self.output_dir.join("flanking.fdb");
917        if !state.is_completed("fdb_build") && output_tsv.exists() {
918            state.update_step("fdb_build");
919            state.save(&state_path)?;
920            eprintln!("\n[10/10] Building FDB from TSV (external sort + zstd)...");
921
922            let buffer_size_mb = 1024;
923            crate::fdb::build(&output_tsv, &fdb_path, buffer_size_mb, self.threads)?;
924
925            state.complete_step("fdb_build");
926            state.save(&state_path)?;
927        } else if fdb_path.exists() {
928            eprintln!("[10/10] FDB already built, skipping...");
929        }
930
931        eprintln!("\n============================================================");
932        eprintln!(" Build Complete");
933        eprintln!("============================================================");
934        eprintln!("Output files:");
935        eprintln!("  - {}", output_tsv.display());
936        if fdb_path.exists() {
937            let fdb_size = std::fs::metadata(&fdb_path).map(|m| m.len()).unwrap_or(0);
938            eprintln!("  - {} ({:.1} MB)", fdb_path.display(), fdb_size as f64 / 1024.0 / 1024.0);
939        }
940        eprintln!("  - {}", catalog_path.display());
941
942        Ok(())
943    }
944
945    fn download_assembly_summaries(&self) -> Result<Vec<AssemblyInfo>> {
946        let mut assemblies = Vec::new();
947
948        for kingdom in ["bacteria", "archaea"] {
949            let url = format!(
950                "{}/genbank/{}/assembly_summary.txt",
951                NCBI_FTP_BASE, kingdom
952            );
953            eprintln!("    Downloading genbank/{}...", kingdom);
954
955            match self.download_and_parse_assembly_summary(&url, None) {
956                Ok((mut asm, _)) => {
957                    eprintln!("      Found {} assemblies (Complete Genome + Chromosome)", asm.len());
958                    assemblies.append(&mut asm);
959                }
960                Err(e) => {
961                    eprintln!("      Warning: Failed to download {}: {}", url, e);
962                }
963            }
964        }
965
966        eprintln!("    Total assemblies: {}", assemblies.len());
967        Ok(assemblies)
968    }
969
970    fn download_plsdb(&self, plsdb_dir: &Path) -> Result<()> {
971        let meta_tar = plsdb_dir.join("meta.tar.gz");
972        let fasta_path = plsdb_dir.join("sequences.fasta");
973
974        if !plsdb_dir.join("nuccore.csv").exists() {
975            eprintln!("    Downloading PLSDB metadata...");
976            self.download_file(PLSDB_META_URL, &meta_tar)?;
977
978            eprintln!("    Extracting metadata...");
979            let status = Command::new("tar")
980                .args(["-xzf", meta_tar.to_str().unwrap(), "-C", plsdb_dir.to_str().unwrap()])
981                .status()
982                .with_context(|| "Failed to extract PLSDB metadata")?;
983
984            if !status.success() {
985                anyhow::bail!("tar extraction failed");
986            }
987
988            std::fs::remove_file(&meta_tar).ok();
989        } else {
990            eprintln!("    PLSDB metadata already exists, skipping download...");
991        }
992
993        if !fasta_path.exists() {
994            eprintln!("    Downloading PLSDB sequences (~7GB)...");
995            self.download_file(PLSDB_FASTA_URL, &fasta_path)?;
996        } else {
997            eprintln!("    PLSDB sequences already exist, skipping download...");
998        }
999
1000        Ok(())
1001    }
1002
1003    fn download_taxdump(&self, taxonomy_dir: &Path) -> Result<()> {
1004        let tar_path = taxonomy_dir.join("taxdump.tar.gz");
1005        let names_path = taxonomy_dir.join("names.dmp");
1006
1007        if names_path.exists() {
1008            eprintln!("    Taxdump already exists, skipping download...");
1009            return Ok(());
1010        }
1011
1012        std::fs::create_dir_all(taxonomy_dir)?;
1013
1014        eprintln!("    Downloading NCBI taxdump (~60MB)...");
1015        self.download_file(NCBI_TAXDUMP_URL, &tar_path)?;
1016
1017        eprintln!("    Extracting taxdump...");
1018        let status = Command::new("tar")
1019            .args(["-xzf", tar_path.to_str().unwrap(), "-C", taxonomy_dir.to_str().unwrap()])
1020            .status()
1021            .with_context(|| "Failed to extract taxdump")?;
1022
1023        if !status.success() {
1024            anyhow::bail!("tar extraction failed");
1025        }
1026
1027        std::fs::remove_file(&tar_path).ok();
1028
1029        Ok(())
1030    }
1031
1032    fn download_file(&self, url: &str, output_path: &Path) -> Result<()> {
1033        for attempt in 0..3 {
1034            match self.download_file_once(url, output_path) {
1035                Ok(_) => return Ok(()),
1036                Err(e) if attempt < 2 => {
1037                    eprintln!("      Download failed (attempt {}): {}", attempt + 1, e);
1038                    eprintln!("      Retrying in 5 seconds...");
1039                    std::thread::sleep(Duration::from_secs(5));
1040                    continue;
1041                }
1042                Err(e) => return Err(e),
1043            }
1044        }
1045        Ok(())
1046    }
1047
1048    fn download_file_once(&self, url: &str, output_path: &Path) -> Result<()> {
1049        let response = ureq::get(url)
1050            .set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
1051            .timeout(Duration::from_secs(7200))
1052            .call()
1053            .with_context(|| format!("Failed to download {}", url))?;
1054
1055        let mut file = File::create(output_path)?;
1056        let mut reader = response.into_reader();
1057
1058        let mut buffer = [0u8; 65536];
1059        let mut total = 0usize;
1060        loop {
1061            match reader.read(&mut buffer) {
1062                Ok(0) => break,
1063                Ok(n) => {
1064                    file.write_all(&buffer[..n])?;
1065                    total += n;
1066                    if total.is_multiple_of(100 * 1024 * 1024) {
1067                        eprintln!("      Downloaded {} MB...", total / (1024 * 1024));
1068                    }
1069                }
1070                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
1071                Err(e) => return Err(e.into()),
1072            }
1073        }
1074        eprintln!("      Total: {} MB", total / (1024 * 1024));
1075
1076        Ok(())
1077    }
1078
1079    fn download_and_parse_assembly_summary(
1080        &self,
1081        url: &str,
1082        exclude_set: Option<&FxHashSet<String>>,
1083    ) -> Result<(Vec<AssemblyInfo>, Vec<String>)> {
1084        let response = ureq::get(url)
1085            .timeout(Duration::from_secs(600))
1086            .call()
1087            .with_context(|| format!("Failed to download {}", url))?;
1088
1089        let reader = BufReader::new(response.into_reader());
1090        let mut assemblies = Vec::new();
1091        let mut paired_gca = Vec::new();
1092        let mut skipped_duplicates = 0usize;
1093
1094        for line in reader.lines() {
1095            let line = line?;
1096            if line.starts_with('#') {
1097                continue;
1098            }
1099
1100            let fields: Vec<&str> = line.split('\t').collect();
1101            if fields.len() < 20 {
1102                continue;
1103            }
1104
1105            let accession = fields[0];
1106            let assembly_level = fields[11];
1107
1108            if assembly_level != "Complete Genome" && assembly_level != "Chromosome" {
1109                continue;
1110            }
1111
1112            let ftp_path = fields[19];
1113            if ftp_path == "na" || ftp_path.is_empty() {
1114                continue;
1115            }
1116
1117            if let Some(exclude) = exclude_set {
1118
1119                let acc_base = accession.split('.').next().unwrap_or(accession);
1120                if exclude.contains(accession) || exclude.contains(acc_base) {
1121                    skipped_duplicates += 1;
1122                    continue;
1123                }
1124            }
1125
1126            if fields.len() > 18 && accession.starts_with("GCF_") {
1127                let paired_asm = fields[17];
1128                let paired_comp = fields[18];
1129                if paired_asm != "na" && !paired_asm.is_empty() && paired_comp == "identical" {
1130                    paired_gca.push(paired_asm.to_string());
1131
1132                    if let Some(base) = paired_asm.split('.').next() {
1133                        paired_gca.push(base.to_string());
1134                    }
1135                }
1136            }
1137
1138            assemblies.push(AssemblyInfo {
1139                accession: accession.to_string(),
1140                taxid: fields[5].to_string(),
1141                species_taxid: fields[6].to_string(),
1142                organism_name: fields[7].to_string(),
1143            });
1144        }
1145
1146        if skipped_duplicates > 0 {
1147            eprintln!("      Skipped {} duplicates (have identical RefSeq)", skipped_duplicates);
1148        }
1149
1150        Ok((assemblies, paired_gca))
1151    }
1152
1153    fn load_plsdb_plasmids(&self, plsdb_dir: &Path) -> Result<Vec<PlasmidInfo>> {
1154        let nuccore_path = plsdb_dir.join("nuccore.csv");
1155        let taxonomy_path = plsdb_dir.join("taxonomy.csv");
1156
1157        let mut taxonomy: FxHashMap<String, (String, String)> = FxHashMap::default();
1158        if taxonomy_path.exists() {
1159            let tax_file = File::open(&taxonomy_path)?;
1160            let tax_reader = BufReader::new(tax_file);
1161
1162            for (idx, line) in tax_reader.lines().enumerate() {
1163                let line = line?;
1164                if idx == 0 {
1165                    continue;
1166                }
1167
1168                let fields: Vec<&str> = line.split(',').collect();
1169                if fields.len() >= 10 {
1170                    let uid = fields[0].to_string();
1171                    let genus = fields[8].to_string();
1172                    let species = fields[9].to_string();
1173                    taxonomy.insert(uid, (genus, species));
1174                }
1175            }
1176        }
1177
1178        let mut plasmids = Vec::new();
1179        let nuc_file = File::open(&nuccore_path)?;
1180        let nuc_reader = BufReader::new(nuc_file);
1181
1182        for (idx, line) in nuc_reader.lines().enumerate() {
1183            let line = line?;
1184            if idx == 0 {
1185                continue;
1186            }
1187
1188            let fields = parse_csv_line(&line);
1189            if fields.len() < 15 {
1190                continue;
1191            }
1192
1193            let completeness = &fields[4];
1194            let topology = &fields[14];
1195            let assembly_uid = &fields[9];
1196
1197            if completeness != "complete" {
1198                continue;
1199            }
1200            if topology != "circular" {
1201                continue;
1202            }
1203
1204            if assembly_uid != "-1" {
1205                continue;
1206            }
1207
1208            let taxonomy_uid = &fields[12];
1209            let (genus, species) = taxonomy.get(taxonomy_uid)
1210                .cloned()
1211                .unwrap_or_else(|| ("Unknown".to_string(), "Unknown".to_string()));
1212
1213            plasmids.push(PlasmidInfo {
1214                accession: fields[1].clone(),
1215                taxonomy_uid: taxonomy_uid.clone(),
1216                genus,
1217                species,
1218            });
1219        }
1220
1221        Ok(plasmids)
1222    }
1223
1224    fn download_and_align_batches(
1225        &self,
1226        assemblies: &[AssemblyInfo],
1227        genomes_dir: &Path,
1228        temp_dir: &Path,
1229        paf_output: &Path,
1230        _catalog: &GenomeCatalog,
1231    ) -> Result<usize> {
1232        let all_accessions_path = temp_dir.join("accessions_all.txt");
1233        let done_accessions_path = temp_dir.join("accessions_done.txt");
1234
1235        let accessions: Vec<String> = assemblies.iter()
1236            .map(|a| a.accession.clone())
1237            .collect();
1238
1239        if !all_accessions_path.exists() {
1240            eprintln!("    Saving {} accessions to temp file...", accessions.len());
1241            let mut file = File::create(&all_accessions_path)?;
1242            for acc in &accessions {
1243                writeln!(file, "{}", acc)?;
1244            }
1245        }
1246
1247        let mut done_set: FxHashSet<String> = FxHashSet::default();
1248        if done_accessions_path.exists() {
1249            let file = File::open(&done_accessions_path)?;
1250            let reader = BufReader::new(file);
1251            for acc in reader.lines().map_while(Result::ok) {
1252                done_set.insert(acc.trim().to_string());
1253            }
1254
1255            if !done_set.is_empty() {
1256                if !paf_output.exists() {
1257                    eprintln!("    WARNING: PAF file missing but {} genomes marked as done", done_set.len());
1258                    eprintln!("    Clearing done list and reprocessing all genomes...");
1259                    done_set.clear();
1260                    std::fs::remove_file(&done_accessions_path).ok();
1261                } else {
1262
1263                    let paf_size = std::fs::metadata(paf_output)?.len();
1264                    if paf_size == 0 {
1265                        eprintln!("    WARNING: PAF file is empty but {} genomes marked as done", done_set.len());
1266                        eprintln!("    Clearing done list and reprocessing all genomes...");
1267                        done_set.clear();
1268                        std::fs::remove_file(&done_accessions_path).ok();
1269                    } else {
1270
1271                        let mut missing_genomes = Vec::new();
1272                        for acc in &done_set {
1273                            let genome_path = genomes_dir.join(format!("{}.fna", acc));
1274                            if !genome_path.exists() {
1275                                missing_genomes.push(acc.clone());
1276                            }
1277                        }
1278                        if !missing_genomes.is_empty() {
1279                            eprintln!("    WARNING: {} genome files missing for done accessions", missing_genomes.len());
1280                            for acc in &missing_genomes {
1281                                done_set.remove(acc);
1282                            }
1283                            eprintln!("    Removed missing genomes from done list, will re-download");
1284                        }
1285
1286                        if !done_set.is_empty() {
1287                            eprintln!("    Resume: {} already processed (PAF: {} MB)",
1288                                     done_set.len(), paf_size / 1024 / 1024);
1289                        }
1290                    }
1291                }
1292            }
1293        }
1294
1295        let remaining: Vec<&String> = accessions.iter()
1296            .filter(|a| !done_set.contains(*a))
1297            .collect();
1298
1299        if remaining.is_empty() {
1300            eprintln!("    All {} genomes already processed", accessions.len());
1301            return Ok(accessions.len());
1302        }
1303
1304        eprintln!("    Remaining: {}/{} genomes to process", remaining.len(), accessions.len());
1305
1306        let bytes_per_batch = API_BATCH_SIZE * 3 * 1024 * 1024;
1307        let queue_capacity = std::cmp::max(
1308            1,
1309            (self.config.queue_buffer_gb as usize * 1024 * 1024 * 1024) / bytes_per_batch
1310        );
1311        eprintln!("    Queue capacity: {} batches (based on {} GB buffer)", queue_capacity, self.config.queue_buffer_gb);
1312
1313        let batches: Vec<Vec<String>> = remaining.chunks(API_BATCH_SIZE)
1314            .map(|chunk| chunk.iter().map(|s| (*s).clone()).collect())
1315            .collect();
1316        let total_batches = batches.len();
1317
1318        let (tx, rx) = mpsc::sync_channel::<(usize, Vec<PathBuf>)>(queue_capacity);
1319
1320        let downloaded = Arc::new(AtomicUsize::new(done_set.len()));
1321        let aligned = Arc::new(AtomicUsize::new(done_set.len()));
1322        let download_error = Arc::new(AtomicBool::new(false));
1323
1324        let genomes_dir = genomes_dir.to_path_buf();
1325        let temp_dir_producer = temp_dir.to_path_buf();
1326        let temp_dir_consumer = temp_dir.to_path_buf();
1327        let email = self.email.clone();
1328        let downloaded_clone = Arc::clone(&downloaded);
1329        let download_error_clone = Arc::clone(&download_error);
1330        let total_accessions = accessions.len();
1331
1332        let producer = thread::spawn(move || -> Result<()> {
1333            for (batch_idx, batch) in batches.into_iter().enumerate() {
1334                if download_error_clone.load(Ordering::Relaxed) {
1335                    break;
1336                }
1337
1338                let zip_path = temp_dir_producer.join(format!("batch_{:04}.zip", batch_idx));
1339
1340                let url = format!("{}/genome/download", NCBI_DATASETS_API);
1341                let acc_list: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
1342                let request_body = serde_json::json!({
1343                    "accessions": acc_list,
1344                    "include_annotation_type": ["GENOME_FASTA"]
1345                });
1346
1347                let response = ureq::post(&url)
1348                    .set("Content-Type", "application/json")
1349                    .set("ncbi-client-id", &email)
1350                    .timeout(Duration::from_secs(3600))
1351                    .send_json(&request_body);
1352
1353                match response {
1354                    Ok(resp) => {
1355
1356                        let mut zip_file = File::create(&zip_path)?;
1357                        let mut reader = resp.into_reader();
1358                        std::io::copy(&mut reader, &mut zip_file)?;
1359                        drop(zip_file);
1360
1361                        let genome_files = Self::extract_batch_to_files_static(&zip_path, &genomes_dir)?;
1362
1363                        std::fs::remove_file(&zip_path).ok();
1364
1365                        downloaded_clone.fetch_add(batch.len(), Ordering::Relaxed);
1366                        eprintln!("    [Download] Batch {}/{}: {} genomes ({}/{})",
1367                                 batch_idx + 1, total_batches, batch.len(),
1368                                 downloaded_clone.load(Ordering::Relaxed), total_accessions);
1369
1370                        if tx.send((batch_idx, genome_files)).is_err() {
1371                            break;
1372                        }
1373                    }
1374                    Err(e) => {
1375                        eprintln!("    [Download] Batch {} failed: {}", batch_idx + 1, e);
1376                        download_error_clone.store(true, Ordering::Relaxed);
1377                        break;
1378                    }
1379                }
1380
1381                std::thread::sleep(Duration::from_millis(200));
1382            }
1383            Ok(())
1384        });
1385
1386        let paf_file = Arc::new(Mutex::new(
1387            std::fs::OpenOptions::new()
1388                .create(true)
1389                .append(true)
1390                .open(paf_output)?
1391        ));
1392        let done_file = Arc::new(Mutex::new(
1393            std::fs::OpenOptions::new()
1394                .create(true)
1395                .append(true)
1396                .open(&done_accessions_path)?
1397        ));
1398
1399        let amr_db = self.amr_db_path.clone();
1400        let max_threads = self.threads;
1401
1402        while let Ok((batch_idx, genome_files)) = rx.recv() {
1403            if genome_files.is_empty() {
1404                continue;
1405            }
1406
1407            let buckets = bucket_genomes_by_size(&genome_files);
1408            let mut batch_hits: Vec<PafHit> = Vec::new();
1409
1410            for (category, bucket_files) in &buckets {
1411                if bucket_files.is_empty() {
1412                    continue;
1413                }
1414
1415                let bucket_threads = category.thread_count(max_threads);
1416                let bucket_suffix = format!("batch_{:04}_{:?}", batch_idx, category);
1417                let bucket_fasta = temp_dir_consumer.join(format!("{}.fas", bucket_suffix));
1418
1419                {
1420                    let mut fasta_writer = BufWriter::new(File::create(&bucket_fasta)?);
1421                    for genome_path in bucket_files {
1422                        let filename = genome_path.file_name()
1423                            .and_then(|n| n.to_str())
1424                            .unwrap_or("unknown.fna");
1425
1426                        let file = File::open(genome_path)?;
1427                        let reader = BufReader::new(file);
1428                        for line in reader.lines() {
1429                            let line = line?;
1430                            if let Some(stripped) = line.strip_prefix('>') {
1431                                let contig_id = stripped.split_whitespace().next().unwrap_or(stripped);
1432                                writeln!(fasta_writer, ">{}|{}", contig_id, filename)?;
1433                            } else {
1434                                writeln!(fasta_writer, "{}", line)?;
1435                            }
1436                        }
1437                    }
1438                    fasta_writer.flush()?;
1439                }
1440
1441                let bucket_paf = temp_dir_consumer.join(format!("{}.paf", bucket_suffix));
1442                let status = Command::new("minimap2")
1443                    .args([
1444                        "-cx", "asm20",
1445                        "-t", &bucket_threads.to_string(),
1446                        amr_db.to_str().unwrap(),
1447                        bucket_fasta.to_str().unwrap(),
1448                        "-o", bucket_paf.to_str().unwrap(),
1449                    ])
1450                    .stdout(std::process::Stdio::null())
1451                    .stderr(std::process::Stdio::null())
1452                    .status();
1453
1454                if let Ok(s) = status {
1455                    if s.success() && bucket_paf.exists() {
1456
1457                        let paf_content = std::fs::read_to_string(&bucket_paf)?;
1458                        let hits: Vec<PafHit> = paf_content
1459                            .lines()
1460                            .filter_map(PafHit::from_paf_line)
1461                            .collect();
1462                        batch_hits.extend(hits);
1463                    }
1464                }
1465
1466                std::fs::remove_file(&bucket_fasta).ok();
1467                std::fs::remove_file(&bucket_paf).ok();
1468            }
1469
1470            let dedup_hits = deduplicate_paf_hits(batch_hits);
1471
1472            {
1473                let mut paf = paf_file.lock().unwrap();
1474                for hit in &dedup_hits {
1475                    writeln!(paf, "{}", hit.raw_line)?;
1476                }
1477                paf.flush()?;
1478            }
1479
1480            {
1481                let mut done = done_file.lock().unwrap();
1482                for genome_path in &genome_files {
1483                    if let Some(stem) = genome_path.file_stem().and_then(|s| s.to_str()) {
1484                        writeln!(done, "{}", stem)?;
1485                    }
1486                }
1487                done.flush()?;
1488            }
1489
1490            let count = genome_files.len();
1491            aligned.fetch_add(count, Ordering::Relaxed);
1492
1493            let bucket_info: Vec<String> = buckets.iter()
1494                .map(|(cat, files)| format!("{}={}", cat.name().split_whitespace().next().unwrap_or("?"), files.len()))
1495                .collect();
1496            eprintln!("    [Align] Batch {}/{}: {} genomes [{}] ({}/{})",
1497                     batch_idx + 1, total_batches, count,
1498                     bucket_info.join(", "),
1499                     aligned.load(Ordering::Relaxed), total_accessions);
1500        }
1501
1502        let _ = producer.join();
1503
1504        Ok(aligned.load(Ordering::Relaxed))
1505    }
1506
1507    fn extract_batch_to_files_static(zip_path: &Path, genomes_dir: &Path) -> Result<Vec<PathBuf>> {
1508        let zip_file = File::open(zip_path)?;
1509        let mut archive = zip::ZipArchive::new(zip_file)?;
1510        let mut genome_files = Vec::new();
1511
1512        for i in 0..archive.len() {
1513            let mut file = archive.by_index(i)?;
1514            let name = file.name().to_string();
1515
1516            if name.ends_with("_genomic.fna") || name.ends_with("_genomic.fasta") {
1517                let parts: Vec<&str> = name.split('/').collect();
1518                if let Some(acc_dir) = parts.iter().find(|p| p.starts_with("GCA_") || p.starts_with("GCF_")) {
1519                    let filename = format!("{}.fna", acc_dir);
1520                    let output_path = genomes_dir.join(&filename);
1521
1522                    let mut content = Vec::new();
1523                    file.read_to_end(&mut content)?;
1524                    std::fs::write(&output_path, &content)?;
1525
1526                    genome_files.push(output_path);
1527                }
1528            }
1529        }
1530
1531        Ok(genome_files)
1532    }
1533
1534    fn align_plsdb(
1535        &self,
1536        plasmids: &[PlasmidInfo],
1537        plsdb_dir: &Path,
1538        temp_dir: &Path,
1539        paf_output: &Path,
1540    ) -> Result<usize> {
1541        let fasta_path = plsdb_dir.join("sequences.fasta");
1542
1543        if !fasta_path.exists() {
1544            eprintln!("    Warning: sequences.fasta not found in PLSDB directory");
1545            return Ok(0);
1546        }
1547
1548        let target_accs: FxHashSet<_> = plasmids.iter()
1549            .map(|p| p.accession.clone())
1550            .collect();
1551
1552        if target_accs.is_empty() {
1553            eprintln!("    No PLSDB plasmids to process");
1554            return Ok(0);
1555        }
1556
1557        let acc_list: Vec<_> = target_accs.iter().collect();
1558        let batches: Vec<_> = acc_list.chunks(API_BATCH_SIZE).collect();
1559        let total_batches = batches.len();
1560        let mut total_processed = 0usize;
1561
1562        eprintln!("    Processing {} PLSDB plasmids in {} batches...",
1563                 target_accs.len(), total_batches);
1564
1565        let file = File::open(&fasta_path)?;
1566        let reader = BufReader::new(file);
1567
1568        let mut seq_map: FxHashMap<String, String> = FxHashMap::default();
1569        let mut current_acc: Option<String> = None;
1570        let mut current_seq = String::new();
1571
1572        for line in reader.lines() {
1573            let line = line?;
1574            if line.starts_with('>') {
1575                if let Some(acc) = current_acc.take() {
1576                    if target_accs.contains(&acc) {
1577                        seq_map.insert(acc, std::mem::take(&mut current_seq));
1578                    }
1579                }
1580                let header = line.trim_start_matches('>');
1581                let acc = header.split_whitespace().next().unwrap_or("").to_string();
1582                current_acc = Some(acc);
1583                current_seq.clear();
1584            } else {
1585                current_seq.push_str(line.trim());
1586            }
1587        }
1588        if let Some(acc) = current_acc {
1589            if target_accs.contains(&acc) {
1590                seq_map.insert(acc, current_seq);
1591            }
1592        }
1593
1594        eprintln!("    Loaded {} target sequences from PLSDB", seq_map.len());
1595
1596        for (batch_idx, batch) in batches.iter().enumerate() {
1597
1598            let mut size_buckets: FxHashMap<GenomeSizeCategory, Vec<(&str, &str)>> = FxHashMap::default();
1599
1600            for acc in *batch {
1601                if let Some(seq) = seq_map.get(*acc) {
1602
1603                    let category = GenomeSizeCategory::from_size(seq.len() as u64);
1604                    size_buckets.entry(category)
1605                        .or_default()
1606                        .push((acc.as_str(), seq.as_str()));
1607                }
1608            }
1609
1610            let mut batch_hits: Vec<PafHit> = Vec::new();
1611
1612            for (category, bucket_seqs) in &size_buckets {
1613                if bucket_seqs.is_empty() {
1614                    continue;
1615                }
1616
1617                let bucket_threads = category.thread_count(self.threads);
1618                let bucket_suffix = format!("plsdb_batch_{:04}_{:?}", batch_idx, category);
1619                let bucket_fasta = temp_dir.join(format!("{}.fas", bucket_suffix));
1620
1621                {
1622                    let mut fasta_writer = BufWriter::new(File::create(&bucket_fasta)?);
1623                    for (acc, seq) in bucket_seqs {
1624                        let filename = format!("{}.fna", acc.replace('.', "_"));
1625                        writeln!(fasta_writer, ">{}|{}", acc, filename)?;
1626                        writeln!(fasta_writer, "{}", seq)?;
1627                    }
1628                    fasta_writer.flush()?;
1629                }
1630
1631                let bucket_paf = temp_dir.join(format!("{}.paf", bucket_suffix));
1632                let status = Command::new("minimap2")
1633                    .args([
1634                        "-cx", "asm20",
1635                        "-t", &bucket_threads.to_string(),
1636                        self.amr_db_path.to_str().unwrap(),
1637                        bucket_fasta.to_str().unwrap(),
1638                        "-o", bucket_paf.to_str().unwrap(),
1639                    ])
1640                    .stdout(std::process::Stdio::null())
1641                    .stderr(std::process::Stdio::null())
1642                    .status();
1643
1644                if let Ok(s) = status {
1645                    if s.success() && bucket_paf.exists() {
1646
1647                        let paf_content = std::fs::read_to_string(&bucket_paf)?;
1648                        let hits: Vec<PafHit> = paf_content
1649                            .lines()
1650                            .filter_map(PafHit::from_paf_line)
1651                            .collect();
1652                        batch_hits.extend(hits);
1653                    }
1654                }
1655
1656                std::fs::remove_file(&bucket_fasta).ok();
1657                std::fs::remove_file(&bucket_paf).ok();
1658            }
1659
1660            let dedup_hits = deduplicate_paf_hits(batch_hits);
1661
1662            {
1663                let mut paf_file = std::fs::OpenOptions::new()
1664                    .create(true)
1665                    .append(true)
1666                    .open(paf_output)?;
1667                for hit in &dedup_hits {
1668                    writeln!(paf_file, "{}", hit.raw_line)?;
1669                }
1670            }
1671
1672            total_processed += batch.len();
1673
1674            let bucket_info: Vec<String> = size_buckets.iter()
1675                .map(|(cat, seqs)| format!("{}={}", cat.name().split_whitespace().next().unwrap_or("?"), seqs.len()))
1676                .collect();
1677            eprintln!("    [PLSDB] Batch {}/{}: {} plasmids [{}] ({}/{})",
1678                     batch_idx + 1, total_batches, batch.len(),
1679                     bucket_info.join(", "),
1680                     total_processed, target_accs.len());
1681        }
1682
1683        for (acc, seq) in &seq_map {
1684            let out_path = plsdb_dir.join(format!("{}.fna", acc.replace('.', "_")));
1685            if !out_path.exists() {
1686                let mut out_file = File::create(&out_path)?;
1687                writeln!(out_file, ">{}", acc)?;
1688                writeln!(out_file, "{}", seq)?;
1689            }
1690        }
1691
1692        eprintln!("    Aligned {} PLSDB sequences", total_processed);
1693        Ok(total_processed)
1694    }
1695
1696    fn extract_flanking_from_plsdb(
1697        &self,
1698        hits_path: &Path,
1699        plsdb_dir: &Path,
1700        output_path: &Path,
1701        catalog: &GenomeCatalog,
1702    ) -> Result<()> {
1703        let hits_file = File::open(hits_path)?;
1704        let reader = BufReader::new(hits_file);
1705
1706        let mut plsdb_hits: FxHashMap<String, Vec<(String, String, usize, usize)>> = FxHashMap::default();
1707
1708        for (i, line) in reader.lines().enumerate() {
1709            let line = line?;
1710
1711            if i == 0 && line.starts_with("gene") {
1712                continue;
1713            }
1714
1715            let fields: Vec<&str> = line.split('\t').collect();
1716            if fields.len() < 4 {
1717                continue;
1718            }
1719
1720            let gene = fields[0];
1721            let contig_file = fields[1];
1722            let start: usize = fields[2].parse().unwrap_or(0);
1723            let end: usize = fields[3].parse().unwrap_or(0);
1724
1725            let (contig_id, genome_file) = if let Some(pipe_pos) = contig_file.rfind('|') {
1726                (contig_file[..pipe_pos].to_string(), contig_file[pipe_pos + 1..].to_string())
1727            } else {
1728                continue;
1729            };
1730
1731            if genome_file.starts_with("NZ_") || genome_file.starts_with("CP") ||
1732               genome_file.starts_with("AP") || genome_file.starts_with("NC_") {
1733                plsdb_hits.entry(genome_file)
1734                    .or_default()
1735                    .push((gene.to_string(), contig_id, start, end));
1736            }
1737        }
1738
1739        if plsdb_hits.is_empty() {
1740            return Ok(());
1741        }
1742
1743        let mut writer = std::fs::OpenOptions::new()
1744            .create(true)
1745            .append(true)
1746            .open(output_path)?;
1747
1748        let mut extracted = 0usize;
1749
1750        for (genome_file, hits) in &plsdb_hits {
1751            let genome_path = plsdb_dir.join(genome_file);
1752            if !genome_path.exists() {
1753                continue;
1754            }
1755
1756            let sequences = match self.load_genome_sequences(&genome_path) {
1757                Ok(seqs) => seqs,
1758                Err(_) => continue,
1759            };
1760
1761            let seq_map: FxHashMap<&str, &str> = sequences.iter()
1762                .map(|(header, seq)| {
1763                    let key = header.split_whitespace().next().unwrap_or(header.as_str());
1764                    (key, seq.as_str())
1765                })
1766                .collect();
1767
1768            let genome_acc = genome_file.trim_end_matches(".fna");
1769            let base_genus = catalog.get_genus(genome_acc).map(|s| s.to_string());
1770
1771            for (gene, contig_id, start, end) in hits {
1772                let contig_seq = match seq_map.get(contig_id.as_str()) {
1773                    Some(seq) => *seq,
1774                    None => continue,
1775                };
1776
1777                let contig_len = contig_seq.len();
1778                if *start >= contig_len || *end > contig_len || start >= end {
1779                    continue;
1780                }
1781
1782                let genus = base_genus.clone().unwrap_or_else(|| "Unknown".to_string());
1783
1784                let upstream_start = start.saturating_sub(self.config.flanking_length);
1785                let upstream = &contig_seq[upstream_start..*start];
1786
1787                let downstream_end = std::cmp::min(*end + self.config.flanking_length, contig_len);
1788                let downstream = &contig_seq[*end..downstream_end];
1789
1790                writeln!(writer, "{}\t{}\t{}\t{}\t{}\t{}\t{}",
1791                        gene, contig_id, genus, start, end, upstream, downstream)?;
1792                extracted += 1;
1793            }
1794        }
1795
1796        if extracted > 0 {
1797            eprintln!("    Extracted {} PLSDB flanking sequences", extracted);
1798        }
1799        Ok(())
1800    }
1801
1802    fn convert_paf_to_merged(
1803        &self,
1804        paf_path: &Path,
1805        output_path: &Path,
1806    ) -> Result<()> {
1807        let mut hits: FxHashSet<(String, String, usize, usize)> = FxHashSet::default();
1808
1809        if paf_path.exists() {
1810            let file = File::open(paf_path)?;
1811            let reader = BufReader::new(file);
1812            for line in reader.lines() {
1813                let line = line?;
1814                let fields: Vec<&str> = line.split('\t').collect();
1815                if fields.len() < 12 {
1816                    continue;
1817                }
1818                let contig_file = fields[0].to_string();
1819                let gene = fields[5].to_string();
1820                let start: usize = fields[2].parse().unwrap_or(0);
1821                let end: usize = fields[3].parse().unwrap_or(0);
1822                hits.insert((gene, contig_file, start, end));
1823            }
1824        }
1825
1826        let mut writer = BufWriter::new(File::create(output_path)?);
1827        for (gene, contig_file, start, end) in &hits {
1828            writeln!(writer, "{}\t{}\t{}\t{}", gene, contig_file, start, end)?;
1829        }
1830
1831        eprintln!("    Found {} unique hits", hits.len());
1832        Ok(())
1833    }
1834
1835    fn extract_flanking_sequences(
1836        &self,
1837        hits_path: &Path,
1838        genomes_dir: &Path,
1839        output_path: &Path,
1840        catalog: &GenomeCatalog,
1841    ) -> Result<()> {
1842
1843        if output_path.exists() {
1844            let metadata = std::fs::metadata(output_path)?;
1845            if metadata.len() > 0 {
1846                eprintln!("    Flanking sequences file already exists ({} MB), skipping extraction...",
1847                         metadata.len() / (1024 * 1024));
1848                return Ok(());
1849            }
1850        }
1851
1852        let hits_file = File::open(hits_path)?;
1853        let reader = BufReader::new(hits_file);
1854
1855        let mut genome_hits: FxHashMap<String, Vec<(String, String, usize, usize)>> = FxHashMap::default();
1856
1857        for (i, line) in reader.lines().enumerate() {
1858            let line = line?;
1859
1860            if i == 0 && line.starts_with("gene") {
1861                continue;
1862            }
1863
1864            let fields: Vec<&str> = line.split('\t').collect();
1865            if fields.len() < 4 {
1866                continue;
1867            }
1868
1869            let gene = fields[0];
1870            let contig_file = fields[1];
1871            let start: usize = fields[2].parse().unwrap_or(0);
1872            let end: usize = fields[3].parse().unwrap_or(0);
1873
1874            let (contig_id, genome_file) = if let Some(pipe_pos) = contig_file.rfind('|') {
1875                (contig_file[..pipe_pos].to_string(), contig_file[pipe_pos + 1..].to_string())
1876            } else {
1877
1878                (contig_file.to_string(), extract_genome_file(contig_file))
1879            };
1880
1881            genome_hits.entry(genome_file)
1882                .or_default()
1883                .push((gene.to_string(), contig_id, start, end));
1884        }
1885
1886        let output_file = File::create(output_path)?;
1887        let mut writer = BufWriter::new(output_file);
1888
1889        writeln!(writer, "Gene\tContig\tGenus\tStart\tEnd\tUpstream\tDownstream")?;
1890
1891        let total_genomes = genome_hits.len();
1892        let mut processed = 0;
1893        let mut genus_found = 0usize;
1894        let mut genus_unknown = 0usize;
1895        let mut sequences_extracted = 0usize;
1896
1897        for (genome_file, hits) in &genome_hits {
1898            let genome_path = genomes_dir.join(genome_file);
1899            if !genome_path.exists() {
1900                continue;
1901            }
1902
1903            let sequences = match self.load_genome_sequences(&genome_path) {
1904                Ok(seqs) => seqs,
1905                Err(_) => continue,
1906            };
1907
1908            let seq_map: FxHashMap<&str, &str> = sequences.iter()
1909                .map(|(header, seq)| {
1910                    let key = header.split_whitespace().next().unwrap_or(header.as_str());
1911                    (key, seq.as_str())
1912                })
1913                .collect();
1914
1915            let genome_acc = genome_file.trim_end_matches(".fna");
1916            let base_genus = catalog.get_genus(genome_acc).map(|s| s.to_string());
1917
1918            for (gene, contig_id, start, end) in hits {
1919
1920                let contig_seq = match seq_map.get(contig_id.as_str()) {
1921                    Some(seq) => *seq,
1922                    None => continue,
1923                };
1924
1925                let contig_len = contig_seq.len();
1926
1927                if *start >= contig_len || *end > contig_len || start >= end {
1928                    continue;
1929                }
1930
1931                let upstream_start = start.saturating_sub(self.config.flanking_length);
1932                let downstream_end = std::cmp::min(end + self.config.flanking_length, contig_len);
1933
1934                let upstream = if *start > 0 && upstream_start < *start {
1935                    &contig_seq[upstream_start..*start]
1936                } else {
1937                    ""
1938                };
1939
1940                let downstream = if *end < contig_len && *end < downstream_end {
1941                    &contig_seq[*end..downstream_end]
1942                } else {
1943                    ""
1944                };
1945
1946                let genus = if let Some(ref g) = base_genus {
1947                    genus_found += 1;
1948                    g.clone()
1949                } else if let Some(g) = catalog.get_genus(contig_id) {
1950                    genus_found += 1;
1951                    g.to_string()
1952                } else {
1953                    genus_unknown += 1;
1954                    "Unknown".to_string()
1955                };
1956
1957                writeln!(
1958                    writer,
1959                    "{}\t{}\t{}\t{}\t{}\t{}\t{}",
1960                    gene, contig_id, genus, start, end, upstream, downstream
1961                )?;
1962                sequences_extracted += 1;
1963            }
1964
1965            processed += 1;
1966            if processed % 10000 == 0 || processed == total_genomes {
1967                eprintln!("    Processed {}/{} genomes ({} sequences)",
1968                    processed, total_genomes, sequences_extracted);
1969            }
1970        }
1971
1972        eprintln!("    Extracted {} flanking sequences from {} genomes",
1973            sequences_extracted, processed);
1974        eprintln!("    Genus resolved: {}, Unknown: {}", genus_found, genus_unknown);
1975        Ok(())
1976    }
1977
1978    fn load_genome_sequences(&self, genome_path: &Path) -> Result<Vec<(String, String)>> {
1979        let file = File::open(genome_path)?;
1980        let reader = BufReader::new(file);
1981
1982        let mut sequences = Vec::new();
1983        let mut current_name: Option<String> = None;
1984        let mut current_seq = String::new();
1985
1986        for line in reader.lines() {
1987            let line = line?;
1988
1989            if let Some(stripped) = line.strip_prefix('>') {
1990                if let Some(name) = current_name.take() {
1991                    sequences.push((name, std::mem::take(&mut current_seq)));
1992                }
1993                current_name = Some(stripped.to_string());
1994                current_seq.clear();
1995            } else {
1996                current_seq.push_str(line.trim());
1997            }
1998        }
1999
2000        if let Some(name) = current_name {
2001            sequences.push((name, current_seq));
2002        }
2003
2004        Ok(sequences)
2005    }
2006}
2007
2008fn parse_csv_line(line: &str) -> Vec<String> {
2009    let mut fields = Vec::new();
2010    let mut current = String::new();
2011    let mut in_quotes = false;
2012
2013    for ch in line.chars() {
2014        match ch {
2015            '"' => in_quotes = !in_quotes,
2016            ',' if !in_quotes => {
2017                fields.push(std::mem::take(&mut current));
2018            }
2019            _ => current.push(ch),
2020        }
2021    }
2022    fields.push(current);
2023    fields
2024}
2025
2026fn extract_genome_file(contig: &str) -> String {
2027
2028    if let Some(start) = contig.find("GC") {
2029        let remainder = &contig[start..];
2030        if let Some(end) = remainder.find(|c: char| !c.is_alphanumeric() && c != '_' && c != '.') {
2031            return format!("{}.fna", &remainder[..end]);
2032        }
2033        return format!("{}.fna", remainder);
2034    }
2035
2036    let acc = contig.split_whitespace().next().unwrap_or(contig);
2037    format!("{}.fna", acc.replace('.', "_"))
2038}
2039
2040pub fn build(output_dir: &Path, arg_db: &Path, threads: usize, email: &str, config: FlankBuildConfig) -> Result<()> {
2041    let builder = FlankingDbBuilder::new(arg_db, output_dir, threads, email, config);
2042    builder.build()
2043}