use std::io::{BufRead, Seek};
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
use noodles::bgzf::VirtualPosition;
use noodles::csi::BinningIndex;
use crate::alignment::model::tag::TagScanner;
use crate::alignment::model::BatchBuilder;
use crate::alignment::scanner::batch_iterator::{BatchIterator, QueryBatchIterator};
use crate::alignment::AlignmentModel;
use crate::util::query::{BgzfChunkReader, ByteRangeReader};
use crate::Select;
/// Scans SAM-formatted alignment data into Arrow record batches.
///
/// Pairs a SAM header with an `AlignmentModel` that defines which alignment
/// fields (and, optionally, which tags) are materialized as columns.
pub struct Scanner {
// SAM header; supplies reference-sequence names/lengths and record context.
header: noodles::sam::Header,
// Column model: selected fields, tag definitions, and the resulting Arrow schema.
model: AlignmentModel,
}
impl Scanner {
    /// Creates a scanner from a SAM header and a field selection.
    ///
    /// `tag_defs` optionally supplies (tag, type) definitions that are folded
    /// into the alignment model.
    ///
    /// # Errors
    ///
    /// Returns an error if the model cannot be built from `fields`/`tag_defs`.
    pub fn new(
        header: noodles::sam::Header,
        fields: Select<String>,
        tag_defs: Option<Vec<(String, String)>>,
    ) -> crate::Result<Self> {
        let model = AlignmentModel::new(fields, tag_defs)?;
        Ok(Self { header, model })
    }

    /// Creates a scanner from a header and an already-constructed model.
    pub fn with_model(header: noodles::sam::Header, model: AlignmentModel) -> Self {
        Self { header, model }
    }

    /// Returns the alignment model backing this scanner.
    pub fn model(&self) -> &AlignmentModel {
        &self.model
    }

    /// Returns the SAM header.
    pub fn header(&self) -> &noodles::sam::Header {
        &self.header
    }

    /// Returns the reference-sequence names, in header order.
    pub fn chrom_names(&self) -> Vec<String> {
        let mut names = Vec::new();
        for (name, _) in self.header.reference_sequences().iter() {
            names.push(name.to_string());
        }
        names
    }

    /// Returns `(name, length)` pairs for the reference sequences, in header order.
    pub fn chrom_sizes(&self) -> Vec<(String, u32)> {
        let mut sizes = Vec::new();
        for (name, reference) in self.header.reference_sequences().iter() {
            sizes.push((name.to_string(), reference.length().get() as u32));
        }
        sizes
    }

    /// Returns the names of the columns produced by the current model.
    pub fn field_names(&self) -> Vec<String> {
        self.model.field_names()
    }

    /// Returns the Arrow schema of the record batches this scanner yields.
    pub fn schema(&self) -> &Schema {
        self.model.schema()
    }

    /// Builds a `BatchBuilder` for this scanner's model, optionally projected
    /// down to `columns`.
    ///
    /// # Errors
    ///
    /// Returns an error if the projection or builder construction fails.
    fn build_batch_builder(
        &self,
        columns: Option<Vec<String>>,
        capacity: usize,
    ) -> crate::Result<BatchBuilder> {
        let model = if let Some(cols) = columns {
            self.model.project(&cols)?
        } else {
            self.model.clone()
        };
        BatchBuilder::from_model(&model, self.header.clone(), capacity)
    }
}
impl Scanner {
    /// Number of records per output batch when the caller does not specify one.
    const DEFAULT_BATCH_SIZE: usize = 1024;

    /// Scans up to `scan_rows` records (all records when `None`) and collects
    /// the set of tag definitions observed.
    ///
    /// This is a best-effort schema-inference pass: records that fail to
    /// parse are skipped with a message on stderr rather than aborting the
    /// scan.
    ///
    /// # Errors
    ///
    /// Currently only propagates errors from collecting the scanned tags.
    pub fn tag_defs<R: BufRead>(
        fmt_reader: &mut noodles::sam::io::Reader<R>,
        scan_rows: Option<usize>,
    ) -> crate::Result<Vec<(String, String)>> {
        let mut tag_scanner = TagScanner::new();
        // `None` means "no limit"; `take(usize::MAX)` is effectively
        // unbounded, letting both cases share a single loop instead of
        // duplicating it per match arm.
        let limit = scan_rows.unwrap_or(usize::MAX);
        for result in fmt_reader.records().take(limit) {
            match result {
                Ok(record) => tag_scanner.push(&record),
                // Best-effort: report the failure (with its cause) and keep going.
                Err(e) => eprintln!("Failed to read record: {e}"),
            }
        }
        Ok(tag_scanner.collect())
    }

    /// Scans all records sequentially, yielding Arrow record batches.
    ///
    /// `batch_size` defaults to [`Self::DEFAULT_BATCH_SIZE`]; `limit` caps the
    /// total number of records emitted.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be constructed.
    pub fn scan<R: BufRead>(
        &self,
        fmt_reader: noodles::sam::io::Reader<R>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        Ok(BatchIterator::new(fmt_reader, batch_builder, batch_size, limit))
    }

    /// Scans the records overlapping `region`, using `index` to locate the
    /// BGZF chunks to read.
    ///
    /// # Errors
    ///
    /// Returns an error if the region's chromosome is not in the header, if
    /// the index query fails, or if the batch builder cannot be constructed.
    pub fn scan_query<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
        &self,
        fmt_reader: noodles::sam::io::Reader<R>,
        region: noodles::core::Region,
        index: impl BinningIndex,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let interval = region.interval();
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        let reference_sequence_id = super::resolve_chrom_id(&self.header, region.name())?;
        // Index query yields candidate chunks (as virtual-position ranges);
        // the chunks may still contain records outside `interval`, so the
        // query iterator below re-filters by coordinate.
        let chunks = index.query(reference_sequence_id, interval)?;
        let chunks = chunks.into_iter().map(|c| (c.start(), c.end())).collect();
        // Rewrap the underlying BGZF reader so it only reads the selected chunks.
        let bgzf_reader = fmt_reader.into_inner();
        let query_reader = BgzfChunkReader::new(bgzf_reader, chunks);
        let fmt_reader = noodles::sam::io::Reader::new(query_reader);
        let batch_iter = QueryBatchIterator::new(
            fmt_reader,
            self.header.clone(),
            reference_sequence_id,
            interval,
            batch_builder,
            batch_size,
            limit,
        );
        Ok(batch_iter)
    }

    /// Scans only the unmapped records at the end of a coordinate-sorted file.
    ///
    /// # Errors
    ///
    /// Returns an error if the unmapped query fails or the batch builder
    /// cannot be constructed.
    pub fn scan_unmapped<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
        &self,
        mut fmt_reader: noodles::sam::io::Reader<R>,
        index: impl BinningIndex,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        // NOTE(review): the returned record iterator is deliberately dropped —
        // this call appears to be used only to seek the underlying reader to
        // the start of the unmapped records, after which `BatchIterator` reads
        // from that position. Confirm against the noodles `query_unmapped` docs.
        let _ = fmt_reader.query_unmapped(&index)?;
        Ok(BatchIterator::new(fmt_reader, batch_builder, batch_size, limit))
    }

    /// Scans records from explicit compressed byte ranges `(start, end)`.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be constructed.
    pub fn scan_byte_ranges<R: BufRead + Seek>(
        &self,
        fmt_reader: noodles::sam::io::Reader<R>,
        byte_ranges: Vec<(u64, u64)>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        // Rewrap the inner reader so only the requested byte ranges are read.
        let inner_reader = fmt_reader.into_inner();
        let range_reader = ByteRangeReader::new(inner_reader, byte_ranges);
        let fmt_reader = noodles::sam::io::Reader::new(range_reader);
        Ok(BatchIterator::new(fmt_reader, batch_builder, batch_size, limit))
    }

    /// Scans records from explicit BGZF virtual-position ranges.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be constructed.
    pub fn scan_virtual_ranges<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
        &self,
        fmt_reader: noodles::sam::io::Reader<R>,
        vpos_ranges: Vec<(VirtualPosition, VirtualPosition)>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        // Rewrap the BGZF reader so only the requested virtual ranges are read.
        let bgzf_reader = fmt_reader.into_inner();
        let range_reader = BgzfChunkReader::new(bgzf_reader, vpos_ranges);
        let fmt_reader = noodles::sam::io::Reader::new(range_reader);
        Ok(BatchIterator::new(fmt_reader, batch_builder, batch_size, limit))
    }
}