Skip to main content

gtars_refget/store/
import.rs

1//! FASTA import pipeline for RefgetStore.
2//!
3//! Contains the multithreaded FASTA import pipeline and cached metadata fast path.
4
5use super::*;
6use super::readonly::ReadonlyRefgetStore;
7
8use std::collections::HashMap;
9use std::path::Path;
10use std::time::Instant;
11
12use anyhow::{anyhow, Result};
13
14use crate::collection::SequenceCollectionExt;
15use crate::digest::{
16    SequenceCollection, SequenceCollectionMetadata,
17    SequenceEncoder, SequenceMetadata, SequenceRecord,
18};
19use crate::hashkeyable::HashKeyable;
20
21// ============================================================================
22// ReadonlyRefgetStore import methods
23// ============================================================================
24
25impl ReadonlyRefgetStore {
26    /// Import a FASTA file into the store using a multithreaded pipeline.
27    ///
28    /// After the pipeline finishes, computes collection metadata, registers the
29    /// collection, and inserts all sequences.
30    pub fn add_sequence_collection_from_fasta<P: AsRef<Path>>(
31        &mut self,
32        file_path: P,
33        opts: FastaImportOptions<'_>,
34    ) -> Result<(SequenceCollectionMetadata, bool)> {
35        use crossbeam_channel::bounded;
36        use md5::Md5;
37        use sha2::{Digest, Sha512};
38
39        if !self.quiet {
40            println!("Processing {}...", file_path.as_ref().display());
41        }
42
43        let pipeline_start = Instant::now();
44
45        // Check for RGSI cache
46        let use_cache = self.local_path.is_some();
47        use crate::utils::PathExtension;
48        let rgsi_path = file_path.as_ref().replace_exts_with("rgsi");
49        let have_rgsi = use_cache && rgsi_path.exists();
50
51        if have_rgsi {
52            match self.add_fasta_with_cached_metadata(&file_path, &rgsi_path, opts, pipeline_start) {
53                Ok(result) => return Ok(result),
54                Err(_) => {
55                    // Cache was stale/empty, fall through to pipeline
56                }
57            }
58        }
59
60        // --- Channel message types ---
61
62        /// Threshold above which Thread 1 processes sequences inline
63        const LARGE_SEQ_THRESHOLD: usize = 500 * 1024 * 1024; // 500 MB
64
65        struct DecompressedSequence {
66            name: String,
67            description: Option<String>,
68            raw_header: String,
69            raw_bytes: Vec<u8>,
70        }
71
72        struct DigestedSequence {
73            metadata: SequenceMetadata,
74            raw_bytes: Vec<u8>,
75            aliases: Vec<(String, String)>,
76        }
77
78        struct ReadySequence {
79            metadata: SequenceMetadata,
80            sequence_data: Vec<u8>,
81            aliases: Vec<(String, String)>,
82        }
83
84        enum ToDigest {
85            NeedsWork(DecompressedSequence),
86            AlreadyDone(ReadySequence),
87        }
88
89        enum ToEncode {
90            NeedsWork(DigestedSequence),
91            AlreadyDone(ReadySequence),
92        }
93
94        // --- Set up channels ---
95        let (decompress_tx, decompress_rx) = bounded::<ToDigest>(1);
96        let (digest_tx, digest_rx) = bounded::<ToEncode>(1);
97
98        let file_path_buf = file_path.as_ref().to_path_buf();
99        let namespaces: Vec<String> = opts.namespaces.iter().map(|s| s.to_string()).collect();
100        let ns_for_digest = namespaces.clone();
101
102        // --- Thread 1: Read FASTA ---
103        let quiet = self.quiet;
104        let mode_for_t1 = self.mode;
105        let decompress_handle = std::thread::spawn(move || -> Result<()> {
106            use sha2::{Digest, Sha512};
107            use md5::Md5;
108
109            let mut fasta_reader = crate::fasta::FastaReader::from_path(&file_path_buf)?;
110
111            while let Some(record) = fasta_reader.next_record()? {
112                if record.raw_bytes.len() > LARGE_SEQ_THRESHOLD {
113                    if !quiet {
114                        println!(
115                            "  Large sequence '{}' ({} MB) -- processing inline to reduce memory",
116                            record.name,
117                            record.raw_bytes.len() / (1024 * 1024),
118                        );
119                    }
120                    let crate::fasta::FastaRecord { name, description, raw_header, raw_bytes } = record;
121
122                    let mut sha512_hasher = Sha512::new();
123                    sha512_hasher.update(&raw_bytes);
124                    let sha512 = base64_url::encode(&sha512_hasher.finalize()[0..24]);
125
126                    let mut md5_hasher = Md5::new();
127                    md5_hasher.update(&raw_bytes);
128                    let md5 = format!("{:x}", md5_hasher.finalize());
129
130                    let mut guesser = crate::digest::AlphabetGuesser::new();
131                    guesser.update(&raw_bytes);
132                    let alphabet = guesser.guess();
133
134                    let length = raw_bytes.len();
135                    let sequence_data = match mode_for_t1 {
136                        StorageMode::Encoded => {
137                            let mut encoder = SequenceEncoder::new(alphabet, length);
138                            encoder.update(&raw_bytes);
139                            drop(raw_bytes);
140                            encoder.finalize()
141                        }
142                        StorageMode::Raw => raw_bytes,
143                    };
144
145                    let metadata = SequenceMetadata {
146                        name,
147                        description,
148                        length,
149                        sha512t24u: sha512,
150                        md5,
151                        alphabet,
152                        fai: None,
153                    };
154
155                    let aliases = if !namespaces.is_empty() {
156                        let ns_refs: Vec<&str> = namespaces.iter().map(|s| s.as_str()).collect();
157                        crate::digest::fasta::extract_aliases_from_header(&raw_header, &ns_refs)
158                    } else {
159                        vec![]
160                    };
161
162                    decompress_tx.send(ToDigest::AlreadyDone(ReadySequence {
163                        metadata,
164                        sequence_data,
165                        aliases,
166                    })).map_err(|_| anyhow!("Digest thread stopped receiving"))?;
167                } else {
168                    decompress_tx.send(ToDigest::NeedsWork(DecompressedSequence {
169                        name: record.name,
170                        description: record.description,
171                        raw_header: record.raw_header,
172                        raw_bytes: record.raw_bytes,
173                    })).map_err(|_| anyhow!("Digest thread stopped receiving"))?;
174                }
175            }
176            Ok(())
177        });
178
179        // --- Thread 2: Digest ---
180        let digest_handle = std::thread::spawn(move || -> Result<()> {
181            let ns_refs: Vec<&str> = ns_for_digest.iter().map(|s| s.as_str()).collect();
182
183            for msg in decompress_rx {
184                match msg {
185                    ToDigest::NeedsWork(seq) => {
186                        let mut sha512_hasher = Sha512::new();
187                        sha512_hasher.update(&seq.raw_bytes);
188                        let sha512 = base64_url::encode(&sha512_hasher.finalize()[0..24]);
189
190                        let mut md5_hasher = Md5::new();
191                        md5_hasher.update(&seq.raw_bytes);
192                        let md5 = format!("{:x}", md5_hasher.finalize());
193
194                        let mut guesser = crate::digest::AlphabetGuesser::new();
195                        guesser.update(&seq.raw_bytes);
196                        let alphabet = guesser.guess();
197
198                        let metadata = SequenceMetadata {
199                            name: seq.name,
200                            description: seq.description,
201                            length: seq.raw_bytes.len(),
202                            sha512t24u: sha512,
203                            md5,
204                            alphabet,
205                            fai: None,
206                        };
207
208                        let aliases = if !ns_refs.is_empty() {
209                            crate::digest::fasta::extract_aliases_from_header(&seq.raw_header, &ns_refs)
210                        } else {
211                            vec![]
212                        };
213
214                        digest_tx.send(ToEncode::NeedsWork(DigestedSequence {
215                            metadata,
216                            raw_bytes: seq.raw_bytes,
217                            aliases,
218                        })).map_err(|_| anyhow!("Encode thread stopped receiving"))?;
219                    }
220                    ToDigest::AlreadyDone(ready) => {
221                        digest_tx.send(ToEncode::AlreadyDone(ready))
222                            .map_err(|_| anyhow!("Encode thread stopped receiving"))?;
223                    }
224                }
225            }
226            drop(digest_tx);
227            Ok(())
228        });
229
230        // --- Thread 3 (main thread): Encode + stream to disk ---
231        let mode = self.mode;
232        let mut sequence_metadata: Vec<SequenceMetadata> = Vec::new();
233        let mut all_aliases: Vec<(String, String, String)> = Vec::new();
234
235        for msg in digest_rx {
236            match msg {
237                ToEncode::NeedsWork(digested) => {
238                    let metadata = digested.metadata;
239                    let aliases = digested.aliases;
240                    let raw_bytes = digested.raw_bytes;
241
242                    let sequence_data = match mode {
243                        StorageMode::Encoded => {
244                            let mut encoder = SequenceEncoder::new(metadata.alphabet, metadata.length);
245                            encoder.update(&raw_bytes);
246                            drop(raw_bytes);
247                            encoder.finalize()
248                        }
249                        StorageMode::Raw => raw_bytes,
250                    };
251
252                    for (ns, alias_value) in &aliases {
253                        all_aliases.push((ns.clone(), alias_value.clone(), metadata.sha512t24u.clone()));
254                    }
255
256                    self.add_sequence_record(
257                        SequenceRecord::Full {
258                            metadata: metadata.clone(),
259                            sequence: sequence_data,
260                        },
261                        true,
262                    )?;
263
264                    sequence_metadata.push(metadata);
265                }
266                ToEncode::AlreadyDone(ready) => {
267                    for (ns, alias_value) in &ready.aliases {
268                        all_aliases.push((ns.clone(), alias_value.clone(), ready.metadata.sha512t24u.clone()));
269                    }
270
271                    self.add_sequence_record(
272                        SequenceRecord::Full {
273                            metadata: ready.metadata.clone(),
274                            sequence: ready.sequence_data,
275                        },
276                        true,
277                    )?;
278
279                    sequence_metadata.push(ready.metadata);
280                }
281            }
282        }
283
284        // Join threads and propagate errors
285        decompress_handle.join()
286            .map_err(|e| anyhow!("Decompress thread panicked: {:?}", e))??;
287        digest_handle.join()
288            .map_err(|e| anyhow!("Digest thread panicked: {:?}", e))??;
289
290        let pipeline_elapsed = pipeline_start.elapsed();
291
292        // --- Post-pipeline: compute collection metadata, register ---
293        let metadata_start = std::time::Instant::now();
294        let seq_count = sequence_metadata.len();
295
296        let stub_records: Vec<SequenceRecord> = sequence_metadata
297            .iter()
298            .map(|meta| SequenceRecord::Stub(meta.clone()))
299            .collect();
300
301        let mut seqcol_metadata = SequenceCollectionMetadata::from_sequences(
302            &stub_records,
303            Some(file_path.as_ref().to_path_buf()),
304        );
305        if self.ancillary_digests {
306            seqcol_metadata.compute_ancillary_digests(&stub_records);
307        }
308
309        let coll_key = seqcol_metadata.digest.to_key();
310        let coll_digest_display = seqcol_metadata.digest.clone();
311        let metadata = seqcol_metadata.clone();
312        let metadata_elapsed = metadata_start.elapsed();
313
314        if !opts.force && self.collections.contains_key(&coll_key) {
315            if !self.quiet {
316                println!("Skipped {} (already exists)", coll_digest_display);
317            }
318            return Ok((metadata, false));
319        }
320
321        let index_start = std::time::Instant::now();
322
323        // Write RGSI cache for next time
324        if use_cache {
325            let seqcol_for_cache = SequenceCollection {
326                metadata: seqcol_metadata.clone(),
327                sequences: stub_records.clone(),
328            };
329            let _ = seqcol_for_cache.write_collection_rgsi(&rgsi_path);
330        }
331
332        // Register the collection
333        let seqcol = SequenceCollection {
334            metadata: seqcol_metadata,
335            sequences: stub_records,
336        };
337        self.add_sequence_collection_internal(seqcol, opts.force)?;
338
339        // Register aliases
340        for (ns, alias_value, sha512t24u) in &all_aliases {
341            self.add_sequence_alias(ns, alias_value, sha512t24u)?;
342        }
343
344        // Populate name_lookup for already-written sequences
345        for meta in &sequence_metadata {
346            self.name_lookup
347                .entry(coll_key)
348                .or_default()
349                .insert(meta.name.clone(), meta.sha512t24u.to_key());
350        }
351        let index_elapsed = index_start.elapsed();
352
353        if !self.quiet {
354            let total = pipeline_elapsed + metadata_elapsed + index_elapsed;
355            println!(
356                "Added {} {} seqs in {:.1}s ({:.1} proc | {:.1} meta | {:.1} index)",
357                coll_digest_display,
358                seq_count,
359                total.as_secs_f64(),
360                pipeline_elapsed.as_secs_f64(),
361                metadata_elapsed.as_secs_f64(),
362                index_elapsed.as_secs_f64(),
363            );
364        }
365
366        Ok((metadata, true))
367    }
368
369    /// Fast path when RGSI cache exists: skip digesting, only decompress for encoding.
370    fn add_fasta_with_cached_metadata<P: AsRef<Path>>(
371        &mut self,
372        file_path: P,
373        rgsi_path: &Path,
374        opts: FastaImportOptions<'_>,
375        start_time: Instant,
376    ) -> Result<(SequenceCollectionMetadata, bool)> {
377        let mut seqcol = crate::collection::read_rgsi_file(rgsi_path)?;
378
379        if seqcol.sequences.is_empty() {
380            let _ = std::fs::remove_file(rgsi_path);
381            return Err(anyhow!("Empty RGSI cache"));
382        }
383        let coll_key = seqcol.metadata.digest.to_key();
384        let coll_digest_display = seqcol.metadata.digest.clone();
385
386        if !opts.force && self.collections.contains_key(&coll_key) {
387            if !self.quiet {
388                println!("Skipped {} (already exists)", coll_digest_display);
389            }
390            return Ok((seqcol.metadata.clone(), false));
391        }
392
393        if self.ancillary_digests {
394            seqcol.metadata.compute_ancillary_digests(&seqcol.sequences);
395        }
396        let metadata = seqcol.metadata.clone();
397
398        let seqmeta_hashmap: HashMap<String, SequenceMetadata> = seqcol
399            .sequences
400            .iter()
401            .map(|r| {
402                let meta = r.metadata().clone();
403                (meta.name.clone(), meta)
404            })
405            .collect();
406
407        self.add_sequence_collection_internal(seqcol, opts.force)?;
408
409        // Decompress and encode sequences
410        let mut fasta_reader = crate::fasta::FastaReader::from_path(file_path.as_ref())?;
411        let mut seq_count = 0;
412
413        while let Some(record) = fasta_reader.next_record()? {
414            let (name, _) = crate::fasta::parse_fasta_header(&record.raw_header);
415
416            if !opts.namespaces.is_empty() {
417                let aliases = crate::digest::fasta::extract_aliases_from_header(&record.raw_header, opts.namespaces);
418                for (ns, alias_value) in aliases {
419                    if let Some(meta) = seqmeta_hashmap.get(&name) {
420                        self.add_sequence_alias(&ns, &alias_value, &meta.sha512t24u)?;
421                    }
422                }
423            }
424
425            let dr = seqmeta_hashmap
426                .get(&name)
427                .ok_or_else(|| anyhow!("Sequence '{}' not found in cached metadata", name))?
428                .clone();
429
430            seq_count += 1;
431
432            let sequence_data = match self.mode {
433                StorageMode::Encoded => {
434                    let mut encoder = SequenceEncoder::new(dr.alphabet, dr.length);
435                    encoder.update(&record.raw_bytes);
436                    encoder.finalize()
437                }
438                StorageMode::Raw => record.raw_bytes,
439            };
440
441            self.add_sequence(
442                SequenceRecord::Full { metadata: dr, sequence: sequence_data },
443                coll_key,
444                true,
445            )?;
446        }
447
448        let elapsed = start_time.elapsed();
449        if !self.quiet {
450            println!(
451                "Added {} ({} seqs) in {:.1}s [cached metadata]",
452                coll_digest_display, seq_count, elapsed.as_secs_f64()
453            );
454        }
455
456        Ok((metadata, true))
457    }
458}