1use crate::core::GenomicRecordIterator;
7use crate::error::{Error, Result};
8use crate::expression::{Expr, ExprToFilter};
9use crate::filters::RecordFilter;
10use crate::plan::{LogicalPlan, PlanNode};
11use crate::schema::FileFormat;
12use std::path::PathBuf;
13
14use crate::formats::bam::{BamReader, BamRecord};
16use crate::formats::bed::{BedReader, BedRecord};
17use crate::formats::fastq::{FastqReader, FastqRecord};
18use crate::formats::vcf::{VcfReader, VcfRecord};
19
20pub enum ExecutionResult {
26 Vcf(VcfExecution),
27 Bam(BamExecution),
28 Bed(BedExecution),
29 Fastq(FastqExecution),
30}
31impl ExecutionResult {
33 pub fn to_annotation_index<F>(
58 self,
59 extractor: F,
60 ) -> Result<crate::interval::annotation::AnnotationIndex>
61 where
62 F: Fn(&BedRecord) -> String,
63 {
64 use crate::interval::annotation::{Annotation, AnnotationIndex};
65 use crate::interval::GenomicInterval;
66 use std::collections::HashMap;
67
68 match self {
69 ExecutionResult::Bed(mut bed_exec) => {
70 let mut annotations: HashMap<String, Vec<Annotation>> = HashMap::new();
72
73 while let Some(result) = bed_exec.next() {
74 let record = result?;
75 let interval: GenomicInterval = (&record).into();
76 let data = extractor(&record);
77
78 annotations
79 .entry(interval.chrom.clone())
80 .or_insert_with(Vec::new)
81 .push(Annotation { interval, data });
82 }
83
84 let mut index = AnnotationIndex::new();
86 index.build_trees(annotations);
87
88 Ok(index)
89 }
90 _ => Err(Error::invalid_input(
91 "to_annotation_index() only works with BED execution results",
92 )),
93 }
94 }
95
96 pub fn annotate_with_genes(
128 self,
129 genes: &crate::interval::annotation::AnnotationIndex,
130 exons: &crate::interval::annotation::AnnotationIndex,
131 ) -> Result<AnnotationResult> {
132 use crate::interval::GenomicInterval;
133
134 match self {
135 ExecutionResult::Vcf(mut vcf_exec) => {
136 let mut result = AnnotationResult::new();
137
138 while let Some(record_result) = vcf_exec.next() {
139 let record = record_result?;
140 result.total_variants += 1;
141
142 if record.is_snp() {
144 result.snp_count += 1;
145 } else if record.is_indel() {
146 result.indel_count += 1;
147 }
148
149 let interval: GenomicInterval = (&record).into();
151
152 let overlapping_genes = genes.query(&interval);
155 let overlapping_exons = exons.query(&interval);
156
157 if !overlapping_genes.is_empty() {
158 result.genic_variants += 1;
159
160 if !overlapping_exons.is_empty() {
161 result.exonic_variants += 1;
162
163 if result.example_exonic.len() < 5 {
165 result.example_exonic.push(AnnotatedVariant {
166 chrom: record.chrom.clone(),
167 pos: record.pos,
168 reference: record.reference.clone(),
169 alt: record.alt.clone(),
170 qual: record.qual,
171 genes: overlapping_genes
172 .iter()
173 .map(|s| s.to_string())
174 .collect(),
175 exons: overlapping_exons
176 .iter()
177 .map(|s| s.to_string())
178 .collect(),
179 });
180 }
181 }
182 } else {
183 result.intergenic_variants += 1;
184 }
185 }
186
187 Ok(result)
188 }
189 ExecutionResult::Bam(mut bam_exec) => {
190 let mut result = AnnotationResult::new();
191
192 while let Some(record_result) = bam_exec.next() {
193 let record = record_result?;
194 result.total_variants += 1; if record.is_unmapped() {
198 continue;
199 }
200
201 let interval: GenomicInterval = (&record).into();
203
204 let overlapping_genes = genes.query(&interval);
206 let overlapping_exons = exons.query(&interval);
207
208 if !overlapping_genes.is_empty() {
209 result.genic_variants += 1; if !overlapping_exons.is_empty() {
212 result.exonic_variants += 1; }
214 } else {
215 result.intergenic_variants += 1;
216 }
217 }
218
219 Ok(result)
220 }
221 _ => Err(Error::invalid_input(
222 "annotate_with_genes() only works with VCF and BAM execution results",
223 )),
224 }
225 }
226
227 pub fn count(mut self) -> Result<usize> {
244 let mut total = 0;
245
246 match &mut self {
247 ExecutionResult::Vcf(vcf_exec) => {
248 while let Some(_) = vcf_exec.next() {
249 total += 1;
250 }
251 }
252 ExecutionResult::Bam(bam_exec) => {
253 while let Some(_) = bam_exec.next() {
254 total += 1;
255 }
256 }
257 ExecutionResult::Bed(bed_exec) => {
258 while let Some(_) = bed_exec.next() {
259 total += 1;
260 }
261 }
262 ExecutionResult::Fastq(fastq_exec) => {
263 while let Some(_) = fastq_exec.next() {
264 total += 1;
265 }
266 }
267 }
268
269 Ok(total)
270 }
271
272 pub fn head(mut self, n: usize) -> Result<String> {
291 use std::fmt::Write;
292 let mut output = String::new();
293
294 match &mut self {
295 ExecutionResult::Vcf(vcf_exec) => {
296 writeln!(&mut output, "First {} VCF records:", n).unwrap();
297 writeln!(&mut output, "{:-<80}", "").unwrap();
298 for (i, record_result) in vcf_exec.take(n).enumerate() {
299 let record = record_result?;
300 writeln!(
301 &mut output,
302 "{}. {}:{} {}>{} QUAL={:.1}",
303 i + 1,
304 record.chrom,
305 record.pos,
306 record.reference,
307 record.alt.join(","),
308 record.qual.unwrap_or(0.0)
309 )
310 .unwrap();
311 }
312 }
313 ExecutionResult::Bam(bam_exec) => {
314 writeln!(&mut output, "First {} BAM records:", n).unwrap();
315 writeln!(&mut output, "{:-<120}", "").unwrap();
316 writeln!(
317 &mut output,
318 "{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
319 "Index", "MAPQ", "Chrom/Rname", "Position", "Length", "Flags", "Read Name"
320 )
321 .unwrap();
322 writeln!(&mut output, "{:-<120}", "").unwrap();
323
324 let mut count = 0;
325 for (i, record_result) in bam_exec.take(n).enumerate() {
326 let record = record_result?;
327 let length = record.seq_string().len();
328 let flags = format!("0x{:04x}", record.flag);
329 writeln!(
330 &mut output,
331 "{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
332 i + 1,
333 record.mapq,
334 record.rname,
335 format!("{}-{}", record.pos, record.pos + length as i32),
336 length,
337 flags,
338 &record.qname[..record.qname.len().min(20)]
339 )
340 .unwrap();
341 count += 1;
342 }
343
344 if count == 0 {
345 writeln!(&mut output, "No records passed filters").unwrap();
346 }
347
348 let scanned = bam_exec.scanned_count();
350 let skipped = bam_exec.skipped_errors();
351 if scanned > 0 || skipped > 0 {
352 writeln!(&mut output, "\nScan statistics:").unwrap();
353 writeln!(&mut output, " Records scanned: {}", scanned).unwrap();
354 writeln!(&mut output, " Records returned: {}", count).unwrap();
355 if skipped > 0 {
356 writeln!(&mut output, " Parse errors skipped: {}", skipped).unwrap();
357 }
358 }
359 }
360 ExecutionResult::Bed(bed_exec) => {
361 writeln!(&mut output, "First {} BED records:", n).unwrap();
362 writeln!(&mut output, "{:-<80}", "").unwrap();
363 for (i, record_result) in bed_exec.take(n).enumerate() {
364 let record = record_result?;
365 writeln!(
366 &mut output,
367 "{}. {}:{}-{} {}",
368 i + 1,
369 record.chrom,
370 record.start,
371 record.end,
372 record.name.as_deref().unwrap_or("-")
373 )
374 .unwrap();
375 }
376 }
377 ExecutionResult::Fastq(fastq_exec) => {
378 writeln!(&mut output, "First {} FASTQ records:", n).unwrap();
379 writeln!(&mut output, "{:-<80}", "").unwrap();
380 for (i, record_result) in fastq_exec.take(n).enumerate() {
381 let record = record_result?;
382 writeln!(
383 &mut output,
384 "{}. {} length={} mean_qual={:.1}",
385 i + 1,
386 record.id,
387 record.sequence.len(),
388 record.mean_quality()
389 )
390 .unwrap();
391 }
392 }
393 }
394
395 Ok(output)
396 }
397}
398
399#[derive(Debug, Clone)]
405pub struct AnnotationResult {
406 pub total_variants: usize,
407 pub snp_count: usize,
408 pub indel_count: usize,
409 pub genic_variants: usize,
410 pub exonic_variants: usize,
411 pub intergenic_variants: usize,
412 pub example_exonic: Vec<AnnotatedVariant>,
413}
414
415impl AnnotationResult {
416 fn new() -> Self {
417 Self {
418 total_variants: 0,
419 snp_count: 0,
420 indel_count: 0,
421 genic_variants: 0,
422 exonic_variants: 0,
423 intergenic_variants: 0,
424 example_exonic: Vec::new(),
425 }
426 }
427
428 pub fn print_summary(&self) {
430 let item_type = if self.snp_count > 0 || self.indel_count > 0 {
432 "VCF Annotation Results"
433 } else {
434 "BAM Annotation Results"
435 };
436
437 println!(" 📈 {}:", item_type);
438
439 let count_label = if self.snp_count > 0 || self.indel_count > 0 {
440 "Total variants"
441 } else {
442 "Total reads"
443 };
444
445 println!(" {}: {:>12}", count_label, self.total_variants);
446
447 if self.snp_count > 0 || self.indel_count > 0 {
449 println!(
450 " SNPs: {:>12} ({:>5.1}%)",
451 self.snp_count,
452 (self.snp_count as f64 / self.total_variants.max(1) as f64) * 100.0
453 );
454 println!(
455 " Indels: {:>12} ({:>5.1}%)",
456 self.indel_count,
457 (self.indel_count as f64 / self.total_variants.max(1) as f64) * 100.0
458 );
459 }
460
461 println!("\n 📊 Annotation Categories:");
462 println!(
463 " Genic: {:>12} ({:>5.1}%)",
464 self.genic_variants,
465 (self.genic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
466 );
467 println!(
468 " Exonic: {:>12} ({:>5.1}%)",
469 self.exonic_variants,
470 (self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
471 );
472 println!(
473 " Intergenic: {:>12} ({:>5.1}%)",
474 self.intergenic_variants,
475 (self.intergenic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
476 );
477
478 if !self.example_exonic.is_empty() {
480 println!("\n 📋 Example High-Quality Exonic Variants:");
481 for (i, variant) in self.example_exonic.iter().enumerate() {
482 println!(
483 " {}. {}:{} {}>{} QUAL={:.1}",
484 i + 1,
485 variant.chrom,
486 variant.pos,
487 variant.reference,
488 variant.alt.join(","),
489 variant.qual.unwrap_or(0.0)
490 );
491 println!(" Genes: {}", variant.genes.join(", "));
492 println!(" Exons: {} exon(s)", variant.exons.len());
493 }
494 }
495
496 println!("\n ✅ Functional Impact:");
497 let coding_percent =
498 (self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0;
499 if coding_percent > 10.0 {
500 println!(" HIGH coding variant rate ({:.1}%)", coding_percent);
501 println!(" Likely exome/targeted sequencing data");
502 } else if coding_percent > 1.0 {
503 println!(" TYPICAL coding variant rate ({:.1}%)", coding_percent);
504 println!(" Standard whole-genome sequencing");
505 } else {
506 println!(" LOW coding variant rate ({:.1}%)", coding_percent);
507 println!(" Most variants in non-coding regions");
508 }
509 }
510}
511
512#[derive(Debug, Clone)]
514pub struct AnnotatedVariant {
515 pub chrom: String,
516 pub pos: u64,
517 pub reference: String,
518 pub alt: Vec<String>,
519 pub qual: Option<f64>,
520 pub genes: Vec<String>,
521 pub exons: Vec<String>,
522}
523
524pub struct VcfExecution {
526 reader: VcfReader,
527 filter: Option<Box<dyn RecordFilter<VcfRecord>>>,
528 limit: Option<usize>,
529 count: usize,
530}
531
532pub struct BamExecution {
534 reader: BamReader<std::fs::File>,
535 filter: Option<Box<dyn RecordFilter<BamRecord>>>,
536 limit: Option<usize>,
537 count: usize,
538 skipped_errors: usize,
539}
540
541impl BamExecution {
542 pub fn skipped_errors(&self) -> usize {
544 self.skipped_errors
545 }
546
547 pub fn scanned_count(&self) -> usize {
549 self.count
550 }
551}
552
553pub struct BedExecution {
555 reader: BedReader,
556 filter: Option<Box<dyn RecordFilter<BedRecord>>>,
557 limit: Option<usize>,
558 count: usize,
559}
560
561pub struct FastqExecution {
563 reader: FastqReader,
564 filter: Option<Box<dyn RecordFilter<FastqRecord>>>,
565 limit: Option<usize>,
566 count: usize,
567}
568
569impl Iterator for VcfExecution {
574 type Item = Result<VcfRecord>;
575
576 fn next(&mut self) -> Option<Self::Item> {
577 loop {
578 if let Some(limit) = self.limit {
580 if self.count >= limit {
581 return None;
582 }
583 }
584
585 match self.reader.next_record() {
587 Ok(Some(record)) => {
588 self.count += 1; if let Some(ref filter) = self.filter {
592 if !filter.test(&record) {
593 continue; }
595 }
596 return Some(Ok(record));
597 }
598 Ok(None) => return None,
599 Err(e) => return Some(Err(e)),
600 }
601 }
602 }
603}
604
605impl Iterator for BamExecution {
606 type Item = Result<BamRecord>;
607
608 fn next(&mut self) -> Option<Self::Item> {
609 loop {
610 if let Some(limit) = self.limit {
612 if self.count >= limit {
613 return None;
614 }
615 }
616
617 match self.reader.next_record() {
618 Ok(Some(record)) => {
619 self.count += 1; if let Some(ref filter) = self.filter {
622 if !filter.test(&record) {
623 continue;
624 }
625 }
626 return Some(Ok(record));
627 }
628 Ok(None) => return None,
629 Err(e) => {
630 self.skipped_errors += 1;
633 self.count += 1; if self.skipped_errors <= 3 {
637 eprintln!("Warning: Skipping BAM record that failed to parse: {}", e);
638 } else if self.skipped_errors == 4 {
639 eprintln!("Warning: Additional parse errors will be counted silently...");
640 }
641
642 continue; }
644 }
645 }
646 }
647}
648
649impl Iterator for BedExecution {
650 type Item = Result<BedRecord>;
651
652 fn next(&mut self) -> Option<Self::Item> {
653 loop {
654 if let Some(limit) = self.limit {
655 if self.count >= limit {
656 return None;
657 }
658 }
659
660 match self.reader.next_record() {
661 Ok(Some(record)) => {
662 if let Some(ref filter) = self.filter {
663 if !filter.test(&record) {
664 continue;
665 }
666 }
667 self.count += 1;
668 return Some(Ok(record));
669 }
670 Ok(None) => return None,
671 Err(e) => return Some(Err(e)),
672 }
673 }
674 }
675}
676
677impl Iterator for FastqExecution {
678 type Item = Result<FastqRecord>;
679
680 fn next(&mut self) -> Option<Self::Item> {
681 loop {
682 if let Some(limit) = self.limit {
683 if self.count >= limit {
684 return None;
685 }
686 }
687
688 match self.reader.next_record() {
689 Ok(Some(record)) => {
690 if let Some(ref filter) = self.filter {
691 if !filter.test(&record) {
692 continue;
693 }
694 }
695 self.count += 1;
696 return Some(Ok(record));
697 }
698 Ok(None) => return None,
699 Err(e) => return Some(Err(e)),
700 }
701 }
702 }
703}
704
705pub fn execute(plan: LogicalPlan) -> Result<ExecutionResult> {
711 let optimized = plan.optimize();
713
714 let format = optimized
716 .format()
717 .ok_or_else(|| Error::invalid_input("Plan has no data source"))?;
718 let path = extract_scan_path(&optimized.root)?;
719 let filter_expr = extract_filter(&optimized.root);
720 let limit = extract_limit(&optimized.root);
721
722 match format {
724 FileFormat::Vcf => {
725 let reader = VcfReader::from_path(&path)?;
726 let filter = if let Some(expr) = filter_expr {
727 Some(expr.compile()?)
728 } else {
729 None
730 };
731 Ok(ExecutionResult::Vcf(VcfExecution {
732 reader,
733 filter,
734 limit,
735 count: 0,
736 }))
737 }
738 FileFormat::Bam => {
739 let mut reader = BamReader::from_path(&path)?;
740 reader.read_header()?; let filter = if let Some(expr) = filter_expr {
742 Some(expr.compile()?)
743 } else {
744 None
745 };
746 Ok(ExecutionResult::Bam(BamExecution {
747 reader,
748 filter,
749 limit,
750 count: 0,
751 skipped_errors: 0,
752 }))
753 }
754 FileFormat::Bed => {
755 let reader = BedReader::from_path(&path)?;
756 let filter = if let Some(expr) = filter_expr {
757 Some(expr.compile()?)
758 } else {
759 None
760 };
761 Ok(ExecutionResult::Bed(BedExecution {
762 reader,
763 filter,
764 limit,
765 count: 0,
766 }))
767 }
768 FileFormat::Fastq => {
769 let reader = FastqReader::from_path(&path)?;
770 let filter = if let Some(expr) = filter_expr {
771 Some(expr.compile()?)
772 } else {
773 None
774 };
775 Ok(ExecutionResult::Fastq(FastqExecution {
776 reader,
777 filter,
778 limit,
779 count: 0,
780 }))
781 }
782 _ => Err(Error::invalid_input(format!(
783 "Unsupported format: {:?}",
784 format
785 ))),
786 }
787}
788
789fn extract_scan_path(node: &PlanNode) -> Result<PathBuf> {
794 match node {
795 PlanNode::Scan { path, .. } => Ok(path.clone()),
796 PlanNode::Filter { input, .. } => extract_scan_path(input),
797 PlanNode::Limit { input, .. } => extract_scan_path(input),
798 PlanNode::MaxScan { input, .. } => extract_scan_path(input),
799 _ => Err(Error::invalid_input("No Scan node found in plan")),
800 }
801}
802
803fn extract_filter(node: &PlanNode) -> Option<Expr> {
804 match node {
805 PlanNode::Filter { input, predicate } => {
806 if let Some(inner_filter) = extract_filter(input) {
808 Some(predicate.clone().and(inner_filter))
809 } else {
810 Some(predicate.clone())
811 }
812 }
813 PlanNode::Limit { input, .. } => extract_filter(input),
814 PlanNode::MaxScan { input, .. } => extract_filter(input),
815 PlanNode::Scan { .. } => None,
816 _ => None,
817 }
818}
819
820fn extract_limit(node: &PlanNode) -> Option<usize> {
821 match node {
822 PlanNode::Limit { count, .. } => Some(*count),
823 PlanNode::MaxScan { count, .. } => Some(*count),
824 PlanNode::Filter { input, .. } => extract_limit(input),
825 _ => None,
826 }
827}
828
829#[cfg(test)]
834mod tests {
835 use super::*;
836 use crate::expression::{col, lit};
837
838 #[test]
839 fn test_execute_vcf_no_filter() {
840 let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf);
841
842 let result = execute(plan);
843 assert!(result.is_ok());
844
845 if let Ok(ExecutionResult::Vcf(mut exec)) = result {
846 let mut count = 0;
847 for record in exec.by_ref() {
848 assert!(record.is_ok());
849 count += 1;
850 }
851 assert!(count > 0);
852 } else {
853 panic!("Expected VCF execution result");
854 }
855 }
856
857 #[test]
858 fn test_execute_vcf_with_filter() {
859 let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
860 .filter(col("qual").gt(lit(30.0)));
861
862 let result = execute(plan);
863 assert!(result.is_ok());
864
865 if let Ok(ExecutionResult::Vcf(mut exec)) = result {
866 let mut count = 0;
867 for record in exec.by_ref() {
868 if let Ok(rec) = record {
869 if let Some(qual) = rec.qual {
870 assert!(qual > 30.0);
871 }
872 count += 1;
873 }
874 }
875 assert!(count > 0);
876 }
877 }
878
879 #[test]
880 fn test_execute_vcf_with_limit() {
881 let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf).limit(5);
882
883 let result = execute(plan);
884 assert!(result.is_ok());
885
886 if let Ok(ExecutionResult::Vcf(exec)) = result {
887 let count = exec.count();
888 assert_eq!(count, 5);
889 }
890 }
891
892 #[test]
893 fn test_execute_vcf_filter_and_limit() {
894 let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
895 .filter(col("qual").gt(lit(20.0)))
896 .limit(10);
897
898 let result = execute(plan);
899 assert!(result.is_ok());
900
901 if let Ok(ExecutionResult::Vcf(exec)) = result {
902 let records: Vec<_> = exec.collect();
903 assert!(records.len() <= 10);
904 for record in records {
905 if let Ok(rec) = record {
906 if let Some(qual) = rec.qual {
907 assert!(qual > 20.0);
908 }
909 }
910 }
911 }
912 }
913}