Skip to main content

varpulis_parser/
optimizer.rs

1//! Rule-based logical plan optimizer
2//!
3//! Applies optimization rules to a `LogicalPlan` before physical materialization.
4//! Rules are applied iteratively (up to `MAX_PASSES`) until no rule produces changes.
5//!
6//! # Current Rules
7//!
//! - **FilterPushdown**: Move filters before windows when safe
9//! - **TemporalFilterPushdown**: Push time-based filters before windows
10//! - **WindowMerge**: Merge adjacent windows with compatible configurations
11//! - **ProjectionPruning**: Remove duplicate projections (keep last)
12
13use varpulis_core::ast::Expr;
14use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalStream};
15
/// Upper bound on how many full sweeps over the plan's streams
/// `Optimizer::optimize` performs, even if rules keep reporting changes.
const MAX_PASSES: usize = 10;
18
19/// A single optimization rule that can transform a logical stream.
20pub trait OptimizationRule: Send + Sync {
21    /// Human-readable name for logging/debugging.
22    fn name(&self) -> &str;
23
24    /// Apply the rule to a stream, returning `true` if any changes were made.
25    fn apply(&self, stream: &mut LogicalStream) -> bool;
26}
27
28/// The plan optimizer: runs a set of rules iteratively until convergence.
29pub struct Optimizer {
30    rules: Vec<Box<dyn OptimizationRule>>,
31}
32
33impl std::fmt::Debug for Optimizer {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("Optimizer")
36            .field("rules_count", &self.rules.len())
37            .finish_non_exhaustive()
38    }
39}
40
41impl Optimizer {
42    /// Create an optimizer with the default set of rules.
43    pub fn default_rules() -> Self {
44        Self {
45            rules: vec![
46                Box::new(FilterPushdown),
47                Box::new(TemporalFilterPushdown),
48                Box::new(WindowMerge),
49                Box::new(ProjectionPruning),
50            ],
51        }
52    }
53
54    /// Create an optimizer with a custom set of rules.
55    pub fn with_rules(rules: Vec<Box<dyn OptimizationRule>>) -> Self {
56        Self { rules }
57    }
58
59    /// Optimize a logical plan by applying rules iteratively.
60    ///
61    /// Returns the optimized plan and the number of passes performed.
62    pub fn optimize(&self, mut plan: LogicalPlan) -> (LogicalPlan, usize) {
63        let mut total_passes = 0;
64
65        for pass in 0..MAX_PASSES {
66            let mut any_changed = false;
67
68            for stream in &mut plan.streams {
69                for rule in &self.rules {
70                    if rule.apply(stream) {
71                        any_changed = true;
72                    }
73                }
74            }
75
76            total_passes = pass + 1;
77            if !any_changed {
78                break;
79            }
80        }
81
82        (plan, total_passes)
83    }
84}
85
86/// Convenience function: optimize a plan with default rules.
87pub fn optimize_plan(plan: LogicalPlan) -> LogicalPlan {
88    let optimizer = Optimizer::default_rules();
89    let (optimized, _) = optimizer.optimize(plan);
90    optimized
91}
92
93// =============================================================================
94// Rule: FilterPushdown
95// =============================================================================
96
97/// Move filter operations before windows and aggregates when safe.
98///
99/// A filter can be pushed before a window/aggregate if it only references
100/// fields that are available before the window (i.e., input fields, not
101/// aggregation output aliases).
102struct FilterPushdown;
103
104impl OptimizationRule for FilterPushdown {
105    fn name(&self) -> &'static str {
106        "FilterPushdown"
107    }
108
109    fn apply(&self, stream: &mut LogicalStream) -> bool {
110        let ops = &mut stream.operations;
111        let mut changed = false;
112
113        // Scan for Filter ops that appear after a Window or Aggregate
114        // and can be moved before them.
115        let mut i = 0;
116        while i < ops.len() {
117            if let LogicalOp::Filter(expr) = &ops[i] {
118                // Look backwards for a Window or Aggregate to push past
119                if i > 0 {
120                    let prev_idx = i - 1;
121                    let can_push = match &ops[prev_idx] {
122                        LogicalOp::Window(_) => !references_aggregation_output(expr),
123                        _ => false,
124                    };
125
126                    if can_push {
127                        ops.swap(prev_idx, i);
128                        changed = true;
129                        // Don't increment i — check if we can push further
130                        continue;
131                    }
132                }
133            }
134            i += 1;
135        }
136
137        changed
138    }
139}
140
141// =============================================================================
142// Rule: TemporalFilterPushdown
143// =============================================================================
144
145/// Push time-based filters before windows.
146///
147/// Filters on timestamp fields can safely be pushed before window operations
148/// to reduce the number of events entering the window.
149struct TemporalFilterPushdown;
150
151impl OptimizationRule for TemporalFilterPushdown {
152    fn name(&self) -> &'static str {
153        "TemporalFilterPushdown"
154    }
155
156    fn apply(&self, stream: &mut LogicalStream) -> bool {
157        let ops = &mut stream.operations;
158        let mut changed = false;
159
160        let mut i = 0;
161        while i < ops.len() {
162            if let LogicalOp::Filter(expr) = &ops[i] {
163                if is_temporal_filter(expr) && i > 0 {
164                    let prev_idx = i - 1;
165                    if matches!(ops[prev_idx], LogicalOp::Window(_)) {
166                        ops.swap(prev_idx, i);
167                        changed = true;
168                        continue;
169                    }
170                }
171            }
172            i += 1;
173        }
174
175        changed
176    }
177}
178
179// =============================================================================
180// Rule: WindowMerge
181// =============================================================================
182
183/// Merge adjacent window operations with compatible configurations.
184///
185/// Two adjacent windows with the same duration and sliding parameters
186/// can be merged into a single window, reducing processing overhead.
187struct WindowMerge;
188
189impl OptimizationRule for WindowMerge {
190    fn name(&self) -> &'static str {
191        "WindowMerge"
192    }
193
194    fn apply(&self, stream: &mut LogicalStream) -> bool {
195        let ops = &mut stream.operations;
196        if ops.len() < 2 {
197            return false;
198        }
199
200        let mut changed = false;
201        let mut i = 0;
202
203        while i + 1 < ops.len() {
204            let mergeable =
205                if let (LogicalOp::Window(a), LogicalOp::Window(b)) = (&ops[i], &ops[i + 1]) {
206                    // Same duration and sliding config → mergeable
207                    a.duration == b.duration
208                        && a.sliding == b.sliding
209                        && a.session_gap == b.session_gap
210                        && a.policy == b.policy
211                } else {
212                    false
213                };
214
215            if mergeable {
216                ops.remove(i + 1);
217                changed = true;
218                // Don't increment — check if another window follows
219            } else {
220                i += 1;
221            }
222        }
223
224        changed
225    }
226}
227
228// =============================================================================
229// Rule: ProjectionPruning
230// =============================================================================
231
232/// Remove redundant projections: if two adjacent Project ops exist,
233/// keep only the second (it overwrites the first).
234struct ProjectionPruning;
235
236impl OptimizationRule for ProjectionPruning {
237    fn name(&self) -> &'static str {
238        "ProjectionPruning"
239    }
240
241    fn apply(&self, stream: &mut LogicalStream) -> bool {
242        let ops = &mut stream.operations;
243        if ops.len() < 2 {
244            return false;
245        }
246
247        let mut changed = false;
248        let mut i = 0;
249
250        while i + 1 < ops.len() {
251            let both_project = matches!(ops[i], LogicalOp::Project(_))
252                && matches!(ops[i + 1], LogicalOp::Project(_));
253
254            if both_project {
255                ops.remove(i);
256                changed = true;
257                // Don't increment — the next pair might also be projections
258            } else {
259                i += 1;
260            }
261        }
262
263        changed
264    }
265}
266
267// =============================================================================
268// Helper functions
269// =============================================================================
270
271/// Check if an expression references aggregation output aliases.
272///
273/// Simple heuristic: aggregation outputs typically use function call results
274/// (count, sum, avg, etc.). If the filter only uses simple field accesses
275/// and comparisons, it's safe to push before aggregation.
276fn references_aggregation_output(expr: &Expr) -> bool {
277    match expr {
278        Expr::Call { .. } => true,
279        Expr::Binary { left, right, .. } => {
280            references_aggregation_output(left) || references_aggregation_output(right)
281        }
282        Expr::Unary { expr, .. } => references_aggregation_output(expr),
283        Expr::Member { expr, .. } => references_aggregation_output(expr),
284        _ => false,
285    }
286}
287
288/// Check if a filter expression is temporal (references timestamp-like fields).
289fn is_temporal_filter(expr: &Expr) -> bool {
290    match expr {
291        Expr::Binary { left, right, .. } => {
292            is_timestamp_reference(left) || is_timestamp_reference(right)
293        }
294        _ => false,
295    }
296}
297
298fn is_timestamp_reference(expr: &Expr) -> bool {
299    match expr {
300        Expr::Ident(name) => {
301            let lower = name.to_lowercase();
302            lower.contains("timestamp")
303                || lower.contains("time")
304                || lower == "ts"
305                || lower == "event_time"
306        }
307        Expr::Member { member, .. } => {
308            let lower = member.to_lowercase();
309            lower.contains("timestamp")
310                || lower.contains("time")
311                || lower == "ts"
312                || lower == "event_time"
313        }
314        _ => false,
315    }
316}
317
#[cfg(test)]
mod tests {
    use varpulis_core::ast::{BinOp, Expr, SelectItem, WindowArgs};
    use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalSource, LogicalStream};

    use super::*;

    /// Wrap `ops` in a plan holding a single stream; all other plan
    /// collections are left empty.
    fn make_plan(ops: Vec<LogicalOp>) -> LogicalPlan {
        let stream = LogicalStream {
            id: 0,
            name: "Test".to_string(),
            source: LogicalSource::EventType("E".to_string()),
            operations: ops,
            estimated_cardinality: None,
        };

        LogicalPlan {
            streams: vec![stream],
            functions: vec![],
            variables: vec![],
            connectors: vec![],
            patterns: vec![],
            events: vec![],
        }
    }

    /// Window with the given duration and no sliding/policy/session settings.
    fn window_with(duration: Expr) -> LogicalOp {
        LogicalOp::Window(WindowArgs {
            duration,
            sliding: None,
            policy: None,
            session_gap: None,
        })
    }

    /// The default test window.
    fn window_op() -> LogicalOp {
        window_with(Expr::Duration(60_000_000_000)) // 60s
    }

    /// Filter of the form `<field> > <bound>`.
    fn greater_than(field: &str, bound: Expr) -> LogicalOp {
        LogicalOp::Filter(Expr::Binary {
            op: BinOp::Gt,
            left: Box::new(Expr::Ident(field.to_string())),
            right: Box::new(bound),
        })
    }

    /// Filter of the form `<name> > 100`.
    fn filter_op(name: &str) -> LogicalOp {
        greater_than(name, Expr::Int(100))
    }

    /// Run the default optimizer over `ops` and hand back the resulting
    /// operation list of the single stream plus the pass count.
    fn run_default(ops: Vec<LogicalOp>) -> (Vec<LogicalOp>, usize) {
        let (mut plan, passes) = Optimizer::default_rules().optimize(make_plan(ops));
        (plan.streams.remove(0).operations, passes)
    }

    #[test]
    fn test_filter_pushdown_before_window() {
        let (ops, passes) = run_default(vec![window_op(), filter_op("temperature")]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Filter should be pushed before Window"
        );
        assert!(
            matches!(ops[1], LogicalOp::Window(_)),
            "Window should come after Filter"
        );
        assert!(passes >= 1);
    }

    #[test]
    fn test_filter_not_pushed_past_non_window() {
        let (ops, _) = run_default(vec![
            LogicalOp::Aggregate(vec![]),
            filter_op("temperature"),
        ]);

        // An Aggregate is not a Window, so the filter must stay behind it.
        assert!(matches!(ops[0], LogicalOp::Aggregate(_)));
        assert!(matches!(ops[1], LogicalOp::Filter(_)));
    }

    #[test]
    fn test_window_merge() {
        let (ops, _) = run_default(vec![
            window_with(Expr::Duration(60_000_000_000)),
            window_with(Expr::Duration(60_000_000_000)),
        ]);

        assert_eq!(ops.len(), 1, "Two identical windows should merge into one");
    }

    #[test]
    fn test_window_no_merge_different_duration() {
        let (ops, _) = run_default(vec![
            window_with(Expr::Duration(60_000_000_000)),
            window_with(Expr::Duration(120_000_000_000)),
        ]);

        assert_eq!(
            ops.len(),
            2,
            "Windows with different durations should not merge"
        );
    }

    #[test]
    fn test_projection_pruning() {
        let narrow = LogicalOp::Project(vec![SelectItem::Field("a".to_string())]);
        let wide = LogicalOp::Project(vec![
            SelectItem::Field("a".to_string()),
            SelectItem::Field("b".to_string()),
        ]);

        let (ops, _) = run_default(vec![narrow, wide]);

        assert_eq!(ops.len(), 1, "Adjacent projections should be pruned to one");
        match &ops[0] {
            LogicalOp::Project(items) => {
                assert_eq!(items.len(), 2, "Should keep the second (final) projection");
            }
            _ => panic!("Expected Project op"),
        }
    }

    #[test]
    fn test_temporal_filter_pushdown() {
        let (ops, _) = run_default(vec![
            window_op(),
            greater_than("timestamp", Expr::Int(1000)),
        ]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Temporal filter should be pushed before Window"
        );
    }

    #[test]
    fn test_no_changes_returns_single_pass() {
        // Already in optimal order: nothing for any rule to do.
        let (_, passes) = run_default(vec![filter_op("temperature"), window_op()]);

        assert_eq!(passes, 1, "No changes should result in a single pass");
    }

    #[test]
    fn test_optimize_plan_convenience() {
        let plan = make_plan(vec![window_op(), filter_op("temperature")]);

        let optimized = optimize_plan(plan);

        assert!(matches!(
            optimized.streams[0].operations[0],
            LogicalOp::Filter(_)
        ));
    }
}