Skip to main content

argenus/
fdb.rs

1
2use anyhow::{Context, Result};
3use extsort_iter::*;
4use std::cmp::Ordering;
5use std::fs::File;
6use std::io::{BufRead, BufReader, BufWriter, Seek, SeekFrom, Write};
7use std::path::Path;
8
/// File signature stored at byte 0 of every FDB file: ASCII `FLANKDB` followed by a NUL byte.
const MAGIC: &[u8; 8] = &[b'F', b'L', b'A', b'N', b'K', b'D', b'B', 0];
/// On-disk format version, written as a little-endian `u32` immediately after [`MAGIC`].
const VERSION: u32 = 2;
11
/// One data line of the input TSV, keyed by its gene name so it can be externally sorted.
///
/// Equality and ordering consider **only** `gene`: two records with the same gene compare
/// equal even when their `line` text differs. Sorting by this key groups all lines of a gene
/// together without ever comparing the (potentially long) line text.
#[derive(Clone, Debug)]
pub struct FlankingRecord {
    /// Sort key. As constructed by `build`, this is the first tab-separated column of `line`.
    pub gene: String,

    /// The original TSV line. As constructed by `build`, it has no trailing newline.
    pub line: String,
}

// Equality is deliberately defined on `gene` alone, to stay consistent with `Ord` below.
impl PartialEq for FlankingRecord {
    fn eq(&self, other: &Self) -> bool {
        self.gene == other.gene
    }
}

impl Eq for FlankingRecord {}

impl PartialOrd for FlankingRecord {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Byte-wise lexicographic ordering of the gene name (`String`'s `Ord`).
impl Ord for FlankingRecord {
    fn cmp(&self, other: &Self) -> Ordering {
        self.gene.cmp(&other.gene)
    }
}
39
40pub fn build(
41    tsv_path: &Path,
42    fdb_path: &Path,
43    buffer_size_mb: usize,
44    threads: usize,
45) -> Result<()> {
46    eprintln!("Building FDB from TSV...");
47    eprintln!("  Input: {}", tsv_path.display());
48    eprintln!("  Output: {}", fdb_path.display());
49    eprintln!("  Buffer: {} MB, Threads: {}", buffer_size_mb, threads);
50
51    rayon::ThreadPoolBuilder::new()
52        .num_threads(threads)
53        .build_global()
54        .ok();
55
56    let temp_dir = tempfile::Builder::new()
57        .prefix("fdb_sort_")
58        .tempdir()
59        .context("Failed to create temp directory")?;
60
61    eprintln!("  Temp dir: {}", temp_dir.path().display());
62
63    eprintln!("\n[Phase 1] Reading TSV and external sorting...");
64
65    let file = File::open(tsv_path).context("Failed to open TSV file")?;
66    let file_size = file.metadata()?.len();
67    let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
68    let mut lines = reader.lines();
69
70    let header = lines
71        .next()
72        .ok_or_else(|| anyhow::anyhow!("Empty TSV file"))??;
73
74    let record_iter = lines.filter_map(|line_result| {
75        let line = line_result.ok()?;
76        if line.is_empty() {
77            return None;
78        }
79
80        let gene = line.split('\t').next()?.to_string();
81        if gene.is_empty() {
82            return None;
83        }
84        Some(FlankingRecord { gene, line })
85    });
86
87    let buffer_bytes = buffer_size_mb * 1024 * 1024;
88    let config = ExtsortConfig::with_buffer_size(buffer_bytes)
89        .compress_lz4_flex();
90
91    let sorted_iter = record_iter
92        .par_external_sort(config)
93        .context("External sort failed")?;
94
95    eprintln!("[Phase 2] Building FDB from sorted records...");
96
97    let mut output = BufWriter::with_capacity(4 * 1024 * 1024, File::create(fdb_path)?);
98
99    output.write_all(MAGIC)?;
100    output.write_all(&VERSION.to_le_bytes())?;
101    output.write_all(&0u32.to_le_bytes())?;
102    output.write_all(&0u64.to_le_bytes())?;
103
104    let mut index_entries: Vec<(String, u64, u32, u32)> = Vec::new();
105    let mut compressor = zstd::bulk::Compressor::new(3)?;
106
107    let mut current_gene: Option<String> = None;
108    let mut current_records: Vec<String> = Vec::new();
109    let mut total_records = 0u64;
110    let mut gene_count = 0u32;
111
112    for record in sorted_iter {
113        total_records += 1;
114
115        if current_gene.as_ref() != Some(&record.gene) {
116
117            if let Some(prev_gene) = current_gene.take() {
118                write_gene_block(
119                    &mut output,
120                    &mut compressor,
121                    &header,
122                    &prev_gene,
123                    &current_records,
124                    &mut index_entries,
125                )?;
126                gene_count += 1;
127
128                if gene_count.is_multiple_of(1000) {
129                    eprintln!(
130                        "  Processed {} genes, {} records...",
131                        gene_count, total_records
132                    );
133                }
134            }
135
136            current_gene = Some(record.gene);
137            current_records.clear();
138        }
139
140        current_records.push(record.line);
141    }
142
143    if let Some(gene) = current_gene {
144        write_gene_block(
145            &mut output,
146            &mut compressor,
147            &header,
148            &gene,
149            &current_records,
150            &mut index_entries,
151        )?;
152        gene_count += 1;
153    }
154
155    eprintln!("  Total: {} genes, {} records", gene_count, total_records);
156
157    eprintln!("[Phase 3] Writing index...");
158
159    let index_offset = output.stream_position()?;
160
161    for (gene, offset, comp_len, record_count) in &index_entries {
162        let gene_bytes = gene.as_bytes();
163        output.write_all(&(gene_bytes.len() as u16).to_le_bytes())?;
164        output.write_all(gene_bytes)?;
165        output.write_all(&offset.to_le_bytes())?;
166        output.write_all(&comp_len.to_le_bytes())?;
167        output.write_all(&record_count.to_le_bytes())?;
168    }
169
170    output.seek(SeekFrom::Start(12))?;
171    output.write_all(&gene_count.to_le_bytes())?;
172    output.write_all(&index_offset.to_le_bytes())?;
173    output.flush()?;
174
175    drop(temp_dir);
176
177    let input_size = file_size;
178    let output_size = std::fs::metadata(fdb_path)?.len();
179    let ratio = input_size as f64 / output_size as f64;
180
181    eprintln!("\n=== FDB Build Complete ===");
182    eprintln!("  Input:  {:.2} MB", input_size as f64 / 1024.0 / 1024.0);
183    eprintln!("  Output: {:.2} MB", output_size as f64 / 1024.0 / 1024.0);
184    eprintln!("  Compression ratio: {:.1}x", ratio);
185    eprintln!("  Genes: {}", gene_count);
186    eprintln!("  Records: {}", total_records);
187
188    Ok(())
189}
190
191fn write_gene_block(
192    output: &mut BufWriter<File>,
193    compressor: &mut zstd::bulk::Compressor<'_>,
194    header: &str,
195    gene: &str,
196    records: &[String],
197    index_entries: &mut Vec<(String, u64, u32, u32)>,
198) -> Result<()> {
199
200    let mut content = String::with_capacity(records.len() * 3000);
201    content.push_str(header);
202    content.push('\n');
203    for record in records {
204        content.push_str(record);
205        content.push('\n');
206    }
207
208    let compressed = compressor.compress(content.as_bytes())?;
209    let offset = output.stream_position()?;
210    output.write_all(&compressed)?;
211
212    index_entries.push((
213        gene.to_string(),
214        offset,
215        compressed.len() as u32,
216        records.len() as u32,
217    ));
218
219    Ok(())
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor so each test reads as data rather than struct boilerplate.
    fn record(gene: &str, line: &str) -> FlankingRecord {
        FlankingRecord {
            gene: gene.to_string(),
            line: line.to_string(),
        }
    }

    /// Records must sort by gene name, which groups each gene's lines together.
    #[test]
    fn test_flanking_record_ordering() {
        let earlier = record("aac(6')", "test1");
        let later = record("blaTEM", "test2");
        assert!(earlier < later);
    }
}