genomicframe_core/plan.rs
1//! Logical query plan representation for GenomicFrame
2//!
3//! This module provides the lazy query plan abstraction. Plans are built up
4//! through method chaining and can be optimized before execution.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use genomicframe_core::plan::{LogicalPlan, PlanNode};
10//! use genomicframe_core::expression::{col, lit};
11//! use genomicframe_core::schema::FileFormat;
12//!
13//! // Build a query plan
14//! let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf)
15//! .filter(col("qual").gt(lit(30.0)))
16//! .select(&["chrom", "pos", "ref", "alt"]);
17//!
18//! // Optimize before execution
19//! let optimized = plan.optimize();
20//! ```
21
22use crate::expression::Expr;
23use crate::schema::{FileFormat, GenomicSchema};
24use std::path::PathBuf;
25
26// ============================================================================
27// Core Plan Types
28// ============================================================================
29
30/// A lazy query plan that can be optimized before execution
31///
32/// LogicalPlan represents a series of operations (scan, filter, select, etc.)
33/// without actually performing them. This enables optimization passes that can
34/// reorder operations, combine filters, and push predicates down to readers.
35#[derive(Debug, Clone)]
36pub struct LogicalPlan {
37 /// The root node of the plan tree
38 pub root: PlanNode,
39
40 /// Schema information for this plan
41 pub schema: GenomicSchema,
42}
43
44/// A node in the logical plan tree
45///
46/// Each node represents a single operation. Nodes form a tree where
47/// operations flow from leaves (scans) to the root (final output).
48#[derive(Debug, Clone)]
49pub enum PlanNode {
50 /// Scan a file from disk
51 ///
52 /// This is always a leaf node - the source of data
53 Scan {
54 /// Path to the file
55 path: PathBuf,
56
57 /// File format
58 format: FileFormat,
59
60 /// Optional projection (which columns to read)
61 projection: Option<Vec<String>>,
62 },
63
64 /// Filter records by predicate
65 ///
66 /// Only records where the predicate evaluates to true are kept
67 Filter {
68 /// Input plan to filter
69 input: Box<PlanNode>,
70
71 /// Predicate expression
72 predicate: Expr,
73 },
74
75 /// Select/project specific columns
76 ///
77 /// Reduces the output to only the specified columns
78 Select {
79 /// Input plan
80 input: Box<PlanNode>,
81
82 /// Column names to keep
83 columns: Vec<String>,
84 },
85
86 /// Add a computed column
87 WithColumn {
88 /// Input plan
89 input: Box<PlanNode>,
90
91 /// New column name
92 name: String,
93
94 /// Expression to compute column value
95 expr: Expr,
96 },
97
98 /// Limit number of records
99 Limit {
100 /// Input plan
101 input: Box<PlanNode>,
102
103 /// Maximum number of records
104 count: usize,
105 },
106
107 /// Maximum number of records to scan (before filtering)
108 ///
109 /// This limits how many records are read from the source,
110 /// regardless of how many pass filters. Useful for sampling
111 /// large files without processing the entire dataset.
112 MaxScan {
113 /// Input plan
114 input: Box<PlanNode>,
115
116 /// Maximum number of records to scan
117 count: usize,
118 },
119
120 /// Join two plans (future support)
121 Join {
122 /// Left input
123 left: Box<PlanNode>,
124
125 /// Right input
126 right: Box<PlanNode>,
127
128 /// Join type
129 join_type: JoinType,
130
131 /// Join keys
132 on: Vec<String>,
133 },
134}
135
/// Strategy a join uses to pair records from its two inputs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinType {
    /// Inner join: keep only records that match on both sides.
    Inner,

    /// Left outer join: keep every left record, plus matching right records.
    Left,

    /// Right outer join: keep every right record, plus matching left records.
    Right,

    /// Full outer join: keep every record from both sides.
    Full,

    /// Overlap join: pair records by genomic interval overlap.
    Overlap,
}
154
155// ============================================================================
156// LogicalPlan Builder Methods
157// ============================================================================
158
159impl LogicalPlan {
160 /// Create a plan to scan a file
161 ///
162 /// # Examples
163 ///
164 /// ```no_run
165 /// use genomicframe_core::plan::LogicalPlan;
166 /// use genomicframe_core::schema::FileFormat;
167 ///
168 /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf);
169 /// ```
170 pub fn scan<P: Into<PathBuf>>(path: P, format: FileFormat) -> Self {
171 let schema = format.schema();
172 Self {
173 root: PlanNode::Scan {
174 path: path.into(),
175 format,
176 projection: None,
177 },
178 schema,
179 }
180 }
181
182 /// Add a filter to the plan
183 ///
184 /// # Examples
185 ///
186 /// ```no_run
187 /// use genomicframe_core::plan::LogicalPlan;
188 /// use genomicframe_core::expression::{col, lit};
189 /// use genomicframe_core::schema::FileFormat;
190 ///
191 /// let plan = LogicalPlan::scan("variants.vcf", FileFormat::Vcf)
192 /// .filter(col("qual").gt(lit(30.0)));
193 /// ```
194 pub fn filter(self, predicate: Expr) -> Self {
195 Self {
196 root: PlanNode::Filter {
197 input: Box::new(self.root),
198 predicate,
199 },
200 schema: self.schema,
201 }
202 }
203
204 /// Select specific columns
205 ///
206 /// # Examples
207 ///
208 /// ```no_run
209 /// use genomicframe_core::plan::LogicalPlan;
210 /// use genomicframe_core::schema::FileFormat;
211 ///
212 /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
213 /// .select(&["chrom", "pos", "ref", "alt"]);
214 /// ```
215 pub fn select(self, columns: &[&str]) -> Self {
216 Self {
217 root: PlanNode::Select {
218 input: Box::new(self.root),
219 columns: columns.iter().map(|s| s.to_string()).collect(),
220 },
221 schema: self.schema, // TODO: Update schema to reflect selected columns
222 }
223 }
224
225 /// Add a computed column
226 ///
227 /// # Examples
228 ///
229 /// ```no_run
230 /// use genomicframe_core::plan::LogicalPlan;
231 /// use genomicframe_core::expression::{col, Expr};
232 /// use genomicframe_core::schema::FileFormat;
233 ///
234 /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
235 /// .with_column("is_high_qual", col("qual").gt(lit(30.0)));
236 /// # use genomicframe_core::expression::lit;
237 /// ```
238 pub fn with_column(self, name: &str, expr: Expr) -> Self {
239 Self {
240 root: PlanNode::WithColumn {
241 input: Box::new(self.root),
242 name: name.to_string(),
243 expr,
244 },
245 schema: self.schema, // TODO: Add new column to schema
246 }
247 }
248
249 /// Limit the number of records
250 ///
251 /// # Examples
252 ///
253 /// ```no_run
254 /// use genomicframe_core::plan::LogicalPlan;
255 /// use genomicframe_core::schema::FileFormat;
256 ///
257 /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
258 /// .limit(1000);
259 /// ```
260 pub fn limit(self, count: usize) -> Self {
261 Self {
262 root: PlanNode::Limit {
263 input: Box::new(self.root),
264 count,
265 },
266 schema: self.schema,
267 }
268 }
269
270 /// Limit the maximum number of records to scan from source (before filtering)
271 ///
272 /// This is useful for sampling large files or preventing long-running queries
273 /// on huge datasets. The scan limit is applied before any filters, so you may
274 /// get fewer results than the scan limit if filters are selective.
275 ///
276 /// # Examples
277 ///
278 /// ```no_run
279 /// use genomicframe_core::plan::LogicalPlan;
280 /// use genomicframe_core::schema::FileFormat;
281 /// use genomicframe_core::expression::{col, lit};
282 ///
283 /// // Scan only first 10,000 records, then filter
284 /// let plan = LogicalPlan::scan("huge.vcf", FileFormat::Vcf)
285 /// .max_scan(10_000)
286 /// .filter(col("qual").gte(lit(30.0)));
287 /// ```
288 pub fn max_scan(self, count: usize) -> Self {
289 Self {
290 root: PlanNode::MaxScan {
291 input: Box::new(self.root),
292 count,
293 },
294 schema: self.schema,
295 }
296 }
297
298 /// Get the file format of this plan's source
299 pub fn format(&self) -> Option<FileFormat> {
300 self.root.format()
301 }
302}
303
304// ============================================================================
305// PlanNode Helper Methods
306// ============================================================================
307
308impl PlanNode {
309 /// Get the file format if this is or contains a Scan node
310 pub fn format(&self) -> Option<FileFormat> {
311 match self {
312 PlanNode::Scan { format, .. } => Some(*format),
313 PlanNode::Filter { input, .. } => input.format(),
314 PlanNode::Select { input, .. } => input.format(),
315 PlanNode::WithColumn { input, .. } => input.format(),
316 PlanNode::Limit { input, .. } => input.format(),
317 PlanNode::MaxScan { input, .. } => input.format(),
318 PlanNode::Join { left, .. } => left.format(), // Use left side format
319 }
320 }
321
322 /// Get all filter predicates in this plan
323 pub fn filters(&self) -> Vec<&Expr> {
324 let mut result = Vec::new();
325 self.collect_filters(&mut result);
326 result
327 }
328
329 fn collect_filters<'a>(&'a self, acc: &mut Vec<&'a Expr>) {
330 match self {
331 PlanNode::Filter { input, predicate } => {
332 acc.push(predicate);
333 input.collect_filters(acc);
334 }
335 PlanNode::Select { input, .. } => input.collect_filters(acc),
336 PlanNode::WithColumn { input, .. } => input.collect_filters(acc),
337 PlanNode::Limit { input, .. } => input.collect_filters(acc),
338 PlanNode::MaxScan { input, .. } => input.collect_filters(acc),
339 PlanNode::Join { left, right, .. } => {
340 left.collect_filters(acc);
341 right.collect_filters(acc);
342 }
343 PlanNode::Scan { .. } => {}
344 }
345 }
346
347 /// Check if this plan contains any filters
348 pub fn has_filters(&self) -> bool {
349 !self.filters().is_empty()
350 }
351}
352
353// ============================================================================
354// Plan Optimization
355// ============================================================================
356
357impl LogicalPlan {
358 /// Optimize this plan
359 ///
360 /// Applies multiple optimization passes:
361 /// 1. Combine adjacent filters into single AND filter
362 /// 2. Push filters down closer to scans
363 /// 3. Prune unnecessary columns early
364 ///
365 /// # Examples
366 ///
367 /// ```no_run
368 /// use genomicframe_core::plan::LogicalPlan;
369 /// use genomicframe_core::expression::{col, lit};
370 /// use genomicframe_core::schema::FileFormat;
371 ///
372 /// let plan = LogicalPlan::scan("data.vcf", FileFormat::Vcf)
373 /// .filter(col("qual").gt(lit(30.0)))
374 /// .filter(col("chrom").eq(lit("chr1")));
375 ///
376 /// // Before: Scan -> Filter(qual) -> Filter(chrom)
377 /// // After: Scan -> Filter(qual AND chrom)
378 /// let optimized = plan.optimize();
379 /// ```
380 pub fn optimize(self) -> Self {
381 let mut plan = self;
382
383 // Pass 1: Combine adjacent filters
384 plan = plan.combine_filters();
385
386 // Pass 2: Push filters down (closer to scan)
387 plan = plan.push_down_filters();
388
389 // Pass 3: Prune columns (push select down)
390 plan = plan.prune_columns();
391
392 plan
393 }
394
395 /// Combine adjacent filter operations into single filter with AND
396 pub fn combine_filters(self) -> Self {
397 Self {
398 root: self.root.combine_filters_node(),
399 schema: self.schema,
400 }
401 }
402
403 /// Push filter operations down toward the scan
404 pub fn push_down_filters(self) -> Self {
405 Self {
406 root: self.root.push_down_filters_node(),
407 schema: self.schema,
408 }
409 }
410
411 /// Push column selection down (prune early)
412 pub fn prune_columns(self) -> Self {
413 Self {
414 root: self.root.prune_columns_node(),
415 schema: self.schema,
416 }
417 }
418
419 /// Get a human-readable representation of the plan
420 pub fn explain(&self) -> String {
421 self.root.explain(0)
422 }
423}
424
425impl PlanNode {
426 fn combine_filters_node(self) -> Self {
427 match self {
428 // Two adjacent filters -> combine with AND
429 PlanNode::Filter { input, predicate } => {
430 match *input {
431 PlanNode::Filter {
432 input: inner_input,
433 predicate: inner_predicate,
434 } => {
435 // Combine predicates
436 let combined = predicate.and(inner_predicate);
437 PlanNode::Filter {
438 input: Box::new(inner_input.combine_filters_node()),
439 predicate: combined,
440 }
441 }
442 other => PlanNode::Filter {
443 input: Box::new(other.combine_filters_node()),
444 predicate,
445 },
446 }
447 }
448 PlanNode::Select { input, columns } => PlanNode::Select {
449 input: Box::new(input.combine_filters_node()),
450 columns,
451 },
452 PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
453 input: Box::new(input.combine_filters_node()),
454 name,
455 expr,
456 },
457 PlanNode::Limit { input, count } => PlanNode::Limit {
458 input: Box::new(input.combine_filters_node()),
459 count,
460 },
461 PlanNode::MaxScan { input, count } => PlanNode::MaxScan {
462 input: Box::new(input.combine_filters_node()),
463 count,
464 },
465 // Scan and Join don't need changes
466 other => other,
467 }
468 }
469
470 fn push_down_filters_node(self) -> Self {
471 match self {
472 // Filter before Select -> try to push filter down
473 PlanNode::Select { input, columns } => {
474 match *input {
475 PlanNode::Filter {
476 input: filter_input,
477 predicate,
478 } => {
479 // Push select down, keep filter above
480 let select_below = PlanNode::Select {
481 input: filter_input,
482 columns,
483 };
484 PlanNode::Filter {
485 input: Box::new(select_below.push_down_filters_node()),
486 predicate,
487 }
488 }
489 other => PlanNode::Select {
490 input: Box::new(other.push_down_filters_node()),
491 columns,
492 },
493 }
494 }
495 PlanNode::Filter { input, predicate } => PlanNode::Filter {
496 input: Box::new(input.push_down_filters_node()),
497 predicate,
498 },
499 PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
500 input: Box::new(input.push_down_filters_node()),
501 name,
502 expr,
503 },
504 PlanNode::Limit { input, count } => PlanNode::Limit {
505 input: Box::new(input.push_down_filters_node()),
506 count,
507 },
508 PlanNode::MaxScan { input, count } => PlanNode::MaxScan {
509 input: Box::new(input.push_down_filters_node()),
510 count,
511 },
512 other => other,
513 }
514 }
515
516 fn prune_columns_node(self) -> Self {
517 // TODO: Track which columns are actually used and push projection down
518 // For now, just recursively process children
519 match self {
520 PlanNode::Filter { input, predicate } => PlanNode::Filter {
521 input: Box::new(input.prune_columns_node()),
522 predicate,
523 },
524 PlanNode::Select { input, columns } => PlanNode::Select {
525 input: Box::new(input.prune_columns_node()),
526 columns,
527 },
528 PlanNode::WithColumn { input, name, expr } => PlanNode::WithColumn {
529 input: Box::new(input.prune_columns_node()),
530 name,
531 expr,
532 },
533 PlanNode::Limit { input, count } => PlanNode::Limit {
534 input: Box::new(input.prune_columns_node()),
535 count,
536 },
537 other => other,
538 }
539 }
540
541 fn explain(&self, indent: usize) -> String {
542 let prefix = " ".repeat(indent);
543 match self {
544 PlanNode::Scan {
545 path,
546 format,
547 projection,
548 } => {
549 let proj = if let Some(cols) = projection {
550 format!(" [{}]", cols.join(", "))
551 } else {
552 String::new()
553 };
554 format!("{}Scan: {:?} ({:?}){}", prefix, path, format, proj)
555 }
556 PlanNode::Filter { input, predicate } => {
557 format!(
558 "{}Filter: {}\n{}",
559 prefix,
560 predicate,
561 input.explain(indent + 1)
562 )
563 }
564 PlanNode::Select { input, columns } => {
565 format!(
566 "{}Select: [{}]\n{}",
567 prefix,
568 columns.join(", "),
569 input.explain(indent + 1)
570 )
571 }
572 PlanNode::WithColumn { input, name, expr } => {
573 format!(
574 "{}WithColumn: {} = {}\n{}",
575 prefix,
576 name,
577 expr,
578 input.explain(indent + 1)
579 )
580 }
581 PlanNode::Limit { input, count } => {
582 format!("{}Limit: {}\n{}", prefix, count, input.explain(indent + 1))
583 }
584 PlanNode::MaxScan { input, count } => {
585 format!(
586 "{}MaxScan: {}\n{}",
587 prefix,
588 count,
589 input.explain(indent + 1)
590 )
591 }
592 PlanNode::Join {
593 left,
594 right,
595 join_type,
596 on,
597 } => {
598 format!(
599 "{}Join: {:?} ON [{}]\n{}{}",
600 prefix,
601 join_type,
602 on.join(", "),
603 left.explain(indent + 1),
604 right.explain(indent + 1)
605 )
606 }
607 }
608 }
609}
610
611// ============================================================================
612// Tests
613// ============================================================================
614
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expression::{col, lit};

    /// Plan that scans the VCF path shared by every test.
    fn vcf_scan() -> LogicalPlan {
        LogicalPlan::scan("test.vcf", FileFormat::Vcf)
    }

    /// Predicate `qual > 30.0` shared by the filter tests.
    fn high_qual() -> Expr {
        col("qual").gt(lit(30.0))
    }

    #[test]
    fn test_scan_plan() {
        assert_eq!(vcf_scan().format(), Some(FileFormat::Vcf));
    }

    #[test]
    fn test_filter_plan() {
        let root = vcf_scan().filter(high_qual()).root;

        assert!(root.has_filters());
        assert_eq!(root.filters().len(), 1);
    }

    #[test]
    fn test_chained_filters() {
        let root = vcf_scan().filter(high_qual()).filter(Expr::IsSnp).root;

        assert_eq!(root.filters().len(), 2);
    }

    #[test]
    fn test_combine_filters() {
        let combined = vcf_scan()
            .filter(high_qual())
            .filter(Expr::IsSnp)
            .combine_filters();

        // Both predicates must end up in a single filter node.
        assert_eq!(combined.root.filters().len(), 1);
    }

    #[test]
    fn test_select() {
        let plan = vcf_scan().select(&["chrom", "pos"]);

        if let PlanNode::Select { columns, .. } = plan.root {
            assert_eq!(columns, vec!["chrom", "pos"]);
        } else {
            panic!("Expected Select node");
        }
    }

    #[test]
    fn test_limit() {
        let plan = vcf_scan().limit(100);

        if let PlanNode::Limit { count, .. } = plan.root {
            assert_eq!(count, 100);
        } else {
            panic!("Expected Limit node");
        }
    }

    #[test]
    fn test_complex_plan() {
        let optimized = vcf_scan()
            .filter(high_qual())
            .filter(Expr::IsSnp)
            .select(&["chrom", "pos", "ref", "alt"])
            .limit(1000)
            .optimize();

        // The two stacked filters must have been merged by the optimizer.
        assert_eq!(optimized.root.filters().len(), 1);
    }

    #[test]
    fn test_explain() {
        let explanation = vcf_scan()
            .filter(high_qual())
            .select(&["chrom", "pos"])
            .explain();

        for label in ["Scan", "Filter", "Select"].iter() {
            assert!(explanation.contains(label));
        }
    }
}