use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::path::Path;
use anyhow::{Context, Result, anyhow};
use noodles::core::Region;
use noodles::fasta;
use noodles::sam::Header;
use crate::sequence_dict::SequenceDictionary;
pub struct Fasta {
reader: fasta::io::IndexedReader<fasta::io::BufReader<File>>,
dict: SequenceDictionary,
}
impl Fasta {
pub fn from_path(path: &Path) -> Result<Self> {
let reader = fasta::io::indexed_reader::Builder::default()
.build_from_path(path)
.with_context(|| format!("Failed to open indexed FASTA: {}", path.display()))?;
let dict = SequenceDictionary::from(reader.index());
Ok(Self { reader, dict })
}
#[must_use]
pub fn dict(&self) -> &SequenceDictionary {
&self.dict
}
pub fn validate_bam_header(&self, header: &Header) -> Result<()> {
for (name, _) in header.reference_sequences() {
let name_str = std::str::from_utf8(name.as_ref())
.with_context(|| "BAM contig name is not valid UTF-8")?;
if self.dict.get_by_name(name_str).is_none() {
return Err(anyhow!(
"BAM contig '{name_str}' not found in reference FASTA index. \
All BAM contigs must be present in the reference."
));
}
}
Ok(())
}
pub fn load_contig(&mut self, contig_name: &str, uppercase: bool) -> Result<Vec<u8>> {
let meta = self
.dict
.get_by_name(contig_name)
.ok_or_else(|| anyhow!("Contig '{contig_name}' not found in reference"))?;
let expected_len = meta.length();
let region = Region::new(contig_name, ..);
let pos =
self.reader.index().query(®ion).with_context(|| {
format!("Failed to look up contig '{contig_name}' in FASTA index")
})?;
self.reader
.get_mut()
.seek(SeekFrom::Start(pos))
.with_context(|| format!("Failed to seek to contig '{contig_name}' in FASTA"))?;
let mut seq = Vec::with_capacity(expected_len);
self.reader
.read_sequence(&mut seq)
.with_context(|| format!("Failed to read contig '{contig_name}' from FASTA"))?;
if uppercase {
for b in &mut seq {
*b = b.to_ascii_uppercase();
}
}
Ok(seq)
}
pub fn fetch(&mut self, contig_name: &str, start: u64, end: u64) -> Result<Vec<u8>> {
use noodles::core::Position;
#[expect(
clippy::cast_possible_truncation,
reason = "genomic coordinates fit in usize on all supported platforms"
)]
let pos_start = Position::new((start + 1) as usize)
.ok_or_else(|| anyhow!("Invalid start position {start} for region query"))?;
#[expect(
clippy::cast_possible_truncation,
reason = "genomic coordinates fit in usize on all supported platforms"
)]
let pos_end = Position::new(end as usize)
.ok_or_else(|| anyhow!("Invalid end position {end} for region query"))?;
let region = Region::new(contig_name, pos_start..=pos_end);
let record = self
.reader
.query(®ion)
.with_context(|| format!("Failed to fetch {contig_name}:{start}-{end} from FASTA"))?;
Ok(record.sequence().as_ref().to_vec())
}
#[must_use]
pub fn contig_length(&self, name: &str) -> Option<u64> {
self.dict.get_by_name(name).map(|m| m.length() as u64)
}
#[must_use]
pub fn contig_names(&self) -> Vec<&str> {
self.dict.names()
}
}