use std::io::{self, Read, Seek, SeekFrom};
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use noodles::core::region::Interval;
use crate::alignment::model::tag::TagScanner;
use crate::alignment::model::BatchBuilder;
use crate::alignment::AlignmentModel;
use crate::batch::{Push, RecordBatchBuilder as _};
use crate::Select;
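/// Scans CRAM input into Arrow record batches, driven by an [`AlignmentModel`]
/// (the selected fields and tags), a SAM header, and a FASTA reference
/// repository used to resolve reference-based compression.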
pub struct Scanner {
header: noodles::sam::Header,
model: AlignmentModel,
repo: noodles::fasta::Repository,
}
impl Scanner {
pub fn new(
header: noodles::sam::Header,
fields: Select<String>,
tag_defs: Option<Vec<(String, String)>>,
repo: noodles::fasta::Repository,
) -> crate::Result<Self> {
let model = AlignmentModel::new(fields, tag_defs)?;
Ok(Self {
header,
model,
repo,
})
}
pub fn with_model(
header: noodles::sam::Header,
model: AlignmentModel,
repo: noodles::fasta::Repository,
) -> Self {
Self {
header,
model,
repo,
}
}
pub fn model(&self) -> &AlignmentModel {
&self.model
}
pub fn repo(&self) -> &noodles::fasta::Repository {
&self.repo
}
pub fn header(&self) -> &noodles::sam::Header {
&self.header
}
pub fn chrom_names(&self) -> Vec<String> {
self.header
.reference_sequences()
.iter()
.map(|(name, _)| name.to_string())
.collect()
}
pub fn chrom_sizes(&self) -> Vec<(String, u32)> {
self.header
.reference_sequences()
.iter()
.map(|(name, r)| (name.to_string(), r.length().get() as u32))
.collect()
}
pub fn field_names(&self) -> Vec<String> {
self.model.field_names()
}
pub fn schema(&self) -> &Schema {
self.model.schema()
}
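    /// Builds a [`BatchBuilder`] for the requested column projection (the full
    /// model when `columns` is `None`) with the given row capacity.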
fn build_batch_builder(
&self,
columns: Option<Vec<String>>,
capacity: usize,
) -> crate::Result<BatchBuilder> {
let model = match columns {
None => self.model.clone(),
Some(cols) => self.model.project(&cols)?,
};
BatchBuilder::from_model(&model, self.header.clone(), capacity)
}
}
impl Scanner {
    /// Scans records from the reader and collects the tag definitions they
    /// carry, optionally limited to the first `scan_rows` records.
    pub fn tag_defs<R: Read>(
        fmt_reader: &mut noodles::cram::io::Reader<R>,
        header: &noodles::sam::Header,
        scan_rows: Option<usize>,
    ) -> crate::Result<Vec<(String, String)>> {
        let records = fmt_reader.records(header);
        let mut tag_scanner = TagScanner::new();
        // Scan the whole stream, or only the first `scan_rows` records.
        for result in records.take(scan_rows.unwrap_or(usize::MAX)) {
            match result {
                Ok(record) => tag_scanner.push(&record),
                Err(e) => eprintln!("Failed to read record: {}", e),
            }
        }
        Ok(tag_scanner.collect())
    }
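    /// Streams every record in the CRAM input into Arrow record batches of up
    /// to `batch_size` rows (default 1024), reading at most `limit` records.
    ///
    /// A minimal usage sketch, assuming `scanner` was built from the file's
    /// header and `reader` is a `noodles::cram::io::Reader` already positioned
    /// past the header; the batch size of 4096 is arbitrary:
    ///
    /// ```ignore
    /// let batches = scanner.scan(reader, None, Some(4096), None)?;
    /// for batch in batches {
    ///     let batch = batch?; // arrow::record_batch::RecordBatch
    ///     // ... consume the batch
    /// }
    /// ```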
pub fn scan<R: Read>(
&self,
fmt_reader: noodles::cram::io::Reader<R>,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let batch_iter = BatchIterator::new(
fmt_reader,
self.header.clone(),
&self.repo,
batch_builder,
batch_size,
limit,
);
Ok(batch_iter)
}
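    /// Streams the records overlapping `region` into Arrow record batches,
    /// using a CRAI index to seek directly to the candidate containers.
    ///
    /// A minimal usage sketch, assuming `scanner`, a seekable `reader`
    /// positioned past the header, and a CRAI `index` loaded elsewhere; the
    /// region coordinates are hypothetical:
    ///
    /// ```ignore
    /// let region: noodles::core::Region = "chr1:1-1000000".parse()?;
    /// let batches = scanner.scan_query(reader, region, index, None, None, None)?;
    /// ```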
pub fn scan_query<R: Read + Seek>(
&self,
fmt_reader: noodles::cram::io::Reader<R>,
region: noodles::core::Region,
index: noodles::cram::crai::Index,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let interval = region.interval();
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let reference_sequence_id = super::resolve_chrom_id(&self.header, region.name())?;
let batch_iter = QueryBatchIterator::new(
fmt_reader,
self.header.clone(),
&self.repo,
index,
reference_sequence_id,
interval,
batch_builder,
batch_size,
limit,
);
Ok(batch_iter)
}
}
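/// Iterator adapter that drains [`CramRecords`] and yields Arrow record
/// batches of up to `batch_size` rows, stopping after `limit` records.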
pub struct BatchIterator<R>
where
R: Read,
{
records: CramRecords<R>,
builder: BatchBuilder,
batch_size: usize,
limit: usize,
count: usize,
}
impl<R> BatchIterator<R>
where
R: Read,
{
pub fn new(
reader: noodles::cram::io::Reader<R>,
header: noodles::sam::Header,
repo: &noodles::fasta::Repository,
builder: BatchBuilder,
batch_size: usize,
limit: Option<usize>,
) -> Self {
Self {
records: CramRecords::new(reader, header, repo.clone()),
builder,
batch_size,
limit: limit.unwrap_or(usize::MAX),
count: 0,
}
}
}
impl<R> RecordBatchReader for BatchIterator<R>
where
R: Read,
Self: Iterator<Item = Result<RecordBatch, ArrowError>>,
{
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.builder.schema()
}
}
impl<R> Iterator for BatchIterator<R>
where
R: Read,
{
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
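        // Fill the builder with up to `batch_size` records (respecting the
        // overall `limit`), then materialize them as a single Arrow batch.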
let mut count = 0;
while count < self.batch_size && self.count < self.limit {
match self.records.next() {
Some(Ok(record)) => {
match self.builder.push(&record) {
Ok(_) => {
self.count += 1;
count += 1;
}
Err(e) => return Some(Err(e.into())),
};
}
Some(Err(e)) => return Some(Err(e.into())),
None => break,
}
}
if count == 0 {
None
} else {
let batch = self.builder.finish();
Some(batch)
}
}
}
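/// Iterator over decoded CRAM records: containers are read and decoded one at
/// a time, and their records are yielded as SAM `RecordBuf`s.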
pub struct CramRecords<R>
where
R: Read,
{
reader: noodles::cram::io::Reader<R>,
repo: noodles::fasta::Repository,
header: noodles::sam::Header,
container: noodles::cram::io::reader::Container,
records: std::vec::IntoIter<noodles::sam::alignment::RecordBuf>,
eof: bool,
}
impl<R> CramRecords<R>
where
R: Read,
{
pub fn new(
reader: noodles::cram::io::Reader<R>,
header: noodles::sam::Header,
repo: noodles::fasta::Repository,
) -> Self {
Self {
reader,
header,
repo,
container: noodles::cram::io::reader::Container::default(),
records: Vec::new().into_iter(),
eof: false,
}
}
}
impl<R> Iterator for CramRecords<R>
where
R: Read,
{
type Item = io::Result<noodles::sam::alignment::RecordBuf>;
fn next(&mut self) -> Option<Self::Item> {
if self.eof {
return None;
}
loop {
match self.records.next() {
Some(record) => return Some(Ok(record)),
None => match read_container_records(
&mut self.reader,
&self.repo,
&self.header,
&mut self.container,
) {
Ok(None) => {
self.eof = true;
return None;
}
Ok(Some(records)) => {
self.records = records.into_iter();
}
Err(e) => return Some(Err(e)),
},
}
}
}
}
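/// Iterator adapter over [`CramQuery`] that yields Arrow record batches for a
/// single queried region, honoring `batch_size` and `limit` like
/// [`BatchIterator`].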
pub struct QueryBatchIterator<R>
where
R: Read + Seek,
{
query: CramQuery<R>,
builder: BatchBuilder,
batch_size: usize,
limit: usize,
count: usize,
}
impl<R> QueryBatchIterator<R>
where
R: Read + Seek,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
fmt_reader: noodles::cram::io::Reader<R>,
header: noodles::sam::Header,
repo: &noodles::fasta::Repository,
index: noodles::cram::crai::Index,
reference_sequence_id: usize,
interval: Interval,
builder: BatchBuilder,
batch_size: usize,
limit: Option<usize>,
) -> Self {
let query = CramQuery::new(
fmt_reader,
header,
index,
repo.clone(),
reference_sequence_id,
interval,
);
Self {
query,
builder,
batch_size,
limit: limit.unwrap_or(usize::MAX),
count: 0,
}
}
}
impl<R> RecordBatchReader for QueryBatchIterator<R>
where
R: Read + Seek,
Self: Iterator<Item = Result<RecordBatch, ArrowError>>,
{
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.builder.schema()
}
}
impl<R> Iterator for QueryBatchIterator<R>
where
R: Read + Seek,
{
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
let mut count = 0;
while count < self.batch_size && self.count < self.limit {
match self.query.next() {
Some(Ok(record)) => {
match self.builder.push(&record) {
Ok(_) => {
self.count += 1;
count += 1;
}
Err(e) => return Some(Err(e.into())),
};
}
Some(Err(e)) => return Some(Err(e.into())),
None => break,
}
}
if count == 0 {
None
} else {
            let batch = self.builder.finish();
            Some(batch)
}
}
}
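/// Iterator over the records of one reference sequence that overlap a queried
/// interval, using CRAI index records to locate candidate containers.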
pub struct CramQuery<R>
where
R: Read + Seek,
{
reader: noodles::cram::io::Reader<R>,
header: noodles::sam::Header,
index: std::vec::IntoIter<noodles::cram::crai::Record>,
repo: noodles::fasta::Repository,
reference_sequence_id: usize,
interval: noodles::core::region::Interval,
container: noodles::cram::io::reader::Container,
records: std::vec::IntoIter<noodles::sam::alignment::RecordBuf>,
}
impl<R> CramQuery<R>
where
R: Read + Seek,
{
pub fn new(
reader: noodles::cram::io::Reader<R>,
header: noodles::sam::Header,
index: Vec<noodles::cram::crai::Record>,
repo: noodles::fasta::Repository,
reference_sequence_id: usize,
interval: noodles::core::region::Interval,
) -> Self {
Self {
reader,
header,
index: index.into_iter(),
repo,
reference_sequence_id,
interval,
container: noodles::cram::io::reader::Container::default(),
records: Vec::new().into_iter(),
}
}
}
impl<R> Iterator for CramQuery<R>
where
R: Read + Seek,
{
type Item = io::Result<noodles::sam::alignment::RecordBuf>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.records.next() {
Some(r) => {
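                    // Skip records without a mapped position and yield only
                    // those whose alignment span intersects the query interval.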
if let (Some(start), Some(end)) = (r.alignment_start(), r.alignment_end()) {
let alignment_interval = (start..=end).into();
if self.interval.intersects(alignment_interval) {
return Some(Ok(r));
}
}
}
None => {
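                    // The decoded buffer is exhausted: find the next index
                    // record on the queried reference sequence, seek to its
                    // container, and decode it. Exhausting the index ends
                    // the iteration.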
let index_record = self
.index
.find(|c| c.reference_sequence_id() == Some(self.reference_sequence_id))?;
if let Err(e) = self.reader.seek(SeekFrom::Start(index_record.offset())) {
return Some(Err(e));
}
match read_container_records(
&mut self.reader,
&self.repo,
&self.header,
&mut self.container,
) {
Ok(Some(records)) => {
self.records = records.into_iter();
}
                        Ok(None) => {
                            // The index claimed a container at this offset, so
                            // hitting end-of-stream here means the data and the
                            // index disagree.
                            return Some(Err(io::Error::new(
                                io::ErrorKind::UnexpectedEof,
                                "CRAM index points past the end of the data",
                            )));
                        }
Err(e) => return Some(Err(e)),
}
}
}
}
}
}
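/// Reads the next container from `reader` and decodes all of its slices into
/// SAM `RecordBuf`s. Returns `Ok(None)` at end of stream.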
fn read_container_records<R: Read>(
reader: &mut noodles::cram::io::Reader<R>,
repo: &noodles::fasta::Repository,
header: &noodles::sam::Header,
container: &mut noodles::cram::io::reader::Container,
) -> io::Result<Option<Vec<noodles::sam::alignment::RecordBuf>>> {
    if reader.read_container(container)? == 0 {
        return Ok(None);
    }
let compression_header = container.compression_header()?;
let records = container
.slices()
.map(|result| {
let slice = result?;
let (core_data_src, external_data_srcs) = slice.decode_blocks()?;
slice
.records(
repo.clone(),
header,
&compression_header,
&core_data_src,
&external_data_srcs,
)
.and_then(|records| {
records
.into_iter()
.map(|record| {
noodles::sam::alignment::RecordBuf::try_from_alignment_record(
header, &record,
)
})
.collect::<io::Result<Vec<_>>>()
})
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
Ok(Some(records))
}