Skip to main content

genomicframe_core/
execution.rs

1//! Execution engine that converts logical plans into filtered readers
2//!
3//! This module executes optimized LogicalPlans by creating readers and applying
4//! compiled filters. It handles the translation from lazy plans to eager execution.
5
6use crate::core::GenomicRecordIterator;
7use crate::error::{Error, Result};
8use crate::expression::{Expr, ExprToFilter};
9use crate::filters::RecordFilter;
10use crate::plan::{LogicalPlan, PlanNode};
11use crate::schema::FileFormat;
12use std::path::PathBuf;
13
14// Import format-specific types
15use crate::formats::bam::{BamReader, BamRecord};
16use crate::formats::bed::{BedReader, BedRecord};
17use crate::formats::fastq::{FastqReader, FastqRecord};
18use crate::formats::vcf::{VcfReader, VcfRecord};
19
20// ============================================================================
21// Execution Result Types
22// ============================================================================
23
24/// Result of executing a plan - holds the reader and metadata
25pub enum ExecutionResult {
26    Vcf(VcfExecution),
27    Bam(BamExecution),
28    Bed(BedExecution),
29    Fastq(FastqExecution),
30}
31// TODO: Ergonomics! Only allow the user to call if the ExecutionResult is a BED result
32impl ExecutionResult {
33    /// Build an annotation index from BED execution results
34    ///
35    /// This provides an ergonomic way to go directly from a filtered query
36    /// to an annotation index without manual iteration.
37    ///
38    /// # Examples
39    ///
40    /// ```no_run
41    /// use genomicframe_core::plan::LogicalPlan;
42    /// use genomicframe_core::execution::execute;
43    /// use genomicframe_core::expression::col;
44    /// use genomicframe_core::schema::{lit, FileFormat};
45    ///
46    /// // Query: Large genes only
47    /// let plan = LogicalPlan::scan("genes.bed", FileFormat::Bed)
48    ///     .filter(col("length").gte(lit(1000)));
49    ///
50    /// // Execute and build annotation index in one go!
51    /// let genes = execute(plan)?
52    ///     .to_annotation_index(|record| {
53    ///         record.name.clone().unwrap_or_else(|| "unknown".to_string())
54    ///     })?;
55    /// # Ok::<(), genomicframe_core::error::Error>(())
56    /// ```
57    pub fn to_annotation_index<F>(
58        self,
59        extractor: F,
60    ) -> Result<crate::interval::annotation::AnnotationIndex>
61    where
62        F: Fn(&BedRecord) -> String,
63    {
64        use crate::interval::annotation::{Annotation, AnnotationIndex};
65        use crate::interval::GenomicInterval;
66        use std::collections::HashMap;
67
68        match self {
69            ExecutionResult::Bed(mut bed_exec) => {
70                // Collect all annotations grouped by chromosome
71                let mut annotations: HashMap<String, Vec<Annotation>> = HashMap::new();
72
73                while let Some(result) = bed_exec.next() {
74                    let record = result?;
75                    let interval: GenomicInterval = (&record).into();
76                    let data = extractor(&record);
77
78                    annotations
79                        .entry(interval.chrom.clone())
80                        .or_insert_with(Vec::new)
81                        .push(Annotation { interval, data });
82                }
83
84                // Build interval trees efficiently from collected annotations
85                let mut index = AnnotationIndex::new();
86                index.build_trees(annotations);
87
88                Ok(index)
89            }
90            _ => Err(Error::invalid_input(
91                "to_annotation_index() only works with BED execution results",
92            )),
93        }
94    }
95
96    /// Annotate VCF variants with gene/exon information
97    ///
98    /// This provides an ergonomic way to annotate variants from VCF files
99    /// with gene and exon annotations, returning structured statistics.
100    ///
101    /// # Examples
102    ///
103    /// ```no_run
104    /// use genomicframe_core::plan::LogicalPlan;
105    /// use genomicframe_core::execution::execute;
106    /// use genomicframe_core::expression::{col, lit};
107    /// use genomicframe_core::schema::FileFormat;
108    ///
109    /// // Load annotation indexes (genes and exons)
110    /// let genes = /* ... load genes ... */;
111    /// let exons = /* ... load exons ... */;
112    ///
113    /// // Query and annotate in one go!
114    /// let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf)
115    ///     .filter(col("qual").gte(lit(30.0)));
116    ///
117    /// let result = execute(plan)?
118    ///     .annotate_with_genes(&genes, &exons)?;
119    ///
120    /// println!("Genic variants: {}", result.genic_variants);
121    /// println!("Exonic variants: {}", result.exonic_variants);
122    /// # use genomicframe_core::interval::annotation::AnnotationIndex;
123    /// # let genes = AnnotationIndex::new();
124    /// # let exons = AnnotationIndex::new();
125    /// # Ok::<(), genomicframe_core::error::Error>(())
126    /// ```
127    pub fn annotate_with_genes(
128        self,
129        genes: &crate::interval::annotation::AnnotationIndex,
130        exons: &crate::interval::annotation::AnnotationIndex,
131    ) -> Result<AnnotationResult> {
132        use crate::interval::GenomicInterval;
133
134        match self {
135            ExecutionResult::Vcf(mut vcf_exec) => {
136                let mut result = AnnotationResult::new();
137
138                while let Some(record_result) = vcf_exec.next() {
139                    let record = record_result?;
140                    result.total_variants += 1;
141
142                    // Classify variant type
143                    if record.is_snp() {
144                        result.snp_count += 1;
145                    } else if record.is_indel() {
146                        result.indel_count += 1;
147                    }
148
149                    // Convert to genomic interval
150                    let interval: GenomicInterval = (&record).into();
151
152                    // Query annotations using interval trees (O(log n + k))
153                    // AnnotationIndex.query() automatically handles chromosome normalization (chr22 ↔ 22)
154                    let overlapping_genes = genes.query(&interval);
155                    let overlapping_exons = exons.query(&interval);
156
157                    if !overlapping_genes.is_empty() {
158                        result.genic_variants += 1;
159
160                        if !overlapping_exons.is_empty() {
161                            result.exonic_variants += 1;
162
163                            // Collect examples (up to 5)
164                            if result.example_exonic.len() < 5 {
165                                result.example_exonic.push(AnnotatedVariant {
166                                    chrom: record.chrom.clone(),
167                                    pos: record.pos,
168                                    reference: record.reference.clone(),
169                                    alt: record.alt.clone(),
170                                    qual: record.qual,
171                                    genes: overlapping_genes
172                                        .iter()
173                                        .map(|s| s.to_string())
174                                        .collect(),
175                                    exons: overlapping_exons
176                                        .iter()
177                                        .map(|s| s.to_string())
178                                        .collect(),
179                                });
180                            }
181                        }
182                    } else {
183                        result.intergenic_variants += 1;
184                    }
185                }
186
187                Ok(result)
188            }
189            ExecutionResult::Bam(mut bam_exec) => {
190                let mut result = AnnotationResult::new();
191
192                while let Some(record_result) = bam_exec.next() {
193                    let record = record_result?;
194                    result.total_variants += 1; // For BAM, this counts total reads
195
196                    // Skip unmapped reads
197                    if record.is_unmapped() {
198                        continue;
199                    }
200
201                    // Convert to genomic interval
202                    let interval: GenomicInterval = (&record).into();
203
204                    // Query annotations using interval trees (O(log n + k))
205                    let overlapping_genes = genes.query(&interval);
206                    let overlapping_exons = exons.query(&interval);
207
208                    if !overlapping_genes.is_empty() {
209                        result.genic_variants += 1; // For BAM, this counts genic reads
210
211                        if !overlapping_exons.is_empty() {
212                            result.exonic_variants += 1; // For BAM, this counts exonic reads
213                        }
214                    } else {
215                        result.intergenic_variants += 1;
216                    }
217                }
218
219                Ok(result)
220            }
221            _ => Err(Error::invalid_input(
222                "annotate_with_genes() only works with VCF and BAM execution results",
223            )),
224        }
225    }
226
227    /// Count total records in the execution result
228    ///
229    /// This consumes the iterator and returns the total count.
230    ///
231    /// # Examples
232    ///
233    /// ```no_run
234    /// use genomicframe_core::plan::LogicalPlan;
235    /// use genomicframe_core::execution::execute;
236    /// use genomicframe_core::schema::FileFormat;
237    ///
238    /// let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf);
239    /// let count = execute(plan)?.count()?;
240    /// println!("Total variants: {}", count);
241    /// # Ok::<(), genomicframe_core::error::Error>(())
242    /// ```
243    pub fn count(mut self) -> Result<usize> {
244        let mut total = 0;
245
246        match &mut self {
247            ExecutionResult::Vcf(vcf_exec) => {
248                while let Some(_) = vcf_exec.next() {
249                    total += 1;
250                }
251            }
252            ExecutionResult::Bam(bam_exec) => {
253                while let Some(_) = bam_exec.next() {
254                    total += 1;
255                }
256            }
257            ExecutionResult::Bed(bed_exec) => {
258                while let Some(_) = bed_exec.next() {
259                    total += 1;
260                }
261            }
262            ExecutionResult::Fastq(fastq_exec) => {
263                while let Some(_) = fastq_exec.next() {
264                    total += 1;
265                }
266            }
267        }
268
269        Ok(total)
270    }
271
272    /// Peek at the first N records
273    ///
274    /// Returns a string representation of the first N records for quick inspection.
275    /// This is useful for exploratory analysis and debugging.
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// use genomicframe_core::plan::LogicalPlan;
281    /// use genomicframe_core::execution::execute;
282    /// use genomicframe_core::expression::col;
283    /// use genomicframe_core::schema::FileFormat;
284    ///
285    /// let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf);
286    /// let preview = execute(plan)?.head(5)?;
287    /// println!("{}", preview);
288    /// # Ok::<(), genomicframe_core::error::Error>(())
289    /// ```
290    pub fn head(mut self, n: usize) -> Result<String> {
291        use std::fmt::Write;
292        let mut output = String::new();
293
294        match &mut self {
295            ExecutionResult::Vcf(vcf_exec) => {
296                writeln!(&mut output, "First {} VCF records:", n).unwrap();
297                writeln!(&mut output, "{:-<80}", "").unwrap();
298                for (i, record_result) in vcf_exec.take(n).enumerate() {
299                    let record = record_result?;
300                    writeln!(
301                        &mut output,
302                        "{}. {}:{} {}>{} QUAL={:.1}",
303                        i + 1,
304                        record.chrom,
305                        record.pos,
306                        record.reference,
307                        record.alt.join(","),
308                        record.qual.unwrap_or(0.0)
309                    )
310                    .unwrap();
311                }
312            }
313            ExecutionResult::Bam(bam_exec) => {
314                writeln!(&mut output, "First {} BAM records:", n).unwrap();
315                writeln!(&mut output, "{:-<120}", "").unwrap();
316                writeln!(
317                    &mut output,
318                    "{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
319                    "Index", "MAPQ", "Chrom/Rname", "Position", "Length", "Flags", "Read Name"
320                )
321                .unwrap();
322                writeln!(&mut output, "{:-<120}", "").unwrap();
323
324                let mut count = 0;
325                for (i, record_result) in bam_exec.take(n).enumerate() {
326                    let record = record_result?;
327                    let length = record.seq_string().len();
328                    let flags = format!("0x{:04x}", record.flag);
329                    writeln!(
330                        &mut output,
331                        "{:<5} | {:<8} | {:<10} | {:<12} | {:<8} | {:<10} | {:<20}",
332                        i + 1,
333                        record.mapq,
334                        record.rname,
335                        format!("{}-{}", record.pos, record.pos + length as i32),
336                        length,
337                        flags,
338                        &record.qname[..record.qname.len().min(20)]
339                    )
340                    .unwrap();
341                    count += 1;
342                }
343
344                if count == 0 {
345                    writeln!(&mut output, "No records passed filters").unwrap();
346                }
347
348                // Show scan statistics
349                let scanned = bam_exec.scanned_count();
350                let skipped = bam_exec.skipped_errors();
351                if scanned > 0 || skipped > 0 {
352                    writeln!(&mut output, "\nScan statistics:").unwrap();
353                    writeln!(&mut output, "  Records scanned: {}", scanned).unwrap();
354                    writeln!(&mut output, "  Records returned: {}", count).unwrap();
355                    if skipped > 0 {
356                        writeln!(&mut output, "  Parse errors skipped: {}", skipped).unwrap();
357                    }
358                }
359            }
360            ExecutionResult::Bed(bed_exec) => {
361                writeln!(&mut output, "First {} BED records:", n).unwrap();
362                writeln!(&mut output, "{:-<80}", "").unwrap();
363                for (i, record_result) in bed_exec.take(n).enumerate() {
364                    let record = record_result?;
365                    writeln!(
366                        &mut output,
367                        "{}. {}:{}-{} {}",
368                        i + 1,
369                        record.chrom,
370                        record.start,
371                        record.end,
372                        record.name.as_deref().unwrap_or("-")
373                    )
374                    .unwrap();
375                }
376            }
377            ExecutionResult::Fastq(fastq_exec) => {
378                writeln!(&mut output, "First {} FASTQ records:", n).unwrap();
379                writeln!(&mut output, "{:-<80}", "").unwrap();
380                for (i, record_result) in fastq_exec.take(n).enumerate() {
381                    let record = record_result?;
382                    writeln!(
383                        &mut output,
384                        "{}. {} length={} mean_qual={:.1}",
385                        i + 1,
386                        record.id,
387                        record.sequence.len(),
388                        record.mean_quality()
389                    )
390                    .unwrap();
391                }
392            }
393        }
394
395        Ok(output)
396    }
397}
398
399// ============================================================================
400// Annotation Result Types
401// ============================================================================
402
403/// Result of variant annotation with genes/exons
404#[derive(Debug, Clone)]
405pub struct AnnotationResult {
406    pub total_variants: usize,
407    pub snp_count: usize,
408    pub indel_count: usize,
409    pub genic_variants: usize,
410    pub exonic_variants: usize,
411    pub intergenic_variants: usize,
412    pub example_exonic: Vec<AnnotatedVariant>,
413}
414
415impl AnnotationResult {
416    fn new() -> Self {
417        Self {
418            total_variants: 0,
419            snp_count: 0,
420            indel_count: 0,
421            genic_variants: 0,
422            exonic_variants: 0,
423            intergenic_variants: 0,
424            example_exonic: Vec::new(),
425        }
426    }
427
428    /// Print a summary of the annotation results
429    pub fn print_summary(&self) {
430        // Generic title (works for both VCF variants and BAM reads)
431        let item_type = if self.snp_count > 0 || self.indel_count > 0 {
432            "VCF Annotation Results"
433        } else {
434            "BAM Annotation Results"
435        };
436
437        println!("  📈 {}:", item_type);
438
439        let count_label = if self.snp_count > 0 || self.indel_count > 0 {
440            "Total variants"
441        } else {
442            "Total reads"
443        };
444
445        println!("     {}:     {:>12}", count_label, self.total_variants);
446
447        // Only show SNP/Indel stats for VCF
448        if self.snp_count > 0 || self.indel_count > 0 {
449            println!(
450                "     SNPs:               {:>12} ({:>5.1}%)",
451                self.snp_count,
452                (self.snp_count as f64 / self.total_variants.max(1) as f64) * 100.0
453            );
454            println!(
455                "     Indels:             {:>12} ({:>5.1}%)",
456                self.indel_count,
457                (self.indel_count as f64 / self.total_variants.max(1) as f64) * 100.0
458            );
459        }
460
461        println!("\n  📊 Annotation Categories:");
462        println!(
463            "     Genic:              {:>12} ({:>5.1}%)",
464            self.genic_variants,
465            (self.genic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
466        );
467        println!(
468            "     Exonic:             {:>12} ({:>5.1}%)",
469            self.exonic_variants,
470            (self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
471        );
472        println!(
473            "     Intergenic:         {:>12} ({:>5.1}%)",
474            self.intergenic_variants,
475            (self.intergenic_variants as f64 / self.total_variants.max(1) as f64) * 100.0
476        );
477
478        // Show example exonic variants
479        if !self.example_exonic.is_empty() {
480            println!("\n  📋 Example High-Quality Exonic Variants:");
481            for (i, variant) in self.example_exonic.iter().enumerate() {
482                println!(
483                    "     {}. {}:{} {}>{} QUAL={:.1}",
484                    i + 1,
485                    variant.chrom,
486                    variant.pos,
487                    variant.reference,
488                    variant.alt.join(","),
489                    variant.qual.unwrap_or(0.0)
490                );
491                println!("        Genes: {}", variant.genes.join(", "));
492                println!("        Exons: {} exon(s)", variant.exons.len());
493            }
494        }
495
496        println!("\n  ✅ Functional Impact:");
497        let coding_percent =
498            (self.exonic_variants as f64 / self.total_variants.max(1) as f64) * 100.0;
499        if coding_percent > 10.0 {
500            println!("     HIGH coding variant rate ({:.1}%)", coding_percent);
501            println!("     Likely exome/targeted sequencing data");
502        } else if coding_percent > 1.0 {
503            println!("     TYPICAL coding variant rate ({:.1}%)", coding_percent);
504            println!("     Standard whole-genome sequencing");
505        } else {
506            println!("     LOW coding variant rate ({:.1}%)", coding_percent);
507            println!("     Most variants in non-coding regions");
508        }
509    }
510}
511
512/// An annotated variant with gene/exon information
513#[derive(Debug, Clone)]
514pub struct AnnotatedVariant {
515    pub chrom: String,
516    pub pos: u64,
517    pub reference: String,
518    pub alt: Vec<String>,
519    pub qual: Option<f64>,
520    pub genes: Vec<String>,
521    pub exons: Vec<String>,
522}
523
524/// VCF execution result
525pub struct VcfExecution {
526    reader: VcfReader,
527    filter: Option<Box<dyn RecordFilter<VcfRecord>>>,
528    limit: Option<usize>,
529    count: usize,
530}
531
532/// BAM execution result
533pub struct BamExecution {
534    reader: BamReader<std::fs::File>,
535    filter: Option<Box<dyn RecordFilter<BamRecord>>>,
536    limit: Option<usize>,
537    count: usize,
538    skipped_errors: usize,
539}
540
541impl BamExecution {
542    /// Get the number of records that failed to parse and were skipped
543    pub fn skipped_errors(&self) -> usize {
544        self.skipped_errors
545    }
546
547    /// Get the number of records scanned so far
548    pub fn scanned_count(&self) -> usize {
549        self.count
550    }
551}
552
553/// BED execution result
554pub struct BedExecution {
555    reader: BedReader,
556    filter: Option<Box<dyn RecordFilter<BedRecord>>>,
557    limit: Option<usize>,
558    count: usize,
559}
560
561/// FASTQ execution result
562pub struct FastqExecution {
563    reader: FastqReader,
564    filter: Option<Box<dyn RecordFilter<FastqRecord>>>,
565    limit: Option<usize>,
566    count: usize,
567}
568
569// ============================================================================
570// Iterator implementations for execution results
571// ============================================================================
572
573impl Iterator for VcfExecution {
574    type Item = Result<VcfRecord>;
575
576    fn next(&mut self) -> Option<Self::Item> {
577        loop {
578            // Check limit BEFORE reading next record (scan limit, not result limit)
579            if let Some(limit) = self.limit {
580                if self.count >= limit {
581                    return None;
582                }
583            }
584
585            // Get next record
586            match self.reader.next_record() {
587                Ok(Some(record)) => {
588                    self.count += 1; // Count ALL records scanned
589
590                    // Apply filter if present
591                    if let Some(ref filter) = self.filter {
592                        if !filter.test(&record) {
593                            continue; // Skip filtered records
594                        }
595                    }
596                    return Some(Ok(record));
597                }
598                Ok(None) => return None,
599                Err(e) => return Some(Err(e)),
600            }
601        }
602    }
603}
604
605impl Iterator for BamExecution {
606    type Item = Result<BamRecord>;
607
608    fn next(&mut self) -> Option<Self::Item> {
609        loop {
610            // Check limit BEFORE reading next record (scan limit, not result limit)
611            if let Some(limit) = self.limit {
612                if self.count >= limit {
613                    return None;
614                }
615            }
616
617            match self.reader.next_record() {
618                Ok(Some(record)) => {
619                    self.count += 1; // Count ALL records scanned
620
621                    if let Some(ref filter) = self.filter {
622                        if !filter.test(&record) {
623                            continue;
624                        }
625                    }
626                    return Some(Ok(record));
627                }
628                Ok(None) => return None,
629                Err(e) => {
630                    // Skip records that fail to parse (e.g., invalid CIGAR, corrupt data)
631                    // This matches the behavior of filtered readers that skip bad records
632                    self.skipped_errors += 1;
633                    self.count += 1; // Still count as scanned
634
635                    // Only print warning for first few errors to avoid spam
636                    if self.skipped_errors <= 3 {
637                        eprintln!("Warning: Skipping BAM record that failed to parse: {}", e);
638                    } else if self.skipped_errors == 4 {
639                        eprintln!("Warning: Additional parse errors will be counted silently...");
640                    }
641
642                    continue; // Try next record
643                }
644            }
645        }
646    }
647}
648
649impl Iterator for BedExecution {
650    type Item = Result<BedRecord>;
651
652    fn next(&mut self) -> Option<Self::Item> {
653        loop {
654            if let Some(limit) = self.limit {
655                if self.count >= limit {
656                    return None;
657                }
658            }
659
660            match self.reader.next_record() {
661                Ok(Some(record)) => {
662                    if let Some(ref filter) = self.filter {
663                        if !filter.test(&record) {
664                            continue;
665                        }
666                    }
667                    self.count += 1;
668                    return Some(Ok(record));
669                }
670                Ok(None) => return None,
671                Err(e) => return Some(Err(e)),
672            }
673        }
674    }
675}
676
677impl Iterator for FastqExecution {
678    type Item = Result<FastqRecord>;
679
680    fn next(&mut self) -> Option<Self::Item> {
681        loop {
682            if let Some(limit) = self.limit {
683                if self.count >= limit {
684                    return None;
685                }
686            }
687
688            match self.reader.next_record() {
689                Ok(Some(record)) => {
690                    if let Some(ref filter) = self.filter {
691                        if !filter.test(&record) {
692                            continue;
693                        }
694                    }
695                    self.count += 1;
696                    return Some(Ok(record));
697                }
698                Ok(None) => return None,
699                Err(e) => return Some(Err(e)),
700            }
701        }
702    }
703}
704
705// ============================================================================
706// Main Executor
707// ============================================================================
708
709/// Executes a logical plan and returns an iterator over results
710pub fn execute(plan: LogicalPlan) -> Result<ExecutionResult> {
711    // Optimize the plan first
712    let optimized = plan.optimize();
713
714    // Extract execution parameters
715    let format = optimized
716        .format()
717        .ok_or_else(|| Error::invalid_input("Plan has no data source"))?;
718    let path = extract_scan_path(&optimized.root)?;
719    let filter_expr = extract_filter(&optimized.root);
720    let limit = extract_limit(&optimized.root);
721
722    // Execute based on format
723    match format {
724        FileFormat::Vcf => {
725            let reader = VcfReader::from_path(&path)?;
726            let filter = if let Some(expr) = filter_expr {
727                Some(expr.compile()?)
728            } else {
729                None
730            };
731            Ok(ExecutionResult::Vcf(VcfExecution {
732                reader,
733                filter,
734                limit,
735                count: 0,
736            }))
737        }
738        FileFormat::Bam => {
739            let mut reader = BamReader::from_path(&path)?;
740            reader.read_header()?; // CRITICAL: Must read header before reading records
741            let filter = if let Some(expr) = filter_expr {
742                Some(expr.compile()?)
743            } else {
744                None
745            };
746            Ok(ExecutionResult::Bam(BamExecution {
747                reader,
748                filter,
749                limit,
750                count: 0,
751                skipped_errors: 0,
752            }))
753        }
754        FileFormat::Bed => {
755            let reader = BedReader::from_path(&path)?;
756            let filter = if let Some(expr) = filter_expr {
757                Some(expr.compile()?)
758            } else {
759                None
760            };
761            Ok(ExecutionResult::Bed(BedExecution {
762                reader,
763                filter,
764                limit,
765                count: 0,
766            }))
767        }
768        FileFormat::Fastq => {
769            let reader = FastqReader::from_path(&path)?;
770            let filter = if let Some(expr) = filter_expr {
771                Some(expr.compile()?)
772            } else {
773                None
774            };
775            Ok(ExecutionResult::Fastq(FastqExecution {
776                reader,
777                filter,
778                limit,
779                count: 0,
780            }))
781        }
782        _ => Err(Error::invalid_input(format!(
783            "Unsupported format: {:?}",
784            format
785        ))),
786    }
787}
788
789// ============================================================================
790// Helper Functions
791// ============================================================================
792
793fn extract_scan_path(node: &PlanNode) -> Result<PathBuf> {
794    match node {
795        PlanNode::Scan { path, .. } => Ok(path.clone()),
796        PlanNode::Filter { input, .. } => extract_scan_path(input),
797        PlanNode::Limit { input, .. } => extract_scan_path(input),
798        PlanNode::MaxScan { input, .. } => extract_scan_path(input),
799        _ => Err(Error::invalid_input("No Scan node found in plan")),
800    }
801}
802
803fn extract_filter(node: &PlanNode) -> Option<Expr> {
804    match node {
805        PlanNode::Filter { input, predicate } => {
806            // Combine with inner filters
807            if let Some(inner_filter) = extract_filter(input) {
808                Some(predicate.clone().and(inner_filter))
809            } else {
810                Some(predicate.clone())
811            }
812        }
813        PlanNode::Limit { input, .. } => extract_filter(input),
814        PlanNode::MaxScan { input, .. } => extract_filter(input),
815        PlanNode::Scan { .. } => None,
816        _ => None,
817    }
818}
819
820fn extract_limit(node: &PlanNode) -> Option<usize> {
821    match node {
822        PlanNode::Limit { count, .. } => Some(*count),
823        PlanNode::MaxScan { count, .. } => Some(*count),
824        PlanNode::Filter { input, .. } => extract_limit(input),
825        _ => None,
826    }
827}
828
829// ============================================================================
830// Tests
831// ============================================================================
832
833#[cfg(test)]
834mod tests {
835    use super::*;
836    use crate::expression::{col, lit};
837
838    #[test]
839    fn test_execute_vcf_no_filter() {
840        let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf);
841
842        let result = execute(plan);
843        assert!(result.is_ok());
844
845        if let Ok(ExecutionResult::Vcf(mut exec)) = result {
846            let mut count = 0;
847            for record in exec.by_ref() {
848                assert!(record.is_ok());
849                count += 1;
850            }
851            assert!(count > 0);
852        } else {
853            panic!("Expected VCF execution result");
854        }
855    }
856
857    #[test]
858    fn test_execute_vcf_with_filter() {
859        let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
860            .filter(col("qual").gt(lit(30.0)));
861
862        let result = execute(plan);
863        assert!(result.is_ok());
864
865        if let Ok(ExecutionResult::Vcf(mut exec)) = result {
866            let mut count = 0;
867            for record in exec.by_ref() {
868                if let Ok(rec) = record {
869                    if let Some(qual) = rec.qual {
870                        assert!(qual > 30.0);
871                    }
872                    count += 1;
873                }
874            }
875            assert!(count > 0);
876        }
877    }
878
879    #[test]
880    fn test_execute_vcf_with_limit() {
881        let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf).limit(5);
882
883        let result = execute(plan);
884        assert!(result.is_ok());
885
886        if let Ok(ExecutionResult::Vcf(exec)) = result {
887            let count = exec.count();
888            assert_eq!(count, 5);
889        }
890    }
891
892    #[test]
893    fn test_execute_vcf_filter_and_limit() {
894        let plan = LogicalPlan::scan("examples/vcf/test_data.vcf", FileFormat::Vcf)
895            .filter(col("qual").gt(lit(20.0)))
896            .limit(10);
897
898        let result = execute(plan);
899        assert!(result.is_ok());
900
901        if let Ok(ExecutionResult::Vcf(exec)) = result {
902            let records: Vec<_> = exec.collect();
903            assert!(records.len() <= 10);
904            for record in records {
905                if let Ok(rec) = record {
906                    if let Some(qual) = rec.qual {
907                        assert!(qual > 20.0);
908                    }
909                }
910            }
911        }
912    }
913}