1
2use anyhow::{Context, Result};
3use extsort_iter::*;
4use std::cmp::Ordering;
5use std::fs::File;
6use std::io::{BufRead, BufReader, BufWriter, Seek, SeekFrom, Write};
7use std::path::Path;
8
9const MAGIC: &[u8; 8] = b"FLANKDB\0";
10const VERSION: u32 = 2;
11
/// One data row of the input TSV, tagged with the gene it belongs to.
///
/// The comparison impls for this type look only at `gene`, so sorting a
/// collection of records groups all rows of the same gene together without
/// inspecting the (potentially long) `line` payload.
#[derive(Debug, Clone)]
pub struct FlankingRecord {
    /// First tab-separated field of the row; used as the sort/grouping key.
    pub gene: String,

    /// The complete, unmodified TSV row (without its trailing newline).
    pub line: String,
}
19
20impl PartialEq for FlankingRecord {
21 fn eq(&self, other: &Self) -> bool {
22 self.gene == other.gene
23 }
24}
25
26impl Eq for FlankingRecord {}
27
28impl PartialOrd for FlankingRecord {
29 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30 Some(self.cmp(other))
31 }
32}
33
34impl Ord for FlankingRecord {
35 fn cmp(&self, other: &Self) -> Ordering {
36 self.gene.cmp(&other.gene)
37 }
38}
39
40pub fn build(
41 tsv_path: &Path,
42 fdb_path: &Path,
43 buffer_size_mb: usize,
44 threads: usize,
45) -> Result<()> {
46 eprintln!("Building FDB from TSV...");
47 eprintln!(" Input: {}", tsv_path.display());
48 eprintln!(" Output: {}", fdb_path.display());
49 eprintln!(" Buffer: {} MB, Threads: {}", buffer_size_mb, threads);
50
51 rayon::ThreadPoolBuilder::new()
52 .num_threads(threads)
53 .build_global()
54 .ok();
55
56 let temp_dir = tempfile::Builder::new()
57 .prefix("fdb_sort_")
58 .tempdir()
59 .context("Failed to create temp directory")?;
60
61 eprintln!(" Temp dir: {}", temp_dir.path().display());
62
63 eprintln!("\n[Phase 1] Reading TSV and external sorting...");
64
65 let file = File::open(tsv_path).context("Failed to open TSV file")?;
66 let file_size = file.metadata()?.len();
67 let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
68 let mut lines = reader.lines();
69
70 let header = lines
71 .next()
72 .ok_or_else(|| anyhow::anyhow!("Empty TSV file"))??;
73
74 let record_iter = lines.filter_map(|line_result| {
75 let line = line_result.ok()?;
76 if line.is_empty() {
77 return None;
78 }
79
80 let gene = line.split('\t').next()?.to_string();
81 if gene.is_empty() {
82 return None;
83 }
84 Some(FlankingRecord { gene, line })
85 });
86
87 let buffer_bytes = buffer_size_mb * 1024 * 1024;
88 let config = ExtsortConfig::with_buffer_size(buffer_bytes)
89 .compress_lz4_flex();
90
91 let sorted_iter = record_iter
92 .par_external_sort(config)
93 .context("External sort failed")?;
94
95 eprintln!("[Phase 2] Building FDB from sorted records...");
96
97 let mut output = BufWriter::with_capacity(4 * 1024 * 1024, File::create(fdb_path)?);
98
99 output.write_all(MAGIC)?;
100 output.write_all(&VERSION.to_le_bytes())?;
101 output.write_all(&0u32.to_le_bytes())?;
102 output.write_all(&0u64.to_le_bytes())?;
103
104 let mut index_entries: Vec<(String, u64, u32, u32)> = Vec::new();
105 let mut compressor = zstd::bulk::Compressor::new(3)?;
106
107 let mut current_gene: Option<String> = None;
108 let mut current_records: Vec<String> = Vec::new();
109 let mut total_records = 0u64;
110 let mut gene_count = 0u32;
111
112 for record in sorted_iter {
113 total_records += 1;
114
115 if current_gene.as_ref() != Some(&record.gene) {
116
117 if let Some(prev_gene) = current_gene.take() {
118 write_gene_block(
119 &mut output,
120 &mut compressor,
121 &header,
122 &prev_gene,
123 ¤t_records,
124 &mut index_entries,
125 )?;
126 gene_count += 1;
127
128 if gene_count.is_multiple_of(1000) {
129 eprintln!(
130 " Processed {} genes, {} records...",
131 gene_count, total_records
132 );
133 }
134 }
135
136 current_gene = Some(record.gene);
137 current_records.clear();
138 }
139
140 current_records.push(record.line);
141 }
142
143 if let Some(gene) = current_gene {
144 write_gene_block(
145 &mut output,
146 &mut compressor,
147 &header,
148 &gene,
149 ¤t_records,
150 &mut index_entries,
151 )?;
152 gene_count += 1;
153 }
154
155 eprintln!(" Total: {} genes, {} records", gene_count, total_records);
156
157 eprintln!("[Phase 3] Writing index...");
158
159 let index_offset = output.stream_position()?;
160
161 for (gene, offset, comp_len, record_count) in &index_entries {
162 let gene_bytes = gene.as_bytes();
163 output.write_all(&(gene_bytes.len() as u16).to_le_bytes())?;
164 output.write_all(gene_bytes)?;
165 output.write_all(&offset.to_le_bytes())?;
166 output.write_all(&comp_len.to_le_bytes())?;
167 output.write_all(&record_count.to_le_bytes())?;
168 }
169
170 output.seek(SeekFrom::Start(12))?;
171 output.write_all(&gene_count.to_le_bytes())?;
172 output.write_all(&index_offset.to_le_bytes())?;
173 output.flush()?;
174
175 drop(temp_dir);
176
177 let input_size = file_size;
178 let output_size = std::fs::metadata(fdb_path)?.len();
179 let ratio = input_size as f64 / output_size as f64;
180
181 eprintln!("\n=== FDB Build Complete ===");
182 eprintln!(" Input: {:.2} MB", input_size as f64 / 1024.0 / 1024.0);
183 eprintln!(" Output: {:.2} MB", output_size as f64 / 1024.0 / 1024.0);
184 eprintln!(" Compression ratio: {:.1}x", ratio);
185 eprintln!(" Genes: {}", gene_count);
186 eprintln!(" Records: {}", total_records);
187
188 Ok(())
189}
190
191fn write_gene_block(
192 output: &mut BufWriter<File>,
193 compressor: &mut zstd::bulk::Compressor<'_>,
194 header: &str,
195 gene: &str,
196 records: &[String],
197 index_entries: &mut Vec<(String, u64, u32, u32)>,
198) -> Result<()> {
199
200 let mut content = String::with_capacity(records.len() * 3000);
201 content.push_str(header);
202 content.push('\n');
203 for record in records {
204 content.push_str(record);
205 content.push('\n');
206 }
207
208 let compressed = compressor.compress(content.as_bytes())?;
209 let offset = output.stream_position()?;
210 output.write_all(&compressed)?;
211
212 index_entries.push((
213 gene.to_string(),
214 offset,
215 compressed.len() as u32,
216 records.len() as u32,
217 ));
218
219 Ok(())
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record fixture from borrowed strings.
    fn record(gene: &str, line: &str) -> FlankingRecord {
        FlankingRecord {
            gene: gene.to_string(),
            line: line.to_string(),
        }
    }

    /// Records order by gene name, so "aac(6')" sorts before "blaTEM".
    #[test]
    fn test_flanking_record_ordering() {
        let earlier = record("aac(6')", "test1");
        let later = record("blaTEM", "test2");
        assert!(earlier < later);
    }
}