1use varpulis_core::ast::Expr;
14use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalStream};
15
16const MAX_PASSES: usize = 10;
18
19pub trait OptimizationRule: Send + Sync {
21 fn name(&self) -> &str;
23
24 fn apply(&self, stream: &mut LogicalStream) -> bool;
26}
27
28pub struct Optimizer {
30 rules: Vec<Box<dyn OptimizationRule>>,
31}
32
33impl std::fmt::Debug for Optimizer {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("Optimizer")
36 .field("rules_count", &self.rules.len())
37 .finish_non_exhaustive()
38 }
39}
40
41impl Optimizer {
42 pub fn default_rules() -> Self {
44 Self {
45 rules: vec![
46 Box::new(FilterPushdown),
47 Box::new(TemporalFilterPushdown),
48 Box::new(WindowMerge),
49 Box::new(ProjectionPruning),
50 ],
51 }
52 }
53
54 pub fn with_rules(rules: Vec<Box<dyn OptimizationRule>>) -> Self {
56 Self { rules }
57 }
58
59 pub fn optimize(&self, mut plan: LogicalPlan) -> (LogicalPlan, usize) {
63 let mut total_passes = 0;
64
65 for pass in 0..MAX_PASSES {
66 let mut any_changed = false;
67
68 for stream in &mut plan.streams {
69 for rule in &self.rules {
70 if rule.apply(stream) {
71 any_changed = true;
72 }
73 }
74 }
75
76 total_passes = pass + 1;
77 if !any_changed {
78 break;
79 }
80 }
81
82 (plan, total_passes)
83 }
84}
85
86pub fn optimize_plan(plan: LogicalPlan) -> LogicalPlan {
88 let optimizer = Optimizer::default_rules();
89 let (optimized, _) = optimizer.optimize(plan);
90 optimized
91}
92
93struct FilterPushdown;
103
104impl OptimizationRule for FilterPushdown {
105 fn name(&self) -> &'static str {
106 "FilterPushdown"
107 }
108
109 fn apply(&self, stream: &mut LogicalStream) -> bool {
110 let ops = &mut stream.operations;
111 let mut changed = false;
112
113 let mut i = 0;
116 while i < ops.len() {
117 if let LogicalOp::Filter(expr) = &ops[i] {
118 if i > 0 {
120 let prev_idx = i - 1;
121 let can_push = match &ops[prev_idx] {
122 LogicalOp::Window(_) => !references_aggregation_output(expr),
123 _ => false,
124 };
125
126 if can_push {
127 ops.swap(prev_idx, i);
128 changed = true;
129 continue;
131 }
132 }
133 }
134 i += 1;
135 }
136
137 changed
138 }
139}
140
141struct TemporalFilterPushdown;
150
151impl OptimizationRule for TemporalFilterPushdown {
152 fn name(&self) -> &'static str {
153 "TemporalFilterPushdown"
154 }
155
156 fn apply(&self, stream: &mut LogicalStream) -> bool {
157 let ops = &mut stream.operations;
158 let mut changed = false;
159
160 let mut i = 0;
161 while i < ops.len() {
162 if let LogicalOp::Filter(expr) = &ops[i] {
163 if is_temporal_filter(expr) && i > 0 {
164 let prev_idx = i - 1;
165 if matches!(ops[prev_idx], LogicalOp::Window(_)) {
166 ops.swap(prev_idx, i);
167 changed = true;
168 continue;
169 }
170 }
171 }
172 i += 1;
173 }
174
175 changed
176 }
177}
178
179struct WindowMerge;
188
189impl OptimizationRule for WindowMerge {
190 fn name(&self) -> &'static str {
191 "WindowMerge"
192 }
193
194 fn apply(&self, stream: &mut LogicalStream) -> bool {
195 let ops = &mut stream.operations;
196 if ops.len() < 2 {
197 return false;
198 }
199
200 let mut changed = false;
201 let mut i = 0;
202
203 while i + 1 < ops.len() {
204 let mergeable =
205 if let (LogicalOp::Window(a), LogicalOp::Window(b)) = (&ops[i], &ops[i + 1]) {
206 a.duration == b.duration
208 && a.sliding == b.sliding
209 && a.session_gap == b.session_gap
210 && a.policy == b.policy
211 } else {
212 false
213 };
214
215 if mergeable {
216 ops.remove(i + 1);
217 changed = true;
218 } else {
220 i += 1;
221 }
222 }
223
224 changed
225 }
226}
227
228struct ProjectionPruning;
235
236impl OptimizationRule for ProjectionPruning {
237 fn name(&self) -> &'static str {
238 "ProjectionPruning"
239 }
240
241 fn apply(&self, stream: &mut LogicalStream) -> bool {
242 let ops = &mut stream.operations;
243 if ops.len() < 2 {
244 return false;
245 }
246
247 let mut changed = false;
248 let mut i = 0;
249
250 while i + 1 < ops.len() {
251 let both_project = matches!(ops[i], LogicalOp::Project(_))
252 && matches!(ops[i + 1], LogicalOp::Project(_));
253
254 if both_project {
255 ops.remove(i);
256 changed = true;
257 } else {
259 i += 1;
260 }
261 }
262
263 changed
264 }
265}
266
267fn references_aggregation_output(expr: &Expr) -> bool {
277 match expr {
278 Expr::Call { .. } => true,
279 Expr::Binary { left, right, .. } => {
280 references_aggregation_output(left) || references_aggregation_output(right)
281 }
282 Expr::Unary { expr, .. } => references_aggregation_output(expr),
283 Expr::Member { expr, .. } => references_aggregation_output(expr),
284 _ => false,
285 }
286}
287
288fn is_temporal_filter(expr: &Expr) -> bool {
290 match expr {
291 Expr::Binary { left, right, .. } => {
292 is_timestamp_reference(left) || is_timestamp_reference(right)
293 }
294 _ => false,
295 }
296}
297
298fn is_timestamp_reference(expr: &Expr) -> bool {
299 match expr {
300 Expr::Ident(name) => {
301 let lower = name.to_lowercase();
302 lower.contains("timestamp")
303 || lower.contains("time")
304 || lower == "ts"
305 || lower == "event_time"
306 }
307 Expr::Member { member, .. } => {
308 let lower = member.to_lowercase();
309 lower.contains("timestamp")
310 || lower.contains("time")
311 || lower == "ts"
312 || lower == "event_time"
313 }
314 _ => false,
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use varpulis_core::ast::{BinOp, Expr, SelectItem, WindowArgs};
322 use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalSource, LogicalStream};
323
324 fn make_plan(ops: Vec<LogicalOp>) -> LogicalPlan {
325 LogicalPlan {
326 streams: vec![LogicalStream {
327 id: 0,
328 name: "Test".to_string(),
329 source: LogicalSource::EventType("E".to_string()),
330 operations: ops,
331 estimated_cardinality: None,
332 }],
333 functions: vec![],
334 variables: vec![],
335 connectors: vec![],
336 patterns: vec![],
337 events: vec![],
338 }
339 }
340
341 fn window_op() -> LogicalOp {
342 LogicalOp::Window(WindowArgs {
343 duration: Expr::Duration(60_000_000_000), sliding: None,
345 policy: None,
346 session_gap: None,
347 })
348 }
349
350 fn filter_op(name: &str) -> LogicalOp {
351 LogicalOp::Filter(Expr::Binary {
352 op: BinOp::Gt,
353 left: Box::new(Expr::Ident(name.to_string())),
354 right: Box::new(Expr::Int(100)),
355 })
356 }
357
358 #[test]
359 fn test_filter_pushdown_before_window() {
360 let ops = vec![window_op(), filter_op("temperature")];
361 let plan = make_plan(ops);
362
363 let optimizer = Optimizer::default_rules();
364 let (optimized, passes) = optimizer.optimize(plan);
365
366 let ops = &optimized.streams[0].operations;
367 assert!(
368 matches!(ops[0], LogicalOp::Filter(_)),
369 "Filter should be pushed before Window"
370 );
371 assert!(
372 matches!(ops[1], LogicalOp::Window(_)),
373 "Window should come after Filter"
374 );
375 assert!(passes >= 1);
376 }
377
378 #[test]
379 fn test_filter_not_pushed_past_non_window() {
380 let ops = vec![LogicalOp::Aggregate(vec![]), filter_op("temperature")];
381 let plan = make_plan(ops);
382
383 let optimizer = Optimizer::default_rules();
384 let (optimized, _) = optimizer.optimize(plan);
385
386 let ops = &optimized.streams[0].operations;
387 assert!(matches!(ops[0], LogicalOp::Aggregate(_)));
389 assert!(matches!(ops[1], LogicalOp::Filter(_)));
390 }
391
392 #[test]
393 fn test_window_merge() {
394 let ops = vec![
395 LogicalOp::Window(WindowArgs {
396 duration: Expr::Duration(60_000_000_000),
397 sliding: None,
398 policy: None,
399 session_gap: None,
400 }),
401 LogicalOp::Window(WindowArgs {
402 duration: Expr::Duration(60_000_000_000),
403 sliding: None,
404 policy: None,
405 session_gap: None,
406 }),
407 ];
408 let plan = make_plan(ops);
409
410 let optimizer = Optimizer::default_rules();
411 let (optimized, _) = optimizer.optimize(plan);
412
413 assert_eq!(
414 optimized.streams[0].operations.len(),
415 1,
416 "Two identical windows should merge into one"
417 );
418 }
419
420 #[test]
421 fn test_window_no_merge_different_duration() {
422 let ops = vec![
423 LogicalOp::Window(WindowArgs {
424 duration: Expr::Duration(60_000_000_000),
425 sliding: None,
426 policy: None,
427 session_gap: None,
428 }),
429 LogicalOp::Window(WindowArgs {
430 duration: Expr::Duration(120_000_000_000),
431 sliding: None,
432 policy: None,
433 session_gap: None,
434 }),
435 ];
436 let plan = make_plan(ops);
437
438 let optimizer = Optimizer::default_rules();
439 let (optimized, _) = optimizer.optimize(plan);
440
441 assert_eq!(
442 optimized.streams[0].operations.len(),
443 2,
444 "Windows with different durations should not merge"
445 );
446 }
447
448 #[test]
449 fn test_projection_pruning() {
450 let ops = vec![
451 LogicalOp::Project(vec![SelectItem::Field("a".to_string())]),
452 LogicalOp::Project(vec![
453 SelectItem::Field("a".to_string()),
454 SelectItem::Field("b".to_string()),
455 ]),
456 ];
457 let plan = make_plan(ops);
458
459 let optimizer = Optimizer::default_rules();
460 let (optimized, _) = optimizer.optimize(plan);
461
462 assert_eq!(
463 optimized.streams[0].operations.len(),
464 1,
465 "Adjacent projections should be pruned to one"
466 );
467 if let LogicalOp::Project(items) = &optimized.streams[0].operations[0] {
468 assert_eq!(items.len(), 2, "Should keep the second (final) projection");
469 } else {
470 panic!("Expected Project op");
471 }
472 }
473
474 #[test]
475 fn test_temporal_filter_pushdown() {
476 let ops = vec![
477 window_op(),
478 LogicalOp::Filter(Expr::Binary {
479 op: BinOp::Gt,
480 left: Box::new(Expr::Ident("timestamp".to_string())),
481 right: Box::new(Expr::Int(1000)),
482 }),
483 ];
484 let plan = make_plan(ops);
485
486 let optimizer = Optimizer::default_rules();
487 let (optimized, _) = optimizer.optimize(plan);
488
489 let ops = &optimized.streams[0].operations;
490 assert!(
491 matches!(ops[0], LogicalOp::Filter(_)),
492 "Temporal filter should be pushed before Window"
493 );
494 }
495
496 #[test]
497 fn test_no_changes_returns_single_pass() {
498 let ops = vec![filter_op("temperature"), window_op()];
499 let plan = make_plan(ops);
500
501 let optimizer = Optimizer::default_rules();
502 let (_, passes) = optimizer.optimize(plan);
503
504 assert_eq!(passes, 1, "No changes should result in a single pass");
505 }
506
507 #[test]
508 fn test_optimize_plan_convenience() {
509 let ops = vec![window_op(), filter_op("temperature")];
510 let plan = make_plan(ops);
511
512 let optimized = optimize_plan(plan);
513 assert!(matches!(
514 optimized.streams[0].operations[0],
515 LogicalOp::Filter(_)
516 ));
517 }
518}