1use super::*;
6use super::readonly::ReadonlyRefgetStore;
7
8use std::collections::HashMap;
9use std::path::Path;
10use std::time::Instant;
11
12use anyhow::{anyhow, Result};
13
14use crate::collection::SequenceCollectionExt;
15use crate::digest::{
16 SequenceCollection, SequenceCollectionMetadata,
17 SequenceEncoder, SequenceMetadata, SequenceRecord,
18};
19use crate::hashkeyable::HashKeyable;
20
21impl ReadonlyRefgetStore {
26 pub fn add_sequence_collection_from_fasta<P: AsRef<Path>>(
31 &mut self,
32 file_path: P,
33 opts: FastaImportOptions<'_>,
34 ) -> Result<(SequenceCollectionMetadata, bool)> {
35 use crossbeam_channel::bounded;
36 use md5::Md5;
37 use sha2::{Digest, Sha512};
38
39 if !self.quiet {
40 println!("Processing {}...", file_path.as_ref().display());
41 }
42
43 let pipeline_start = Instant::now();
44
45 let use_cache = self.local_path.is_some();
47 use crate::utils::PathExtension;
48 let rgsi_path = file_path.as_ref().replace_exts_with("rgsi");
49 let have_rgsi = use_cache && rgsi_path.exists();
50
51 if have_rgsi {
52 match self.add_fasta_with_cached_metadata(&file_path, &rgsi_path, opts, pipeline_start) {
53 Ok(result) => return Ok(result),
54 Err(_) => {
55 }
57 }
58 }
59
60 const LARGE_SEQ_THRESHOLD: usize = 500 * 1024 * 1024; struct DecompressedSequence {
66 name: String,
67 description: Option<String>,
68 raw_header: String,
69 raw_bytes: Vec<u8>,
70 }
71
72 struct DigestedSequence {
73 metadata: SequenceMetadata,
74 raw_bytes: Vec<u8>,
75 aliases: Vec<(String, String)>,
76 }
77
78 struct ReadySequence {
79 metadata: SequenceMetadata,
80 sequence_data: Vec<u8>,
81 aliases: Vec<(String, String)>,
82 }
83
84 enum ToDigest {
85 NeedsWork(DecompressedSequence),
86 AlreadyDone(ReadySequence),
87 }
88
89 enum ToEncode {
90 NeedsWork(DigestedSequence),
91 AlreadyDone(ReadySequence),
92 }
93
94 let (decompress_tx, decompress_rx) = bounded::<ToDigest>(1);
96 let (digest_tx, digest_rx) = bounded::<ToEncode>(1);
97
98 let file_path_buf = file_path.as_ref().to_path_buf();
99 let namespaces: Vec<String> = opts.namespaces.iter().map(|s| s.to_string()).collect();
100 let ns_for_digest = namespaces.clone();
101
102 let quiet = self.quiet;
104 let mode_for_t1 = self.mode;
105 let decompress_handle = std::thread::spawn(move || -> Result<()> {
106 use sha2::{Digest, Sha512};
107 use md5::Md5;
108
109 let mut fasta_reader = crate::fasta::FastaReader::from_path(&file_path_buf)?;
110
111 while let Some(record) = fasta_reader.next_record()? {
112 if record.raw_bytes.len() > LARGE_SEQ_THRESHOLD {
113 if !quiet {
114 println!(
115 " Large sequence '{}' ({} MB) -- processing inline to reduce memory",
116 record.name,
117 record.raw_bytes.len() / (1024 * 1024),
118 );
119 }
120 let crate::fasta::FastaRecord { name, description, raw_header, raw_bytes } = record;
121
122 let mut sha512_hasher = Sha512::new();
123 sha512_hasher.update(&raw_bytes);
124 let sha512 = base64_url::encode(&sha512_hasher.finalize()[0..24]);
125
126 let mut md5_hasher = Md5::new();
127 md5_hasher.update(&raw_bytes);
128 let md5 = format!("{:x}", md5_hasher.finalize());
129
130 let mut guesser = crate::digest::AlphabetGuesser::new();
131 guesser.update(&raw_bytes);
132 let alphabet = guesser.guess();
133
134 let length = raw_bytes.len();
135 let sequence_data = match mode_for_t1 {
136 StorageMode::Encoded => {
137 let mut encoder = SequenceEncoder::new(alphabet, length);
138 encoder.update(&raw_bytes);
139 drop(raw_bytes);
140 encoder.finalize()
141 }
142 StorageMode::Raw => raw_bytes,
143 };
144
145 let metadata = SequenceMetadata {
146 name,
147 description,
148 length,
149 sha512t24u: sha512,
150 md5,
151 alphabet,
152 fai: None,
153 };
154
155 let aliases = if !namespaces.is_empty() {
156 let ns_refs: Vec<&str> = namespaces.iter().map(|s| s.as_str()).collect();
157 crate::digest::fasta::extract_aliases_from_header(&raw_header, &ns_refs)
158 } else {
159 vec![]
160 };
161
162 decompress_tx.send(ToDigest::AlreadyDone(ReadySequence {
163 metadata,
164 sequence_data,
165 aliases,
166 })).map_err(|_| anyhow!("Digest thread stopped receiving"))?;
167 } else {
168 decompress_tx.send(ToDigest::NeedsWork(DecompressedSequence {
169 name: record.name,
170 description: record.description,
171 raw_header: record.raw_header,
172 raw_bytes: record.raw_bytes,
173 })).map_err(|_| anyhow!("Digest thread stopped receiving"))?;
174 }
175 }
176 Ok(())
177 });
178
179 let digest_handle = std::thread::spawn(move || -> Result<()> {
181 let ns_refs: Vec<&str> = ns_for_digest.iter().map(|s| s.as_str()).collect();
182
183 for msg in decompress_rx {
184 match msg {
185 ToDigest::NeedsWork(seq) => {
186 let mut sha512_hasher = Sha512::new();
187 sha512_hasher.update(&seq.raw_bytes);
188 let sha512 = base64_url::encode(&sha512_hasher.finalize()[0..24]);
189
190 let mut md5_hasher = Md5::new();
191 md5_hasher.update(&seq.raw_bytes);
192 let md5 = format!("{:x}", md5_hasher.finalize());
193
194 let mut guesser = crate::digest::AlphabetGuesser::new();
195 guesser.update(&seq.raw_bytes);
196 let alphabet = guesser.guess();
197
198 let metadata = SequenceMetadata {
199 name: seq.name,
200 description: seq.description,
201 length: seq.raw_bytes.len(),
202 sha512t24u: sha512,
203 md5,
204 alphabet,
205 fai: None,
206 };
207
208 let aliases = if !ns_refs.is_empty() {
209 crate::digest::fasta::extract_aliases_from_header(&seq.raw_header, &ns_refs)
210 } else {
211 vec![]
212 };
213
214 digest_tx.send(ToEncode::NeedsWork(DigestedSequence {
215 metadata,
216 raw_bytes: seq.raw_bytes,
217 aliases,
218 })).map_err(|_| anyhow!("Encode thread stopped receiving"))?;
219 }
220 ToDigest::AlreadyDone(ready) => {
221 digest_tx.send(ToEncode::AlreadyDone(ready))
222 .map_err(|_| anyhow!("Encode thread stopped receiving"))?;
223 }
224 }
225 }
226 drop(digest_tx);
227 Ok(())
228 });
229
230 let mode = self.mode;
232 let mut sequence_metadata: Vec<SequenceMetadata> = Vec::new();
233 let mut all_aliases: Vec<(String, String, String)> = Vec::new();
234
235 for msg in digest_rx {
236 match msg {
237 ToEncode::NeedsWork(digested) => {
238 let metadata = digested.metadata;
239 let aliases = digested.aliases;
240 let raw_bytes = digested.raw_bytes;
241
242 let sequence_data = match mode {
243 StorageMode::Encoded => {
244 let mut encoder = SequenceEncoder::new(metadata.alphabet, metadata.length);
245 encoder.update(&raw_bytes);
246 drop(raw_bytes);
247 encoder.finalize()
248 }
249 StorageMode::Raw => raw_bytes,
250 };
251
252 for (ns, alias_value) in &aliases {
253 all_aliases.push((ns.clone(), alias_value.clone(), metadata.sha512t24u.clone()));
254 }
255
256 self.add_sequence_record(
257 SequenceRecord::Full {
258 metadata: metadata.clone(),
259 sequence: sequence_data,
260 },
261 true,
262 )?;
263
264 sequence_metadata.push(metadata);
265 }
266 ToEncode::AlreadyDone(ready) => {
267 for (ns, alias_value) in &ready.aliases {
268 all_aliases.push((ns.clone(), alias_value.clone(), ready.metadata.sha512t24u.clone()));
269 }
270
271 self.add_sequence_record(
272 SequenceRecord::Full {
273 metadata: ready.metadata.clone(),
274 sequence: ready.sequence_data,
275 },
276 true,
277 )?;
278
279 sequence_metadata.push(ready.metadata);
280 }
281 }
282 }
283
284 decompress_handle.join()
286 .map_err(|e| anyhow!("Decompress thread panicked: {:?}", e))??;
287 digest_handle.join()
288 .map_err(|e| anyhow!("Digest thread panicked: {:?}", e))??;
289
290 let pipeline_elapsed = pipeline_start.elapsed();
291
292 let metadata_start = std::time::Instant::now();
294 let seq_count = sequence_metadata.len();
295
296 let stub_records: Vec<SequenceRecord> = sequence_metadata
297 .iter()
298 .map(|meta| SequenceRecord::Stub(meta.clone()))
299 .collect();
300
301 let mut seqcol_metadata = SequenceCollectionMetadata::from_sequences(
302 &stub_records,
303 Some(file_path.as_ref().to_path_buf()),
304 );
305 if self.ancillary_digests {
306 seqcol_metadata.compute_ancillary_digests(&stub_records);
307 }
308
309 let coll_key = seqcol_metadata.digest.to_key();
310 let coll_digest_display = seqcol_metadata.digest.clone();
311 let metadata = seqcol_metadata.clone();
312 let metadata_elapsed = metadata_start.elapsed();
313
314 if !opts.force && self.collections.contains_key(&coll_key) {
315 if !self.quiet {
316 println!("Skipped {} (already exists)", coll_digest_display);
317 }
318 return Ok((metadata, false));
319 }
320
321 let index_start = std::time::Instant::now();
322
323 if use_cache {
325 let seqcol_for_cache = SequenceCollection {
326 metadata: seqcol_metadata.clone(),
327 sequences: stub_records.clone(),
328 };
329 let _ = seqcol_for_cache.write_collection_rgsi(&rgsi_path);
330 }
331
332 let seqcol = SequenceCollection {
334 metadata: seqcol_metadata,
335 sequences: stub_records,
336 };
337 self.add_sequence_collection_internal(seqcol, opts.force)?;
338
339 for (ns, alias_value, sha512t24u) in &all_aliases {
341 self.add_sequence_alias(ns, alias_value, sha512t24u)?;
342 }
343
344 for meta in &sequence_metadata {
346 self.name_lookup
347 .entry(coll_key)
348 .or_default()
349 .insert(meta.name.clone(), meta.sha512t24u.to_key());
350 }
351 let index_elapsed = index_start.elapsed();
352
353 if !self.quiet {
354 let total = pipeline_elapsed + metadata_elapsed + index_elapsed;
355 println!(
356 "Added {} {} seqs in {:.1}s ({:.1} proc | {:.1} meta | {:.1} index)",
357 coll_digest_display,
358 seq_count,
359 total.as_secs_f64(),
360 pipeline_elapsed.as_secs_f64(),
361 metadata_elapsed.as_secs_f64(),
362 index_elapsed.as_secs_f64(),
363 );
364 }
365
366 Ok((metadata, true))
367 }
368
369 fn add_fasta_with_cached_metadata<P: AsRef<Path>>(
371 &mut self,
372 file_path: P,
373 rgsi_path: &Path,
374 opts: FastaImportOptions<'_>,
375 start_time: Instant,
376 ) -> Result<(SequenceCollectionMetadata, bool)> {
377 let mut seqcol = crate::collection::read_rgsi_file(rgsi_path)?;
378
379 if seqcol.sequences.is_empty() {
380 let _ = std::fs::remove_file(rgsi_path);
381 return Err(anyhow!("Empty RGSI cache"));
382 }
383 let coll_key = seqcol.metadata.digest.to_key();
384 let coll_digest_display = seqcol.metadata.digest.clone();
385
386 if !opts.force && self.collections.contains_key(&coll_key) {
387 if !self.quiet {
388 println!("Skipped {} (already exists)", coll_digest_display);
389 }
390 return Ok((seqcol.metadata.clone(), false));
391 }
392
393 if self.ancillary_digests {
394 seqcol.metadata.compute_ancillary_digests(&seqcol.sequences);
395 }
396 let metadata = seqcol.metadata.clone();
397
398 let seqmeta_hashmap: HashMap<String, SequenceMetadata> = seqcol
399 .sequences
400 .iter()
401 .map(|r| {
402 let meta = r.metadata().clone();
403 (meta.name.clone(), meta)
404 })
405 .collect();
406
407 self.add_sequence_collection_internal(seqcol, opts.force)?;
408
409 let mut fasta_reader = crate::fasta::FastaReader::from_path(file_path.as_ref())?;
411 let mut seq_count = 0;
412
413 while let Some(record) = fasta_reader.next_record()? {
414 let (name, _) = crate::fasta::parse_fasta_header(&record.raw_header);
415
416 if !opts.namespaces.is_empty() {
417 let aliases = crate::digest::fasta::extract_aliases_from_header(&record.raw_header, opts.namespaces);
418 for (ns, alias_value) in aliases {
419 if let Some(meta) = seqmeta_hashmap.get(&name) {
420 self.add_sequence_alias(&ns, &alias_value, &meta.sha512t24u)?;
421 }
422 }
423 }
424
425 let dr = seqmeta_hashmap
426 .get(&name)
427 .ok_or_else(|| anyhow!("Sequence '{}' not found in cached metadata", name))?
428 .clone();
429
430 seq_count += 1;
431
432 let sequence_data = match self.mode {
433 StorageMode::Encoded => {
434 let mut encoder = SequenceEncoder::new(dr.alphabet, dr.length);
435 encoder.update(&record.raw_bytes);
436 encoder.finalize()
437 }
438 StorageMode::Raw => record.raw_bytes,
439 };
440
441 self.add_sequence(
442 SequenceRecord::Full { metadata: dr, sequence: sequence_data },
443 coll_key,
444 true,
445 )?;
446 }
447
448 let elapsed = start_time.elapsed();
449 if !self.quiet {
450 println!(
451 "Added {} ({} seqs) in {:.1}s [cached metadata]",
452 coll_digest_display, seq_count, elapsed.as_secs_f64()
453 );
454 }
455
456 Ok((metadata, true))
457 }
458}