use std::io::{BufRead, BufReader};
use std::path::Path;
#[cfg(feature = "gzip")]
use flate2::read::MultiGzDecoder;
use crate::block::Block;
use crate::chain::Strand;
use crate::error::ChainError;
use crate::parser::common::{is_blank, parse_block, parse_header_with_default_id};
#[cfg(not(feature = "gzip"))]
use crate::storage::gzip_feature_error;
use crate::storage::is_gz_path;
/// A fully owned chain record: every header field plus the chain's alignment blocks.
///
/// Built from an [`OwnedChainHeader`] via `OwnedChainHeader::into_chain`, e.g. by
/// [`StreamingReader::next_chain`].
#[derive(Debug, Clone)]
pub struct OwnedChain {
/// Chain score taken from the header line.
pub score: i64,
/// Reference sequence name, copied as raw bytes out of the header line.
pub reference_name: Vec<u8>,
/// Reference sequence size as given in the header.
pub reference_size: u32,
/// Reference strand as parsed from the header.
pub reference_strand: Strand,
/// Reference start coordinate as parsed from the header.
/// NOTE(review): coordinate convention is whatever the header parser produces
/// (presumably UCSC chain 0-based, half-open) — confirm.
pub reference_start: u32,
/// Reference end coordinate as parsed from the header.
pub reference_end: u32,
/// Query sequence name, copied as raw bytes out of the header line.
pub query_name: Vec<u8>,
/// Query sequence size as given in the header.
pub query_size: u32,
/// Query strand as parsed from the header.
pub query_strand: Strand,
/// Query start coordinate as parsed from the header.
pub query_start: u32,
/// Query end coordinate as parsed from the header.
pub query_end: u32,
/// Chain id: the header's explicit id, or one generated by the reader when the
/// header has none.
pub id: u64,
/// Alignment blocks that follow the header; non-empty when obtained from
/// `StreamingReader::next_chain`.
pub blocks: Vec<Block>,
}
/// The owned contents of a single chain header line, without its blocks.
///
/// Returned by `StreamingReader::next_header` / `next_item`; call
/// [`OwnedChainHeader::into_chain`] once the blocks have been read.
#[derive(Debug, Clone)]
pub struct OwnedChainHeader {
/// Absolute byte offset of the header line's first byte within the
/// (decompressed) input stream.
pub offset: usize,
/// Chain score taken from the header line.
pub score: i64,
/// Reference sequence name, copied as raw bytes out of the header line.
pub reference_name: Vec<u8>,
/// Reference sequence size as given in the header.
pub reference_size: u32,
/// Reference strand as parsed from the header.
pub reference_strand: Strand,
/// Reference start coordinate as parsed from the header.
pub reference_start: u32,
/// Reference end coordinate as parsed from the header.
pub reference_end: u32,
/// Query sequence name, copied as raw bytes out of the header line.
pub query_name: Vec<u8>,
/// Query sequence size as given in the header.
pub query_size: u32,
/// Query strand as parsed from the header.
pub query_strand: Strand,
/// Query start coordinate as parsed from the header.
pub query_start: u32,
/// Query end coordinate as parsed from the header.
pub query_end: u32,
/// Chain id: explicit from the header, or generated by the reader when absent.
pub id: u64,
}
/// One non-blank top-level line produced by `StreamingReader::next_item`.
#[derive(Debug, Clone)]
pub enum StreamItem {
/// A line starting with `#`, with its trailing line terminator removed.
MetaLine(Vec<u8>),
/// A parsed chain header line. Its blocks have not been consumed yet.
Header(OwnedChainHeader),
}
impl OwnedChainHeader {
pub fn into_chain(self, blocks: Vec<Block>) -> OwnedChain {
OwnedChain {
score: self.score,
reference_name: self.reference_name,
reference_size: self.reference_size,
reference_strand: self.reference_strand,
reference_start: self.reference_start,
reference_end: self.reference_end,
query_name: self.query_name,
query_size: self.query_size,
query_strand: self.query_strand,
query_start: self.query_start,
query_end: self.query_end,
id: self.id,
blocks,
}
}
}
/// Line-oriented, streaming reader of chain records over any [`BufRead`] source.
///
/// Records are read one header/block group at a time, so memory use does not
/// grow with the input size.
pub struct StreamingReader<R: BufRead> {
/// Underlying line source.
reader: R,
/// Reusable buffer that holds the most recently read line.
buf: Vec<u8>,
/// Absolute byte offset of the next unread byte in the stream.
offset: usize,
/// Id assigned to the next header that has no explicit id.
next_id: u64,
}
impl<R: BufRead> StreamingReader<R> {
pub fn new(reader: R) -> Self {
StreamingReader {
reader,
buf: Vec::with_capacity(8 * 1024),
offset: 0,
next_id: 1,
}
}
pub fn set_next_generated_id(&mut self, next_id: u64) {
self.next_id = next_id;
}
pub fn next_generated_id(&self) -> u64 {
self.next_id
}
pub fn next_chain(&mut self) -> Result<Option<OwnedChain>, ChainError> {
let Some(header) = self.next_header()? else {
return Ok(None);
};
let offset = header.offset;
let blocks = self.read_blocks(offset)?;
Ok(Some(header.into_chain(blocks)))
}
pub fn next_header(&mut self) -> Result<Option<OwnedChainHeader>, ChainError> {
while let Some(item) = self.next_item()? {
if let StreamItem::Header(header) = item {
return Ok(Some(header));
}
}
Ok(None)
}
pub fn next_item(&mut self) -> Result<Option<StreamItem>, ChainError> {
loop {
let Some((line_start, line)) = self.read_trimmed_line()? else {
return Ok(None);
};
if line.is_empty() || is_blank(line) {
continue;
}
if line[0] == b'#' {
return Ok(Some(StreamItem::MetaLine(line.to_vec())));
}
let header_line = line.to_vec();
let header = self.parse_header_line(line_start, &header_line)?;
return Ok(Some(StreamItem::Header(header)));
}
}
pub fn read_blocks(&mut self, header_offset: usize) -> Result<Vec<Block>, ChainError> {
let mut blocks = Vec::new();
loop {
let Some((line_start, line)) = self.read_trimmed_line()? else {
break;
};
if is_blank(line) {
break;
}
let block = parse_block(line, line_start)?;
blocks.push(block);
}
if blocks.is_empty() {
return Err(ChainError::Format {
offset: header_offset,
msg: "chain without any alignment blocks".into(),
});
}
Ok(blocks)
}
pub fn skip_blocks(&mut self) -> Result<(), ChainError> {
while let Some((_, line)) = self.read_trimmed_line()? {
if is_blank(line) {
break;
}
}
Ok(())
}
fn read_trimmed_line(&mut self) -> Result<Option<(usize, &[u8])>, ChainError> {
self.buf.clear();
let start = self.offset;
let n = self.reader.read_until(b'\n', &mut self.buf)?;
if n == 0 {
return Ok(None);
}
self.offset += n;
if let Some(b'\n') = self.buf.last() {
self.buf.pop();
}
if let Some(b'\r') = self.buf.last() {
self.buf.pop();
}
Ok(Some((start, self.buf.as_slice())))
}
fn parse_header_line(
&mut self,
header_offset: usize,
header_line: &[u8],
) -> Result<OwnedChainHeader, ChainError> {
let default_id = self.next_id;
let (meta, has_explicit_id) =
parse_header_with_default_id(header_line, header_offset, default_id)?;
let reference_name = slice_name(header_line, header_offset, meta.reference_name.clone())?;
let query_name = slice_name(header_line, header_offset, meta.query_name.clone())?;
if !has_explicit_id {
self.next_id = default_id
.checked_add(1)
.ok_or_else(|| ChainError::Format {
offset: header_offset,
msg: "generated chain id overflows u64".into(),
})?;
}
Ok(OwnedChainHeader {
offset: header_offset,
score: meta.score,
reference_name,
reference_size: meta.reference_size,
reference_strand: meta.reference_strand,
reference_start: meta.reference_start,
reference_end: meta.reference_end,
query_name,
query_size: meta.query_size,
query_strand: meta.query_strand,
query_start: meta.query_start,
query_end: meta.query_end,
id: meta.id,
})
}
}
impl StreamingReader<Box<dyn BufRead>> {
    /// Opens the file at `path` and returns a streaming reader over its contents.
    ///
    /// A path accepted by `is_gz_path` is decompressed on the fly with a
    /// multi-member gzip decoder. Any other path is read as plain text.
    ///
    /// # Errors
    ///
    /// - Any I/O error from opening the file.
    /// - For a gzip path when the crate is built without the `gzip` feature, the
    ///   error returned by `gzip_feature_error()`.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ChainError> {
        let path = path.as_ref();
        if is_gz_path(path) {
            #[cfg(feature = "gzip")]
            {
                // `flate2::read::MultiGzDecoder` already buffers its input
                // internally, so the file is passed in directly; wrapping it in a
                // `BufReader` here would only double-buffer. The outer
                // `BufReader` provides the `BufRead` interface the parser needs.
                let file = std::fs::File::open(path)?;
                let decoder = MultiGzDecoder::new(file);
                return Ok(StreamingReader::new(Box::new(BufReader::new(decoder))));
            }
            #[cfg(not(feature = "gzip"))]
            {
                return Err(gzip_feature_error());
            }
        }
        let file = std::fs::File::open(path)?;
        Ok(StreamingReader::new(Box::new(BufReader::new(file))))
    }
}
/// Copies the bytes of `line` covered by `range` into an owned name.
///
/// `range` is given in absolute stream offsets. `line_start` is the absolute offset
/// of `line[0]`, so both endpoints are rebased before slicing.
///
/// # Errors
///
/// Returns `ChainError::Format` at `line_start` in these cases:
/// - `"name slice underflow"` when either endpoint lies before `line_start`. The
///   start endpoint is checked first.
/// - `"name slice out of bounds"` when the rebased range is inverted or extends
///   past the end of `line`.
fn slice_name(
    line: &[u8],
    line_start: usize,
    range: std::ops::Range<usize>,
) -> Result<Vec<u8>, ChainError> {
    let underflow = || ChainError::Format {
        offset: line_start,
        msg: "name slice underflow".into(),
    };
    let begin = range.start.checked_sub(line_start).ok_or_else(underflow)?;
    let end = range.end.checked_sub(line_start).ok_or_else(underflow)?;
    // `get` returns None both for `begin > end` and for `end > line.len()`.
    match line.get(begin..end) {
        Some(name) => Ok(name.to_vec()),
        None => Err(ChainError::Format {
            offset: line_start,
            msg: "name slice out of bounds".into(),
        }),
    }
}