use std::io::{BufRead, Seek};
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
use noodles::bgzf::VirtualPosition;
use noodles::csi::BinningIndex;
use crate::bed::model::BatchBuilder;
use crate::bed::model::BedSchema;
use crate::bed::model::Model;
use crate::bed::scanner::batch_iterator::{BatchIterator, QueryBatchIterator};
use crate::util::query::{BgzfChunkReader, ByteRangeReader};
use crate::{OxbowError, Select};
/// Scanner that turns BED readers into Arrow record batch readers.
///
/// All scans are driven by a single [`Model`], which is built from a
/// [`BedSchema`] plus a field selection (or supplied directly).
pub struct Scanner {
    /// Schema and field selection shared by every scan issued from this scanner.
    model: Model,
}
impl Scanner {
    /// Creates a scanner for `bed_schema`, restricted to the selected `fields`.
    ///
    /// # Errors
    ///
    /// Returns an error if [`Model::new`] rejects the schema/field combination.
    pub fn new(bed_schema: BedSchema, fields: Select<String>) -> crate::Result<Self> {
        let model = Model::new(bed_schema, fields)?;
        Ok(Self { model })
    }

    /// Creates a scanner from an already-built [`Model`].
    pub fn with_model(model: Model) -> Self {
        Self { model }
    }

    /// Returns the scanner's data model.
    pub fn model(&self) -> &Model {
        &self.model
    }

    /// Returns the Arrow schema of the scanner's full (unprojected) model.
    pub fn schema(&self) -> &Schema {
        self.model.schema()
    }

    /// Returns the field names of the scanner's model.
    pub fn field_names(&self) -> Vec<String> {
        self.model.field_names()
    }

    /// Builds a [`BatchBuilder`] for the requested columns.
    ///
    /// `None` uses the scanner's full model. `Some(cols)` first projects the
    /// model onto `cols` via [`Model::project`]. `capacity` is forwarded to
    /// [`BatchBuilder::from_model`].
    ///
    /// # Errors
    ///
    /// Propagates errors from the projection or from building the batch builder.
    fn build_batch_builder(
        &self,
        columns: Option<Vec<String>>,
        capacity: usize,
    ) -> crate::Result<BatchBuilder> {
        match columns {
            // `from_model` only borrows the model, so no clone is needed when
            // there is no projection.
            None => BatchBuilder::from_model(&self.model, capacity),
            Some(cols) => {
                let projected = self.model.project(&cols)?;
                BatchBuilder::from_model(&projected, capacity)
            }
        }
    }
}
impl Scanner {
    /// Records per batch used when the caller passes `batch_size: None`.
    const DEFAULT_BATCH_SIZE: usize = 1024;

    /// Returns a record batch reader over the records of `fmt_reader`.
    ///
    /// * `columns` – optional subset of columns to project. `None` keeps every
    ///   field of the scanner's model.
    /// * `batch_size` – records per batch. Defaults to 1024.
    /// * `limit` – forwarded to the batch iterator. Presumably the maximum
    ///   number of records to emit (TODO confirm).
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be created, for example
    /// when the column projection fails.
    pub fn scan<R: BufRead>(
        &self,
        fmt_reader: noodles::bed::io::Reader<3, R>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
        Ok(batch_iter)
    }

    /// Returns a record batch reader over the records of a BGZF-compressed BED
    /// source that fall in `region`. `index` is used to locate those records.
    ///
    /// The steps are:
    /// 1. Resolve the region's reference sequence name against the index header.
    /// 2. Query the index for the chunks overlapping the region's interval.
    /// 3. Read only those chunks.
    ///
    /// The records are then fed to a [`QueryBatchIterator`] together with the
    /// reference id and interval. That iterator presumably drops records that
    /// do not actually overlap the interval (TODO confirm).
    ///
    /// `columns`, `batch_size` (default 1024) and `limit` behave as in
    /// [`Scanner::scan`].
    ///
    /// # Errors
    ///
    /// * A not-found error if the index has no header, or if the reference
    ///   sequence is not listed in it.
    /// * Any error from building the batch builder or querying the index.
    pub fn scan_query<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
        &self,
        fmt_reader: noodles::bed::io::Reader<3, R>,
        region: noodles::core::Region,
        index: impl BinningIndex,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let reference_sequence_name = region.name().to_string();
        let interval = region.interval();
        let batch_builder = self.build_batch_builder(columns, batch_size)?;

        let Some(header) = index.header() else {
            return Err(OxbowError::not_found("Index header not found."));
        };
        let reference_sequence_id = resolve_chrom_id(header, &reference_sequence_name)?;

        // Convert the index chunks into (start, end) virtual-position pairs.
        // These pairs restrict the BGZF reader to the matching blocks.
        let chunks = index.query(reference_sequence_id, interval)?;
        let chunks = chunks.into_iter().map(|c| (c.start(), c.end())).collect();

        let bgzf_reader = fmt_reader.into_inner();
        let query_reader = BgzfChunkReader::new(bgzf_reader, chunks);
        let fmt_reader = noodles::bed::io::Reader::new(query_reader);
        let batch_iter = QueryBatchIterator::new(
            fmt_reader,
            header.clone(),
            reference_sequence_id,
            interval,
            batch_builder,
            batch_size,
            limit,
        );
        Ok(batch_iter)
    }

    /// Returns a record batch reader that only reads the given byte ranges of
    /// the underlying (uncompressed) source.
    ///
    /// `byte_ranges` are passed unchanged to [`ByteRangeReader`]. Presumably
    /// they are `(start, end)` byte offsets (TODO confirm inclusive/exclusive
    /// end). `columns`, `batch_size` (default 1024) and `limit` behave as in
    /// [`Scanner::scan`].
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be created.
    pub fn scan_byte_ranges<R: BufRead + Seek>(
        &self,
        fmt_reader: noodles::bed::io::Reader<3, R>,
        byte_ranges: Vec<(u64, u64)>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        let inner_reader = fmt_reader.into_inner();
        let range_reader = ByteRangeReader::new(inner_reader, byte_ranges);
        let fmt_reader = noodles::bed::io::Reader::new(range_reader);
        let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
        Ok(batch_iter)
    }

    /// Returns a record batch reader that only reads the given BGZF
    /// virtual-position ranges of the underlying compressed source.
    ///
    /// `vpos_ranges` are passed unchanged to [`BgzfChunkReader`]. `columns`,
    /// `batch_size` (default 1024) and `limit` behave as in [`Scanner::scan`].
    ///
    /// # Errors
    ///
    /// Returns an error if the batch builder cannot be created.
    pub fn scan_virtual_ranges<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
        &self,
        fmt_reader: noodles::bed::io::Reader<3, R>,
        vpos_ranges: Vec<(VirtualPosition, VirtualPosition)>,
        columns: Option<Vec<String>>,
        batch_size: Option<usize>,
        limit: Option<usize>,
    ) -> crate::Result<impl RecordBatchReader> {
        let batch_size = batch_size.unwrap_or(Self::DEFAULT_BATCH_SIZE);
        let batch_builder = self.build_batch_builder(columns, batch_size)?;
        let bgzf_reader = fmt_reader.into_inner();
        let range_reader = BgzfChunkReader::new(bgzf_reader, vpos_ranges);
        let fmt_reader = noodles::bed::io::Reader::new(range_reader);
        let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
        Ok(batch_iter)
    }
}
/// Returns the position of `reference_sequence_name` in the index header's
/// list of reference sequence names.
///
/// # Errors
///
/// Returns a not-found error when the header does not list that name.
fn resolve_chrom_id(
    header: &noodles::csi::binning_index::index::Header,
    reference_sequence_name: &str,
) -> crate::Result<usize> {
    let names = header.reference_sequence_names();
    names
        .get_index_of(reference_sequence_name.as_bytes())
        .ok_or_else(|| {
            OxbowError::not_found(format!(
                "Reference sequence {reference_sequence_name} not found in index header."
            ))
        })
}