Skip to main content

varpulis_parser/
optimizer.rs

1//! Rule-based logical plan optimizer
2//!
3//! Applies optimization rules to a `LogicalPlan` before physical materialization.
4//! Rules are applied iteratively (up to `MAX_PASSES`) until no rule produces changes.
5//!
6//! # Current Rules
7//!
//! - **FilterPushdown**: Move filters before windows when safe (filters are never moved past aggregates)
9//! - **TemporalFilterPushdown**: Push time-based filters before windows
10//! - **WindowMerge**: Merge adjacent windows with compatible configurations
11//! - **ProjectionPruning**: Remove duplicate projections (keep last)
12
13use varpulis_core::ast::Expr;
14use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalStream};
15
16/// Maximum optimization passes before stopping.
17const MAX_PASSES: usize = 10;
18
/// A single optimization rule that can transform a logical stream.
///
/// Rules must be `Send + Sync`. The [`Optimizer`] stores them as boxed trait
/// objects and calls [`OptimizationRule::apply`] on every stream of the plan
/// during each pass.
pub trait OptimizationRule: Send + Sync {
    /// Human-readable name for logging/debugging.
    fn name(&self) -> &str;

    /// Apply the rule to `stream` in place.
    ///
    /// Returns `true` if any changes were made and `false` otherwise. The
    /// optimizer schedules another pass whenever any rule reports a change,
    /// so a rule that always returns `true` keeps it running until
    /// `MAX_PASSES` is reached.
    fn apply(&self, stream: &mut LogicalStream) -> bool;
}
27
/// The plan optimizer: runs a set of rules iteratively until convergence.
///
/// Rules are applied in the order they are stored, to each stream of the
/// plan in turn.
pub struct Optimizer {
    /// Rules to apply, in application order.
    rules: Vec<Box<dyn OptimizationRule>>,
}
32
33impl std::fmt::Debug for Optimizer {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("Optimizer")
36            .field("rules_count", &self.rules.len())
37            .finish_non_exhaustive()
38    }
39}
40
41impl Optimizer {
42    /// Create an optimizer with the default set of rules.
43    pub fn default_rules() -> Self {
44        Self {
45            rules: vec![
46                Box::new(FilterPushdown),
47                Box::new(TemporalFilterPushdown),
48                Box::new(WindowMerge),
49                Box::new(ProjectionPruning),
50            ],
51        }
52    }
53
54    /// Create an optimizer with a custom set of rules.
55    pub fn with_rules(rules: Vec<Box<dyn OptimizationRule>>) -> Self {
56        Self { rules }
57    }
58
59    /// Optimize a logical plan by applying rules iteratively.
60    ///
61    /// Returns the optimized plan and the number of passes performed.
62    pub fn optimize(&self, mut plan: LogicalPlan) -> (LogicalPlan, usize) {
63        let mut total_passes = 0;
64
65        for pass in 0..MAX_PASSES {
66            let mut any_changed = false;
67
68            for stream in &mut plan.streams {
69                for rule in &self.rules {
70                    if rule.apply(stream) {
71                        any_changed = true;
72                    }
73                }
74            }
75
76            total_passes = pass + 1;
77            if !any_changed {
78                break;
79            }
80        }
81
82        (plan, total_passes)
83    }
84}
85
86/// Convenience function: optimize a plan with default rules.
87pub fn optimize_plan(plan: LogicalPlan) -> LogicalPlan {
88    let optimizer = Optimizer::default_rules();
89    let (optimized, _) = optimizer.optimize(plan);
90    optimized
91}
92
93// =============================================================================
94// Rule: FilterPushdown
95// =============================================================================
96
97/// Move filter operations before windows and aggregates when safe.
98///
99/// A filter can be pushed before a window/aggregate if it only references
100/// fields that are available before the window (i.e., input fields, not
101/// aggregation output aliases).
102struct FilterPushdown;
103
104impl OptimizationRule for FilterPushdown {
105    fn name(&self) -> &'static str {
106        "FilterPushdown"
107    }
108
109    fn apply(&self, stream: &mut LogicalStream) -> bool {
110        let ops = &mut stream.operations;
111        let mut changed = false;
112
113        // Scan for Filter ops that appear after a Window or Aggregate
114        // and can be moved before them.
115        let mut i = 0;
116        while i < ops.len() {
117            if let LogicalOp::Filter(expr) = &ops[i] {
118                // Look backwards for a Window or Aggregate to push past
119                if i > 0 {
120                    let prev_idx = i - 1;
121                    let can_push = match &ops[prev_idx] {
122                        LogicalOp::Window(_) => !references_aggregation_output(expr),
123                        _ => false,
124                    };
125
126                    if can_push {
127                        ops.swap(prev_idx, i);
128                        changed = true;
129                        // Don't increment i — check if we can push further
130                        continue;
131                    }
132                }
133            }
134            i += 1;
135        }
136
137        changed
138    }
139}
140
141// =============================================================================
142// Rule: TemporalFilterPushdown
143// =============================================================================
144
145/// Push time-based filters before windows.
146///
147/// Filters on timestamp fields can safely be pushed before window operations
148/// to reduce the number of events entering the window.
149struct TemporalFilterPushdown;
150
151impl OptimizationRule for TemporalFilterPushdown {
152    fn name(&self) -> &'static str {
153        "TemporalFilterPushdown"
154    }
155
156    fn apply(&self, stream: &mut LogicalStream) -> bool {
157        let ops = &mut stream.operations;
158        let mut changed = false;
159
160        let mut i = 0;
161        while i < ops.len() {
162            if let LogicalOp::Filter(expr) = &ops[i] {
163                if is_temporal_filter(expr) && i > 0 {
164                    let prev_idx = i - 1;
165                    if matches!(ops[prev_idx], LogicalOp::Window(_)) {
166                        ops.swap(prev_idx, i);
167                        changed = true;
168                        continue;
169                    }
170                }
171            }
172            i += 1;
173        }
174
175        changed
176    }
177}
178
179// =============================================================================
180// Rule: WindowMerge
181// =============================================================================
182
183/// Merge adjacent window operations with compatible configurations.
184///
185/// Two adjacent windows with the same duration and sliding parameters
186/// can be merged into a single window, reducing processing overhead.
187struct WindowMerge;
188
189impl OptimizationRule for WindowMerge {
190    fn name(&self) -> &'static str {
191        "WindowMerge"
192    }
193
194    fn apply(&self, stream: &mut LogicalStream) -> bool {
195        let ops = &mut stream.operations;
196        if ops.len() < 2 {
197            return false;
198        }
199
200        let mut changed = false;
201        let mut i = 0;
202
203        while i + 1 < ops.len() {
204            let mergeable =
205                if let (LogicalOp::Window(a), LogicalOp::Window(b)) = (&ops[i], &ops[i + 1]) {
206                    // Same duration and sliding config → mergeable
207                    a.duration == b.duration
208                        && a.sliding == b.sliding
209                        && a.session_gap == b.session_gap
210                        && a.policy == b.policy
211                } else {
212                    false
213                };
214
215            if mergeable {
216                ops.remove(i + 1);
217                changed = true;
218                // Don't increment — check if another window follows
219            } else {
220                i += 1;
221            }
222        }
223
224        changed
225    }
226}
227
228// =============================================================================
229// Rule: ProjectionPruning
230// =============================================================================
231
232/// Remove redundant projections: if two adjacent Project ops exist,
233/// keep only the second (it overwrites the first).
234struct ProjectionPruning;
235
236impl OptimizationRule for ProjectionPruning {
237    fn name(&self) -> &'static str {
238        "ProjectionPruning"
239    }
240
241    fn apply(&self, stream: &mut LogicalStream) -> bool {
242        let ops = &mut stream.operations;
243        if ops.len() < 2 {
244            return false;
245        }
246
247        let mut changed = false;
248        let mut i = 0;
249
250        while i + 1 < ops.len() {
251            let both_project = matches!(ops[i], LogicalOp::Project(_))
252                && matches!(ops[i + 1], LogicalOp::Project(_));
253
254            if both_project {
255                ops.remove(i);
256                changed = true;
257                // Don't increment — the next pair might also be projections
258            } else {
259                i += 1;
260            }
261        }
262
263        changed
264    }
265}
266
267// =============================================================================
268// Helper functions
269// =============================================================================
270
271/// Check if an expression references aggregation output aliases.
272///
273/// Simple heuristic: aggregation outputs typically use function call results
274/// (count, sum, avg, etc.). If the filter only uses simple field accesses
275/// and comparisons, it's safe to push before aggregation.
276fn references_aggregation_output(expr: &Expr) -> bool {
277    match expr {
278        Expr::Call { .. } => true,
279        Expr::Binary { left, right, .. } => {
280            references_aggregation_output(left) || references_aggregation_output(right)
281        }
282        Expr::Unary { expr, .. } => references_aggregation_output(expr),
283        Expr::Member { expr, .. } => references_aggregation_output(expr),
284        _ => false,
285    }
286}
287
288/// Check if a filter expression is temporal (references timestamp-like fields).
289fn is_temporal_filter(expr: &Expr) -> bool {
290    match expr {
291        Expr::Binary { left, right, .. } => {
292            is_timestamp_reference(left) || is_timestamp_reference(right)
293        }
294        _ => false,
295    }
296}
297
298fn is_timestamp_reference(expr: &Expr) -> bool {
299    match expr {
300        Expr::Ident(name) => {
301            let lower = name.to_lowercase();
302            lower.contains("timestamp")
303                || lower.contains("time")
304                || lower == "ts"
305                || lower == "event_time"
306        }
307        Expr::Member { member, .. } => {
308            let lower = member.to_lowercase();
309            lower.contains("timestamp")
310                || lower.contains("time")
311                || lower == "ts"
312                || lower == "event_time"
313        }
314        _ => false,
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use varpulis_core::ast::{BinOp, Expr, SelectItem, WindowArgs};
    use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalSource, LogicalStream};

    /// Wrap `ops` in a plan holding a single stream; all other plan sections are empty.
    fn make_plan(ops: Vec<LogicalOp>) -> LogicalPlan {
        let stream = LogicalStream {
            id: 0,
            name: "Test".to_string(),
            source: LogicalSource::EventType("E".to_string()),
            operations: ops,
            estimated_cardinality: None,
        };

        LogicalPlan {
            streams: vec![stream],
            functions: vec![],
            variables: vec![],
            connectors: vec![],
            patterns: vec![],
            events: vec![],
        }
    }

    /// Window with the given duration and no sliding, policy or session gap.
    fn window_lasting(duration: Expr) -> LogicalOp {
        LogicalOp::Window(WindowArgs {
            duration,
            sliding: None,
            policy: None,
            session_gap: None,
        })
    }

    /// The default window used by most tests.
    fn window_op() -> LogicalOp {
        window_lasting(Expr::Duration(60_000_000_000)) // 60s
    }

    /// Filter of the form `<field> > <bound>`.
    fn greater_than(field: &str, bound: Expr) -> LogicalOp {
        LogicalOp::Filter(Expr::Binary {
            op: BinOp::Gt,
            left: Box::new(Expr::Ident(field.to_string())),
            right: Box::new(bound),
        })
    }

    /// Filter of the form `<name> > 100`.
    fn filter_op(name: &str) -> LogicalOp {
        greater_than(name, Expr::Int(100))
    }

    /// Run the default optimizer over `ops` and return the resulting
    /// operations of the single test stream plus the pass count.
    fn optimize_ops(ops: Vec<LogicalOp>) -> (Vec<LogicalOp>, usize) {
        let (mut plan, passes) = Optimizer::default_rules().optimize(make_plan(ops));
        (std::mem::take(&mut plan.streams[0].operations), passes)
    }

    #[test]
    fn test_filter_pushdown_before_window() {
        let (ops, passes) = optimize_ops(vec![window_op(), filter_op("temperature")]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Filter should be pushed before Window"
        );
        assert!(
            matches!(ops[1], LogicalOp::Window(_)),
            "Window should come after Filter"
        );
        assert!(passes >= 1);
    }

    #[test]
    fn test_filter_not_pushed_past_non_window() {
        let (ops, _) = optimize_ops(vec![
            LogicalOp::Aggregate(vec![]),
            filter_op("temperature"),
        ]);

        // Filter should NOT be pushed past Aggregate (not window)
        assert!(matches!(ops[0], LogicalOp::Aggregate(_)));
        assert!(matches!(ops[1], LogicalOp::Filter(_)));
    }

    #[test]
    fn test_window_merge() {
        let (ops, _) = optimize_ops(vec![
            window_lasting(Expr::Duration(60_000_000_000)),
            window_lasting(Expr::Duration(60_000_000_000)),
        ]);

        assert_eq!(ops.len(), 1, "Two identical windows should merge into one");
    }

    #[test]
    fn test_window_no_merge_different_duration() {
        let (ops, _) = optimize_ops(vec![
            window_lasting(Expr::Duration(60_000_000_000)),
            window_lasting(Expr::Duration(120_000_000_000)),
        ]);

        assert_eq!(
            ops.len(),
            2,
            "Windows with different durations should not merge"
        );
    }

    #[test]
    fn test_projection_pruning() {
        let narrow = LogicalOp::Project(vec![SelectItem::Field("a".to_string())]);
        let wide = LogicalOp::Project(vec![
            SelectItem::Field("a".to_string()),
            SelectItem::Field("b".to_string()),
        ]);

        let (ops, _) = optimize_ops(vec![narrow, wide]);

        assert_eq!(ops.len(), 1, "Adjacent projections should be pruned to one");
        match &ops[0] {
            LogicalOp::Project(items) => {
                assert_eq!(items.len(), 2, "Should keep the second (final) projection");
            }
            _ => panic!("Expected Project op"),
        }
    }

    #[test]
    fn test_temporal_filter_pushdown() {
        let (ops, _) = optimize_ops(vec![
            window_op(),
            greater_than("timestamp", Expr::Int(1000)),
        ]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Temporal filter should be pushed before Window"
        );
    }

    #[test]
    fn test_no_changes_returns_single_pass() {
        let (_, passes) = optimize_ops(vec![filter_op("temperature"), window_op()]);

        assert_eq!(passes, 1, "No changes should result in a single pass");
    }

    #[test]
    fn test_optimize_plan_convenience() {
        let plan = make_plan(vec![window_op(), filter_op("temperature")]);

        let optimized = optimize_plan(plan);
        assert!(matches!(
            optimized.streams[0].operations[0],
            LogicalOp::Filter(_)
        ));
    }
}