Skip to main content

genomicframe_core/
plan.rs

1//! Logical query plan representation for GenomicFrame
2//!
3//! This module provides the lazy query plan abstraction. Plans are built up
4//! through method chaining and can be optimized before execution.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use genomicframe_core::plan::{LogicalPlan, PlanNode};
10//! use genomicframe_core::expression::{col, lit};
11//! use genomicframe_core::schema::FileFormat;
12//!
13//! // Build a query plan
14//! let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf)
15//!     .filter(col("qual").gt(lit(30.0)))
16//!     .select(&["chrom", "pos", "ref", "alt"]);
17//!
18//! // Optimize before execution
19//! let optimized = plan.optimize();
20//! ```
21
22use crate::expression::Expr;
23use crate::schema::{FileFormat, GenomicSchema};
24use std::path::PathBuf;
25
26// ============================================================================
27// Core Plan Types
28// ============================================================================
29
30/// A lazy query plan that can be optimized before execution
31///
32/// LogicalPlan represents a series of operations (scan, filter, select, etc.)
33/// without actually performing them. This enables optimization passes that can
34/// reorder operations, combine filters, and push predicates down to readers.
35#[derive(Debug, Clone)]
36pub struct LogicalPlan {
37    /// The root node of the plan tree
38    pub root: PlanNode,
39
40    /// Schema information for this plan
41    pub schema: GenomicSchema,
42}
43
44/// A node in the logical plan tree
45///
46/// Each node represents a single operation. Nodes form a tree where
47/// operations flow from leaves (scans) to the root (final output).
48#[derive(Debug, Clone)]
49pub enum PlanNode {
50    /// Scan a file from disk
51    ///
52    /// This is always a leaf node - the source of data
53    Scan {
54        /// Path to the file
55        path: PathBuf,
56
57        /// File format
58        format: FileFormat,
59
60        /// Optional projection (which columns to read)
61        projection: Option<Vec<String>>,
62    },
63
64    /// Filter records by predicate
65    ///
66    /// Only records where the predicate evaluates to true are kept
67    Filter {
68        /// Input plan to filter
69        input: Box<PlanNode>,
70
71        /// Predicate expression
72        predicate: Expr,
73    },
74
75    /// Select/project specific columns
76    ///
77    /// Reduces the output to only the specified columns
78    Select {
79        /// Input plan
80        input: Box<PlanNode>,
81
82        /// Column names to keep
83        columns: Vec<String>,
84    },
85
86    /// Add a computed column
87    WithColumn {
88        /// Input plan
89        input: Box<PlanNode>,
90
91        /// New column name
92        name: String,
93
94        /// Expression to compute column value
95        expr: Expr,
96    },
97
98    /// Limit number of records
99    Limit {
100        /// Input plan
101        input: Box<PlanNode>,
102
103        /// Maximum number of records
104        count: usize,
105    },
106
107    /// Maximum number of records to scan (before filtering)
108    ///
109    /// This limits how many records are read from the source,
110    /// regardless of how many pass filters. Useful for sampling
111    /// large files without processing the entire dataset.
112    MaxScan {
113        /// Input plan
114        input: Box<PlanNode>,
115
116        /// Maximum number of records to scan
117        count: usize,
118    },
119
120    /// Join two plans (future support)
121    Join {
122        /// Left input
123        left: Box<PlanNode>,
124
125        /// Right input
126        right: Box<PlanNode>,
127
128        /// Join type
129        join_type: JoinType,
130
131        /// Join keys
132        on: Vec<String>,
133    },
134}
135
/// The flavour of join performed by a [`PlanNode::Join`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinType {
    /// Inner join: only records that match on both sides
    Inner,

    /// Left outer join: every left record, plus matching right records
    Left,

    /// Right outer join: every right record, plus matching left records
    Right,

    /// Full outer join: every record from both sides
    Full,

    /// Overlap join: match on genomic interval overlap
    Overlap,
}
154
155// ============================================================================
156// LogicalPlan Builder Methods
157// ============================================================================
158
159impl LogicalPlan {
160    /// Create a plan to scan a file
161    ///
162    /// # Examples
163    ///
164    /// ```no_run
165    /// use genomicframe_core::plan::LogicalPlan;
166    /// use genomicframe_core::schema::FileFormat;
167    ///
168    /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf);
169    /// ```
170    pub fn scan<P: Into<PathBuf>>(path: P, format: FileFormat) -> Self {
171        let schema = format.schema();
172        Self {
173            root: PlanNode::Scan {
174                path: path.into(),
175                format,
176                projection: None,
177            },
178            schema,
179        }
180    }
181
182    /// Add a filter to the plan
183    ///
184    /// # Examples
185    ///
186    /// ```no_run
187    /// use genomicframe_core::plan::LogicalPlan;
188    /// use genomicframe_core::expression::{col, lit};
189    /// use genomicframe_core::schema::FileFormat;
190    ///
191    /// let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf)
192    ///     .filter(col("qual").gt(lit(30.0)));
193    /// ```
194    pub fn filter(self, predicate: Expr) -> Self {
195        Self {
196            root: PlanNode::Filter {
197                input: Box::new(self.root),
198                predicate,
199            },
200            schema: self.schema,
201        }
202    }
203
204    /// Select specific columns
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// use genomicframe_core::plan::LogicalPlan;
210    /// use genomicframe_core::schema::FileFormat;
211    ///
212    /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
213    ///     .select(&["chrom", "pos", "ref", "alt"]);
214    /// ```
215    pub fn select(self, columns: &[&str]) -> Self {
216        Self {
217            root: PlanNode::Select {
218                input: Box::new(self.root),
219                columns: columns.iter().map(|s| s.to_string()).collect(),
220            },
221            schema: self.schema, // TODO: Update schema to reflect selected columns
222        }
223    }
224
225    /// Add a computed column
226    ///
227    /// # Examples
228    ///
229    /// ```no_run
230    /// use genomicframe_core::plan::LogicalPlan;
231    /// use genomicframe_core::expression::{col, Expr};
232    /// use genomicframe_core::schema::FileFormat;
233    ///
234    /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
235    ///     .with_column("is_high_qual", col("qual").gt(lit(30.0)));
236    /// # use genomicframe_core::expression::lit;
237    /// ```
238    pub fn with_column(self, name: &str, expr: Expr) -> Self {
239        Self {
240            root: PlanNode::WithColumn {
241                input: Box::new(self.root),
242                name: name.to_string(),
243                expr,
244            },
245            schema: self.schema, // TODO: Add new column to schema
246        }
247    }
248
249    /// Limit the number of records
250    ///
251    /// # Examples
252    ///
253    /// ```no_run
254    /// use genomicframe_core::plan::LogicalPlan;
255    /// use genomicframe_core::schema::FileFormat;
256    ///
257    /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
258    ///     .limit(1000);
259    /// ```
260    pub fn limit(self, count: usize) -> Self {
261        Self {
262            root: PlanNode::Limit {
263                input: Box::new(self.root),
264                count,
265            },
266            schema: self.schema,
267        }
268    }
269
270    /// Limit the maximum number of records to scan from source (before filtering)
271    ///
272    /// This is useful for sampling large files or preventing long-running queries
273    /// on huge datasets. The scan limit is applied before any filters, so you may
274    /// get fewer results than the scan limit if filters are selective.
275    ///
276    /// # Examples
277    ///
278    /// ```no_run
279    /// use genomicframe_core::plan::LogicalPlan;
280    /// use genomicframe_core::schema::FileFormat;
281    /// use genomicframe_core::expression::{col, lit};
282    ///
283    /// // Scan only first 10,000 records, then filter
284    /// let plan = LogicalPlan::scan("huge.vcf", FileFormat::Vcf)
285    ///     .max_scan(10_000)
286    ///     .filter(col("qual").gte(lit(30.0)));
287    /// ```
288    pub fn max_scan(self, count: usize) -> Self {
289        Self {
290            root: PlanNode::MaxScan {
291                input: Box::new(self.root),
292                count,
293            },
294            schema: self.schema,
295        }
296    }
297
298    /// Get the file format of this plan's source
299    pub fn format(&self) -> Option<FileFormat> {
300        self.root.format()
301    }
302}
303
304// ============================================================================
305// PlanNode Helper Methods
306// ============================================================================
307
308impl PlanNode {
309    /// Get the file format if this is or contains a Scan node
310    pub fn format(&self) -> Option<FileFormat> {
311        match self {
312            PlanNode::Scan { format, .. } => Some(*format),
313            PlanNode::Filter { input, .. } => input.format(),
314            PlanNode::Select { input, .. } => input.format(),
315            PlanNode::WithColumn { input, .. } => input.format(),
316            PlanNode::Limit { input, .. } => input.format(),
317            PlanNode::MaxScan { input, .. } => input.format(),
318            PlanNode::Join { left, .. } => left.format(), // Use left side format
319        }
320    }
321
322    /// Get all filter predicates in this plan
323    pub fn filters(&self) -> Vec<&Expr> {
324        let mut result = Vec::new();
325        self.collect_filters(&mut result);
326        result
327    }
328
329    fn collect_filters<'a>(&'a self, acc: &mut Vec<&'a Expr>) {
330        match self {
331            PlanNode::Filter { input, predicate } => {
332                acc.push(predicate);
333                input.collect_filters(acc);
334            }
335            PlanNode::Select { input, .. } => input.collect_filters(acc),
336            PlanNode::WithColumn { input, .. } => input.collect_filters(acc),
337            PlanNode::Limit { input, .. } => input.collect_filters(acc),
338            PlanNode::MaxScan { input, .. } => input.collect_filters(acc),
339            PlanNode::Join { left, right, .. } => {
340                left.collect_filters(acc);
341                right.collect_filters(acc);
342            }
343            PlanNode::Scan { .. } => {}
344        }
345    }
346
347    /// Check if this plan contains any filters
348    pub fn has_filters(&self) -> bool {
349        !self.filters().is_empty()
350    }
351}
352
353// ============================================================================
354// Plan Optimization
355// ============================================================================
356
357impl LogicalPlan {
358    /// Optimize this plan
359    ///
360    /// Applies multiple optimization passes:
361    /// 1. Combine adjacent filters into single AND filter
362    /// 2. Push filters down closer to scans
363    /// 3. Prune unnecessary columns early
364    ///
365    /// # Examples
366    ///
367    /// ```no_run
368    /// use genomicframe_core::plan::LogicalPlan;
369    /// use genomicframe_core::expression::{col, lit};
370    /// use genomicframe_core::schema::FileFormat;
371    ///
372    /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
373    ///     .filter(col("qual").gt(lit(30.0)))
374    ///     .filter(col("chrom").eq(lit("chr1")));
375    ///
376    /// // Before: Scan -> Filter(qual) -> Filter(chrom)
377    /// // After:  Scan -> Filter(qual AND chrom)
378    /// let optimized = plan.optimize();
379    /// ```
380    pub fn optimize(self) -> Self {
381        let mut plan = self;
382
383        // Pass 1: Combine adjacent filters
384        plan = plan.combine_filters();
385
386        // Pass 2: Push filters down (closer to scan)
387        plan = plan.push_down_filters();
388
389        // Pass 3: Prune columns (push select down)
390        plan = plan.prune_columns();
391
392        plan
393    }
394
395    /// Combine adjacent filter operations into single filter with AND
396    pub fn combine_filters(self) -> Self {
397        Self {
398            root: self.root.combine_filters_node(),
399            schema: self.schema,
400        }
401    }
402
403    /// Push filter operations down toward the scan
404    pub fn push_down_filters(self) -> Self {
405        Self {
406            root: self.root.push_down_filters_node(),
407            schema: self.schema,
408        }
409    }
410
411    /// Push column selection down (prune early)
412    pub fn prune_columns(self) -> Self {
413        Self {
414            root: self.root.prune_columns_node(),
415            schema: self.schema,
416        }
417    }
418
419    /// Get a human-readable representation of the plan
420    pub fn explain(&self) -> String {
421        self.root.explain(0)
422    }
423}
424
425impl PlanNode {
426    fn combine_filters_node(self) -> Self {
427        match self {
428            // Two adjacent filters -> combine with AND
429            PlanNode::Filter { input, predicate } => {
430                match *input {
431                    PlanNode::Filter {
432                        input: inner_input,
433                        predicate: inner_predicate,
434                    } => {
435                        // Combine predicates
436                        let combined = predicate.and(inner_predicate);
437                        PlanNode::Filter {
438                            input: Box::new(inner_input.combine_filters_node()),
439                            predicate: combined,
440                        }
441                    }
442                    other => PlanNode::Filter {
443                        input: Box::new(other.combine_filters_node()),
444                        predicate,
445                    },
446                }
447            }
448            PlanNode::Select { input, columns } => PlanNode::Select {
449                input: Box::new(input.combine_filters_node()),
450                columns,
451            },
452            PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
453                input: Box::new(input.combine_filters_node()),
454                name,
455                expr,
456            },
457            PlanNode::Limit { input, count } => PlanNode::Limit {
458                input: Box::new(input.combine_filters_node()),
459                count,
460            },
461            PlanNode::MaxScan { input, count } => PlanNode::MaxScan {
462                input: Box::new(input.combine_filters_node()),
463                count,
464            },
465            // Scan and Join don't need changes
466            other => other,
467        }
468    }
469
470    fn push_down_filters_node(self) -> Self {
471        match self {
472            // Filter before Select -> try to push filter down
473            PlanNode::Select { input, columns } => {
474                match *input {
475                    PlanNode::Filter {
476                        input: filter_input,
477                        predicate,
478                    } => {
479                        // Push select down, keep filter above
480                        let select_below = PlanNode::Select {
481                            input: filter_input,
482                            columns,
483                        };
484                        PlanNode::Filter {
485                            input: Box::new(select_below.push_down_filters_node()),
486                            predicate,
487                        }
488                    }
489                    other => PlanNode::Select {
490                        input: Box::new(other.push_down_filters_node()),
491                        columns,
492                    },
493                }
494            }
495            PlanNode::Filter { input, predicate } => PlanNode::Filter {
496                input: Box::new(input.push_down_filters_node()),
497                predicate,
498            },
499            PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
500                input: Box::new(input.push_down_filters_node()),
501                name,
502                expr,
503            },
504            PlanNode::Limit { input, count } => PlanNode::Limit {
505                input: Box::new(input.push_down_filters_node()),
506                count,
507            },
508            PlanNode::MaxScan { input, count } => PlanNode::MaxScan {
509                input: Box::new(input.push_down_filters_node()),
510                count,
511            },
512            other => other,
513        }
514    }
515
516    fn prune_columns_node(self) -> Self {
517        // TODO: Track which columns are actually used and push projection down
518        // For now, just recursively process children
519        match self {
520            PlanNode::Filter { input, predicate } => PlanNode::Filter {
521                input: Box::new(input.prune_columns_node()),
522                predicate,
523            },
524            PlanNode::Select { input, columns } => PlanNode::Select {
525                input: Box::new(input.prune_columns_node()),
526                columns,
527            },
528            PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
529                input: Box::new(input.prune_columns_node()),
530                name,
531                expr,
532            },
533            PlanNode::Limit { input, count } => PlanNode::Limit {
534                input: Box::new(input.prune_columns_node()),
535                count,
536            },
537            other => other,
538        }
539    }
540
541    fn explain(&self, indent: usize) -> String {
542        let prefix = "  ".repeat(indent);
543        match self {
544            PlanNode::Scan {
545                path,
546                format,
547                projection,
548            } => {
549                let proj = if let Some(cols) = projection {
550                    format!(" [{}]", cols.join(", "))
551                } else {
552                    String::new()
553                };
554                format!("{}Scan: {:?} ({:?}){}", prefix, path, format, proj)
555            }
556            PlanNode::Filter { input, predicate } => {
557                format!(
558                    "{}Filter: {}\n{}",
559                    prefix,
560                    predicate,
561                    input.explain(indent + 1)
562                )
563            }
564            PlanNode::Select { input, columns } => {
565                format!(
566                    "{}Select: [{}]\n{}",
567                    prefix,
568                    columns.join(", "),
569                    input.explain(indent + 1)
570                )
571            }
572            PlanNode::WithColumn { input, name, expr } => {
573                format!(
574                    "{}WithColumn: {} = {}\n{}",
575                    prefix,
576                    name,
577                    expr,
578                    input.explain(indent + 1)
579                )
580            }
581            PlanNode::Limit { input, count } => {
582                format!("{}Limit: {}\n{}", prefix, count, input.explain(indent + 1))
583            }
584            PlanNode::MaxScan { input, count } => {
585                format!(
586                    "{}MaxScan: {}\n{}",
587                    prefix,
588                    count,
589                    input.explain(indent + 1)
590                )
591            }
592            PlanNode::Join {
593                left,
594                right,
595                join_type,
596                on,
597            } => {
598                format!(
599                    "{}Join: {:?} ON [{}]\n{}{}",
600                    prefix,
601                    join_type,
602                    on.join(", "),
603                    left.explain(indent + 1),
604                    right.explain(indent + 1)
605                )
606            }
607        }
608    }
609}
610
611// ============================================================================
612// Tests
613// ============================================================================
614
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expression::{col, lit};

    /// Scan of the VCF file every test starts from.
    fn vcf_scan() -> LogicalPlan {
        LogicalPlan::scan("test.vcf", FileFormat::Vcf)
    }

    /// The predicate `qual > 30.0`.
    fn high_qual() -> Expr {
        col("qual").gt(lit(30.0))
    }

    #[test]
    fn test_scan_plan() {
        assert_eq!(vcf_scan().format(), Some(FileFormat::Vcf));
    }

    #[test]
    fn test_filter_plan() {
        let plan = vcf_scan().filter(high_qual());

        assert!(plan.root.has_filters());
        assert_eq!(plan.root.filters().len(), 1);
    }

    #[test]
    fn test_chained_filters() {
        let plan = vcf_scan().filter(high_qual()).filter(Expr::IsSnp);

        assert_eq!(plan.root.filters().len(), 2);
    }

    #[test]
    fn test_combine_filters() {
        let optimized = vcf_scan()
            .filter(high_qual())
            .filter(Expr::IsSnp)
            .combine_filters();

        // Should combine into single filter
        assert_eq!(optimized.root.filters().len(), 1);
    }

    #[test]
    fn test_select() {
        let plan = vcf_scan().select(&["chrom", "pos"]);

        if let PlanNode::Select { columns, .. } = plan.root {
            assert_eq!(columns, vec!["chrom", "pos"]);
        } else {
            panic!("Expected Select node");
        }
    }

    #[test]
    fn test_limit() {
        let plan = vcf_scan().limit(100);

        if let PlanNode::Limit { count, .. } = plan.root {
            assert_eq!(count, 100);
        } else {
            panic!("Expected Limit node");
        }
    }

    #[test]
    fn test_complex_plan() {
        let optimized = vcf_scan()
            .filter(high_qual())
            .filter(Expr::IsSnp)
            .select(&["chrom", "pos", "ref", "alt"])
            .limit(1000)
            .optimize();

        // Check that filters were combined
        assert_eq!(optimized.root.filters().len(), 1);
    }

    #[test]
    fn test_explain() {
        let explanation = vcf_scan()
            .filter(high_qual())
            .select(&["chrom", "pos"])
            .explain();

        for expected in ["Scan", "Filter", "Select"] {
            assert!(explanation.contains(expected));
        }
    }
}