use std::io::{BufRead, Seek};
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
use noodles::bgzf::VirtualPosition;
use noodles::csi::BinningIndex;
use crate::util::query::{BgzfChunkReader, ByteRangeReader};
use crate::variant::model::{BatchBuilder, GenotypeBy, Model};
use crate::variant::scanner::batch_iterator::{BatchIterator, QueryBatchIterator};
use crate::{OxbowError, Select};
pub struct Scanner {
header: noodles::vcf::Header,
model: Model,
}
impl Scanner {
#[allow(clippy::too_many_arguments)]
pub fn new(
header: noodles::vcf::Header,
fields: Select<String>,
info_fields: Select<String>,
genotype_fields: Select<String>,
genotype_by: Option<GenotypeBy>,
samples: Select<String>,
samples_nested: Option<bool>,
) -> crate::Result<Self> {
let model = Model::from_header(
&header,
fields,
info_fields,
genotype_fields,
genotype_by,
samples,
samples_nested,
)?;
Ok(Self { header, model })
}
pub fn with_model(header: noodles::vcf::Header, model: Model) -> Self {
Self { header, model }
}
pub fn model(&self) -> &Model {
&self.model
}
pub fn header(&self) -> &noodles::vcf::Header {
&self.header
}
pub fn chrom_names(&self) -> Vec<String> {
self.header
.contigs()
.iter()
.map(|(name, _)| name.to_string())
.collect()
}
pub fn chrom_sizes(&self) -> Vec<(String, u32)> {
self.header
.contigs()
.iter()
.map(|(name, r)| (name.to_string(), r.length().unwrap_or(0) as u32))
.collect()
}
pub fn field_names(&self) -> Vec<String> {
self.model.field_names()
}
pub fn info_field_names(&self) -> Vec<String> {
self.header.infos().iter().map(|(k, _)| k.clone()).collect()
}
pub fn info_field_defs(&self) -> Vec<(String, String, String)> {
use noodles::vcf::header::record::value::map::info::Number;
self.header
.infos()
.iter()
.map(|(k, def)| {
let n = match def.number() {
Number::Count(n) => n.to_string(),
Number::AlternateBases => "A".to_string(),
Number::ReferenceAlternateBases => "R".to_string(),
Number::Samples => "G".to_string(),
Number::Unknown => ".".to_string(),
};
let ty = def.ty().to_string();
(k.clone(), n, ty)
})
.collect()
}
pub fn genotype_field_names(&self) -> Vec<String> {
self.header
.formats()
.iter()
.map(|(k, _)| k.clone())
.collect()
}
pub fn genotype_field_defs(&self) -> Vec<(String, String, String)> {
use noodles::vcf::header::record::value::map::format::Number;
self.header
.formats()
.iter()
.map(|(k, def)| {
let n = match def.number() {
Number::Count(n) => n.to_string(),
Number::AlternateBases => "A".to_string(),
Number::ReferenceAlternateBases => "R".to_string(),
Number::Samples => "G".to_string(),
Number::LocalAlternateBases => "LA".to_string(),
Number::LocalReferenceAlternateBases => "LR".to_string(),
Number::LocalSamples => "LG".to_string(),
Number::Ploidy => "P".to_string(),
Number::BaseModifications => "M".to_string(),
Number::Unknown => ".".to_string(),
};
let ty = def.ty().to_string();
(k.clone(), n, ty)
})
.collect()
}
pub fn sample_names(&self) -> Vec<String> {
self.header.sample_names().iter().cloned().collect()
}
pub fn schema(&self) -> &Schema {
self.model.schema()
}
fn build_batch_builder(
&self,
columns: Option<Vec<String>>,
capacity: usize,
) -> crate::Result<BatchBuilder> {
match columns {
None => BatchBuilder::from_model(&self.model, self.header.clone(), capacity),
Some(cols) => {
let projected = self.model.project(&cols)?;
BatchBuilder::from_model(&projected, self.header.clone(), capacity)
}
}
}
}
impl Scanner {
pub fn scan<R: BufRead>(
&self,
fmt_reader: noodles::vcf::io::Reader<R>,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
Ok(batch_iter)
}
pub fn scan_query<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
&self,
fmt_reader: noodles::vcf::io::Reader<R>,
region: noodles::core::Region,
index: impl BinningIndex,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let reference_sequence_name = region.name().to_string();
let interval = region.interval();
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let reference_sequence_id =
resolve_chrom_id(&self.header, &index, &reference_sequence_name)?;
let chunks = index.query(reference_sequence_id, interval)?;
let chunks = chunks.into_iter().map(|c| (c.start(), c.end())).collect();
let bgzf_reader = fmt_reader.into_inner();
let query_reader = BgzfChunkReader::new(bgzf_reader, chunks);
let fmt_reader = noodles::vcf::io::Reader::new(query_reader);
let batch_iter = QueryBatchIterator::new(
fmt_reader,
self.header.clone(),
reference_sequence_name,
interval,
batch_builder,
batch_size,
limit,
);
Ok(batch_iter)
}
pub fn scan_byte_ranges<R: BufRead + Seek>(
&self,
fmt_reader: noodles::vcf::io::Reader<R>,
byte_ranges: Vec<(u64, u64)>,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let inner_reader = fmt_reader.into_inner();
let range_reader = ByteRangeReader::new(inner_reader, byte_ranges);
let fmt_reader = noodles::vcf::io::Reader::new(range_reader);
let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
Ok(batch_iter)
}
pub fn scan_virtual_ranges<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
&self,
fmt_reader: noodles::vcf::io::Reader<R>,
vpos_ranges: Vec<(VirtualPosition, VirtualPosition)>,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let batch_builder = self.build_batch_builder(columns, batch_size)?;
let bgzf_reader = fmt_reader.into_inner();
let range_reader = BgzfChunkReader::new(bgzf_reader, vpos_ranges);
let fmt_reader = noodles::vcf::io::Reader::new(range_reader);
let batch_iter = BatchIterator::new(fmt_reader, batch_builder, batch_size, limit);
Ok(batch_iter)
}
}
fn resolve_chrom_id(
header: &noodles::vcf::Header,
index: &impl BinningIndex,
chrom: &str,
) -> crate::Result<usize> {
let id = index
.header()
.and_then(|index_header| {
index_header
.reference_sequence_names()
.get_index_of(chrom.as_bytes())
})
.or_else(|| header.contigs().get_index_of(chrom));
id.ok_or_else(|| {
OxbowError::invalid_input(format!(
"Reference sequence '{}' not found in Index header or VCF header.",
chrom
))
})
}