use crate::core::GenomicRecordIterator;
use crate::error::{Error, Result};
use crate::expression::{Expr, ExprToFilter};
use crate::filters::RecordFilter;
use crate::plan::{LogicalPlan, PlanNode};
use crate::schema::FileFormat;
use std::path::PathBuf;
use crate::formats::bam::{BamReader, BamRecord};
use crate::formats::bed::{BedReader, BedRecord};
use crate::formats::fastq::{FastqReader, FastqRecord};
use crate::formats::vcf::{VcfReader, VcfRecord};
pub enum ExecutionResult {
Vcf(VcfExecution),
Bam(BamExecution),
Bed(BedExecution),
Fastq(FastqExecution),
}
impl ExecutionResult {
pub fn to_annotation_index<F>(
self,
extractor: F,
) -> Result<crate::interval::annotation::AnnotationIndex>
where
F: Fn(&BedRecord) -> String,
{
use crate::interval::annotation::{Annotation, AnnotationIndex};
use crate::interval::GenomicInterval;
use std::collections::HashMap;
match self {
ExecutionResult::Bed(mut bed_exec) => {
let mut annotations: HashMap<String, Vec<Annotation>> = HashMap::new();
while let Some(result) = bed_exec.next() {
let record = result?;
let interval: GenomicInterval = (&record).into();
let data = extractor(&record);
annotations
.entry(interval.chrom.clone())
.or_insert_with(Vec::new)
.push(Annotation { interval, data });
}
let mut index = AnnotationIndex::new();
index.build_trees(annotations);
Ok(index)
}
_ => Err(Error::invalid_input(
"to_annotation_index() only works with BED execution results",
)),
}
}
pub fn annotate_with_genes(
self,
genes: &crate::interval::annotation::AnnotationIndex,
exons: &crate::interval::annotation::AnnotationIndex,
) -> Result<AnnotationResult> {
use crate::interval::GenomicInterval;
match self {
ExecutionResult::Vcf(mut vcf_exec) => {
let mut result = AnnotationResult::new();
while let Some(record_result) = vcf_exec.next() {
let record = record_result?;
result.total_variants += 1;
if record.is_snp() {
result.snp_count += 1;
} else if record.is_indel() {
result.indel_count += 1;
}
let interval: GenomicInterval = (&record).into();
let overlapping_genes = genes.query(&interval);
let overlapping_exons = exons.query(&interval);
if !overlapping_genes.is_empty() {
result.genic_variants += 1;
if !overlapping_exons.is_empty() {
result.exonic_variants += 1;
if result.example_exonic.len() < 5 {
result.example_exonic.push(AnnotatedVariant {
chrom: record.chrom.clone(),
pos: record.pos,
reference: record.reference.clone(),
alt: record.alt.clone(),
qual: record.qual,
genes: overlapping_genes
.iter()
.map(|s| s.to_string())
.collect(),
exons: overlapping_exons
.iter()
.map(|s| s.to_string())
.collect(),
});
}
}
} else {
result.intergenic_variants += 1;
}
}
Ok(result)
}
ExecutionResult::Bam(mut bam_exec) => {
let mut result = AnnotationResult::new();
while let Some(record_result) = bam_exec.next() {
let record = record_result?;
result.total_variants += 1;
if record.is_unmapped() {
continue;
}
let interval: GenomicInterval = (&record).into();
let overlapping_genes = genes.query(&interval);
let overlapping_exons = exons.query(&interval);
if !overlapping_genes.is_empty() {
result.genic_variants += 1;
if !overlapping_exons.is_empty() {
result.exonic_variants += 1; }
} else {
result.intergenic_variants += 1;
}
}
Ok(result)
}
_ => Err(Error::invalid_input(
"annotate_with_genes() only works with VCF and BAM execution results",
)),
}
}
pub fn count(mut self) -> Result<usize> {
let mut total = 0;
match &mut self {
ExecutionResult::Vcf(vcf_exec) => {
while let Some(_) = vcf_exec.next() {
total += 1;
}
}
ExecutionResult::Bam(bam_exec) => {
while let Some(_) = bam_exec.next() {
total += 1;
}
}
ExecutionResult::Bed(bed_exec) => {
while let Some(_) = bed_exec.next() {
total += 1;
}
}
ExecutionResult::Fastq(fastq_exec) => {
while let Some(_) = fastq_exec.next() {
total += 1;
}
}
}
Ok(total)
}
pub fn head(mut self, n: usize) -> Result<String> {
use std::fmt::Write;
let mut output = String::new();
match &mut self {
ExecutionResult::Vcf(vcf_exec) => {
writeln!(&mut output, "First {} VCF records:", n).unwrap();
writeln!(&mut output, "{:-<80}", "").unwrap();
for (i, record_result) in vcf_exec.take(n).enumerate() {
let record = record_result?;
writeln!(
&mut output,
"{}. {}:{} {}>{} QUAL={:.1}",
i + 1,
record.chrom,
record.pos,
record.reference,
record.alt.join(","),
record.qual.unwrap_or(0.0)
)
.unwrap();
}
}
ExecutionResult::Bam(bam_exec) => {
writeln!(&mut output, "First {} BAM records:", n).unwrap();
writeln!(&mut output, "{:-<120}", "").unwrap();
writeln!(
&mut output,
"{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
"Index", "MAPQ", "Chrom/Rname", "Position", "Length", "Flags", "Read Name"
)
.unwrap();
writeln!(&mut output, "{:-<120}", "").unwrap();
let mut count = 0;
for (i, record_result) in bam_exec.take(n).enumerate() {
let record = record_result?;
let length = record.seq_string().len();
let flags = format!("0x{:04x}", record.flag);
writeln!(
&mut output,
"{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
i + 1,
record.mapq,
record.rname,
format!("{}-{}", record.pos, record.pos + length as i32),
length,
flags,
&record.qname[..record.qname.len().min(20)]
)
.unwrap();
count += 1;
}
if count == 0 {
writeln!(&mut output, "No records passed filters").unwrap();
}
let scanned = bam_exec.scanned_count();
let skipped = bam_exec.skipped_errors();
if scanned > 0 || skipped > 0 {
writeln!(&mut output, "\nScan statistics:").unwrap();
writeln!(&mut output, " Records scanned: {}", scanned).unwrap();
writeln!(&mut output, " Records returned: {}", count).unwrap();
if skipped > 0 {
writeln!(&mut output, " Parse errors skipped: {}", skipped).unwrap();
}
}
}
ExecutionResult::Bed(bed_exec) => {
writeln!(&mut output, "First {} BED records:", n).unwrap();
writeln!(&mut output, "{:-<80}", "").unwrap();
for (i, record_result) in bed_exec.take(n).enumerate() {
let record = record_result?;
writeln!(
&mut output,
"{}. {}:{}-{} {}",
i + 1,
record.chrom,
record.start,
record.end,
record.name.as_deref().unwrap_or("-")
)
.unwrap();
}
}
ExecutionResult::Fastq(fastq_exec) => {
writeln!(&mut output, "First {} FASTQ records:", n).unwrap();
writeln!(&mut output, "{:-<80}", "").unwrap();
for (i, record_result) in fastq_exec.take(n).enumerate() {
let record = record_result?;
writeln!(
&mut output,
"{}. {} length={} mean_qual={:.1}",
i + 1,
record.id,
record.sequence.len(),
record.mean_quality()
)
.unwrap();
}
}
}
Ok(output)
}
}
#[derive(Debug, Clone)]
pub struct AnnotationResult {
pub total_variants: usize,
pub snp_count: usize,
pub indel_count: usize,
pub genic_variants: usize,
pub exonic_variants: usize,
pub intergenic_variants: usize,
pub example_exonic: Vec<AnnotatedVariant>,
}
impl AnnotationResult {
fn new() -> Self {
Self {
total_variants: 0,
snp_count: 0,
indel_count: 0,
genic_variants: 0,
exonic_variants: 0,
intergenic_variants: 0,
example_exonic: Vec::new(),
}
}
pub fn print_summary(&self) {
let item_type = if self.snp_count > 0 || self.indel_count > 0 {
"VCF Annotation Results"
} else {
"BAM Annotation Results"
};
println!(" 📈 {}:", item_type);
let count_label = if self.snp_count > 0 || self.indel_count > 0 {
"Total variants"
} else {
"Total reads"
};
println!(" {}: {:>12}", count_label, self.total_variants);
if self.snp_count > 0 || self.indel_count > 0 {
println!(
" SNPs: {:>12} ({:>5.1}%)",
self.snp_count,
(self.snp_count as f64 / self.total_variants.max(1) as f64) * 100.0
);
println!(
" Indels: {:>12} ({:>5.1}%)",
self.indel_count,
(self.indel_count as f64 / self.total_variants.max(1) as f64) * 100.0
);
}
println!("\n 📊 Annotation Categories:");
println!(
" Genic: {:>12} ({:>5.1}%)",
self.genic_variants,
(self.genic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
);
println!(
" Exonic: {:>12} ({:>5.1}%)",
self.exonic_variants,
(self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
);
println!(
" Intergenic: {:>12} ({:>5.1}%)",
self.intergenic_variants,
(self.intergenic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
);
if !self.example_exonic.is_empty() {
println!("\n 📋 Example High-Quality Exonic Variants:");
for (i, variant) in self.example_exonic.iter().enumerate() {
println!(
" {}. {}:{} {}>{} QUAL={:.1}",
i + 1,
variant.chrom,
variant.pos,
variant.reference,
variant.alt.join(","),
variant.qual.unwrap_or(0.0)
);
println!(" Genes: {}", variant.genes.join(", "));
println!(" Exons: {} exon(s)", variant.exons.len());
}
}
println!("\n ✅ Functional Impact:");
let coding_percent =
(self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0;
if coding_percent > 10.0 {
println!(" HIGH coding variant rate ({:.1}%)", coding_percent);
println!(" Likely exome/targeted sequencing data");
} else if coding_percent > 1.0 {
println!(" TYPICAL coding variant rate ({:.1}%)", coding_percent);
println!(" Standard whole-genome sequencing");
} else {
println!(" LOW coding variant rate ({:.1}%)", coding_percent);
println!(" Most variants in non-coding regions");
}
}
}
#[derive(Debug, Clone)]
pub struct AnnotatedVariant {
pub chrom: String,
pub pos: u64,
pub reference: String,
pub alt: Vec<String>,
pub qual: Option<f64>,
pub genes: Vec<String>,
pub exons: Vec<String>,
}
pub struct VcfExecution {
reader: VcfReader,
filter: Option<Box<dyn RecordFilter<VcfRecord>>>,
limit: Option<usize>,
count: usize,
}
pub struct BamExecution {
reader: BamReader<std::fs::File>,
filter: Option<Box<dyn RecordFilter<BamRecord>>>,
limit: Option<usize>,
count: usize,
skipped_errors: usize,
}
impl BamExecution {
pub fn skipped_errors(&self) -> usize {
self.skipped_errors
}
pub fn scanned_count(&self) -> usize {
self.count
}
}
pub struct BedExecution {
reader: BedReader,
filter: Option<Box<dyn RecordFilter<BedRecord>>>,
limit: Option<usize>,
count: usize,
}
pub struct FastqExecution {
reader: FastqReader,
filter: Option<Box<dyn RecordFilter<FastqRecord>>>,
limit: Option<usize>,
count: usize,
}
impl Iterator for VcfExecution {
type Item = Result<VcfRecord>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(limit) = self.limit {
if self.count >= limit {
return None;
}
}
match self.reader.next_record() {
Ok(Some(record)) => {
self.count += 1;
if let Some(ref filter) = self.filter {
if !filter.test(&record) {
continue; }
}
return Some(Ok(record));
}
Ok(None) => return None,
Err(e) => return Some(Err(e)),
}
}
}
}
impl Iterator for BamExecution {
type Item = Result<BamRecord>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(limit) = self.limit {
if self.count >= limit {
return None;
}
}
match self.reader.next_record() {
Ok(Some(record)) => {
self.count += 1;
if let Some(ref filter) = self.filter {
if !filter.test(&record) {
continue;
}
}
return Some(Ok(record));
}
Ok(None) => return None,
Err(e) => {
self.skipped_errors += 1;
self.count += 1;
if self.skipped_errors <= 3 {
eprintln!("Warning: Skipping BAM record that failed to parse: {}", e);
} else if self.skipped_errors == 4 {
eprintln!("Warning: Additional parse errors will be counted silently...");
}
continue; }
}
}
}
}
impl Iterator for BedExecution {
type Item = Result<BedRecord>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(limit) = self.limit {
if self.count >= limit {
return None;
}
}
match self.reader.next_record() {
Ok(Some(record)) => {
if let Some(ref filter) = self.filter {
if !filter.test(&record) {
continue;
}
}
self.count += 1;
return Some(Ok(record));
}
Ok(None) => return None,
Err(e) => return Some(Err(e)),
}
}
}
}
impl Iterator for FastqExecution {
type Item = Result<FastqRecord>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(limit) = self.limit {
if self.count >= limit {
return None;
}
}
match self.reader.next_record() {
Ok(Some(record)) => {
if let Some(ref filter) = self.filter {
if !filter.test(&record) {
continue;
}
}
self.count += 1;
return Some(Ok(record));
}
Ok(None) => return None,
Err(e) => return Some(Err(e)),
}
}
}
}
pub fn execute(plan: LogicalPlan) -> Result<ExecutionResult> {
let optimized = plan.optimize();
let format = optimized
.format()
.ok_or_else(|| Error::invalid_input("Plan has no data source"))?;
let path = extract_scan_path(&optimized.root)?;
let filter_expr = extract_filter(&optimized.root);
let limit = extract_limit(&optimized.root);
match format {
FileFormat::Vcf => {
let reader = VcfReader::from_path(&path)?;
let filter = if let Some(expr) = filter_expr {
Some(expr.compile()?)
} else {
None
};
Ok(ExecutionResult::Vcf(VcfExecution {
reader,
filter,
limit,
count: 0,
}))
}
FileFormat::Bam => {
let mut reader = BamReader::from_path(&path)?;
reader.read_header()?; let filter = if let Some(expr) = filter_expr {
Some(expr.compile()?)
} else {
None
};
Ok(ExecutionResult::Bam(BamExecution {
reader,
filter,
limit,
count: 0,
skipped_errors: 0,
}))
}
FileFormat::Bed => {
let reader = BedReader::from_path(&path)?;
let filter = if let Some(expr) = filter_expr {
Some(expr.compile()?)
} else {
None
};
Ok(ExecutionResult::Bed(BedExecution {
reader,
filter,
limit,
count: 0,
}))
}
FileFormat::Fastq => {
let reader = FastqReader::from_path(&path)?;
let filter = if let Some(expr) = filter_expr {
Some(expr.compile()?)
} else {
None
};
Ok(ExecutionResult::Fastq(FastqExecution {
reader,
filter,
limit,
count: 0,
}))
}
_ => Err(Error::invalid_input(format!(
"Unsupported format: {:?}",
format
))),
}
}
fn extract_scan_path(node: &PlanNode) -> Result<PathBuf> {
match node {
PlanNode::Scan { path, .. } => Ok(path.clone()),
PlanNode::Filter { input, .. } => extract_scan_path(input),
PlanNode::Limit { input, .. } => extract_scan_path(input),
PlanNode::MaxScan { input, .. } => extract_scan_path(input),
_ => Err(Error::invalid_input("No Scan node found in plan")),
}
}
fn extract_filter(node: &PlanNode) -> Option<Expr> {
match node {
PlanNode::Filter { input, predicate } => {
if let Some(inner_filter) = extract_filter(input) {
Some(predicate.clone().and(inner_filter))
} else {
Some(predicate.clone())
}
}
PlanNode::Limit { input, .. } => extract_filter(input),
PlanNode::MaxScan { input, .. } => extract_filter(input),
PlanNode::Scan { .. } => None,
_ => None,
}
}
fn extract_limit(node: &PlanNode) -> Option<usize> {
match node {
PlanNode::Limit { count, .. } => Some(*count),
PlanNode::MaxScan { count, .. } => Some(*count),
PlanNode::Filter { input, .. } => extract_limit(input),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::expression::{col, lit};
#[test]
fn test_execute_vcf_no_filter() {
let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf);
let result = execute(plan);
assert!(result.is_ok());
if let Ok(ExecutionResult::Vcf(mut exec)) = result {
let mut count = 0;
for record in exec.by_ref() {
assert!(record.is_ok());
count += 1;
}
assert!(count > 0);
} else {
panic!("Expected VCF execution result");
}
}
#[test]
fn test_execute_vcf_with_filter() {
let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
.filter(col("qual").gt(lit(30.0)));
let result = execute(plan);
assert!(result.is_ok());
if let Ok(ExecutionResult::Vcf(mut exec)) = result {
let mut count = 0;
for record in exec.by_ref() {
if let Ok(rec) = record {
if let Some(qual) = rec.qual {
assert!(qual > 30.0);
}
count += 1;
}
}
assert!(count > 0);
}
}
#[test]
fn test_execute_vcf_with_limit() {
let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf).limit(5);
let result = execute(plan);
assert!(result.is_ok());
if let Ok(ExecutionResult::Vcf(exec)) = result {
let count = exec.count();
assert_eq!(count, 5);
}
}
#[test]
fn test_execute_vcf_filter_and_limit() {
let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
.filter(col("qual").gt(lit(20.0)))
.limit(10);
let result = execute(plan);
assert!(result.is_ok());
if let Ok(ExecutionResult::Vcf(exec)) = result {
let records: Vec<_> = exec.collect();
assert!(records.len() <= 10);
for record in records {
if let Ok(rec) = record {
if let Some(qual) = rec.qual {
assert!(qual > 20.0);
}
}
}
}
}
}