1use varpulis_core::ast::Expr;
14use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalStream};
15
/// Upper bound on the number of full rule sweeps `Optimizer::optimize` runs
/// over a plan before it stops, even if rules are still reporting changes.
const MAX_PASSES: usize = 10;
18
/// A single rewrite that can be applied to one logical stream.
///
/// Implementors must be `Send + Sync`.
pub trait OptimizationRule: Send + Sync {
    /// Returns a name identifying this rule.
    fn name(&self) -> &str;

    /// Attempts to rewrite `stream` in place.
    ///
    /// Returns `true` when the stream was modified and `false` otherwise; the
    /// optimizer keeps running passes for as long as some rule reports `true`.
    fn apply(&self, stream: &mut LogicalStream) -> bool;
}
27
28pub struct Optimizer {
30 rules: Vec<Box<dyn OptimizationRule>>,
31}
32
33impl std::fmt::Debug for Optimizer {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("Optimizer")
36 .field("rules_count", &self.rules.len())
37 .finish_non_exhaustive()
38 }
39}
40
41impl Optimizer {
42 pub fn default_rules() -> Self {
44 Self {
45 rules: vec![
46 Box::new(FilterPushdown),
47 Box::new(TemporalFilterPushdown),
48 Box::new(WindowMerge),
49 Box::new(ProjectionPruning),
50 ],
51 }
52 }
53
54 pub fn with_rules(rules: Vec<Box<dyn OptimizationRule>>) -> Self {
56 Self { rules }
57 }
58
59 pub fn optimize(&self, mut plan: LogicalPlan) -> (LogicalPlan, usize) {
63 let mut total_passes = 0;
64
65 for pass in 0..MAX_PASSES {
66 let mut any_changed = false;
67
68 for stream in &mut plan.streams {
69 for rule in &self.rules {
70 if rule.apply(stream) {
71 any_changed = true;
72 }
73 }
74 }
75
76 total_passes = pass + 1;
77 if !any_changed {
78 break;
79 }
80 }
81
82 (plan, total_passes)
83 }
84}
85
86pub fn optimize_plan(plan: LogicalPlan) -> LogicalPlan {
88 let optimizer = Optimizer::default_rules();
89 let (optimized, _) = optimizer.optimize(plan);
90 optimized
91}
92
93struct FilterPushdown;
103
104impl OptimizationRule for FilterPushdown {
105 fn name(&self) -> &'static str {
106 "FilterPushdown"
107 }
108
109 fn apply(&self, stream: &mut LogicalStream) -> bool {
110 let ops = &mut stream.operations;
111 let mut changed = false;
112
113 let mut i = 0;
116 while i < ops.len() {
117 if let LogicalOp::Filter(expr) = &ops[i] {
118 if i > 0 {
120 let prev_idx = i - 1;
121 let can_push = match &ops[prev_idx] {
122 LogicalOp::Window(_) => !references_aggregation_output(expr),
123 _ => false,
124 };
125
126 if can_push {
127 ops.swap(prev_idx, i);
128 changed = true;
129 continue;
131 }
132 }
133 }
134 i += 1;
135 }
136
137 changed
138 }
139}
140
141struct TemporalFilterPushdown;
150
151impl OptimizationRule for TemporalFilterPushdown {
152 fn name(&self) -> &'static str {
153 "TemporalFilterPushdown"
154 }
155
156 fn apply(&self, stream: &mut LogicalStream) -> bool {
157 let ops = &mut stream.operations;
158 let mut changed = false;
159
160 let mut i = 0;
161 while i < ops.len() {
162 if let LogicalOp::Filter(expr) = &ops[i] {
163 if is_temporal_filter(expr) && i > 0 {
164 let prev_idx = i - 1;
165 if matches!(ops[prev_idx], LogicalOp::Window(_)) {
166 ops.swap(prev_idx, i);
167 changed = true;
168 continue;
169 }
170 }
171 }
172 i += 1;
173 }
174
175 changed
176 }
177}
178
179struct WindowMerge;
188
189impl OptimizationRule for WindowMerge {
190 fn name(&self) -> &'static str {
191 "WindowMerge"
192 }
193
194 fn apply(&self, stream: &mut LogicalStream) -> bool {
195 let ops = &mut stream.operations;
196 if ops.len() < 2 {
197 return false;
198 }
199
200 let mut changed = false;
201 let mut i = 0;
202
203 while i + 1 < ops.len() {
204 let mergeable =
205 if let (LogicalOp::Window(a), LogicalOp::Window(b)) = (&ops[i], &ops[i + 1]) {
206 a.duration == b.duration
208 && a.sliding == b.sliding
209 && a.session_gap == b.session_gap
210 && a.policy == b.policy
211 } else {
212 false
213 };
214
215 if mergeable {
216 ops.remove(i + 1);
217 changed = true;
218 } else {
220 i += 1;
221 }
222 }
223
224 changed
225 }
226}
227
228struct ProjectionPruning;
235
236impl OptimizationRule for ProjectionPruning {
237 fn name(&self) -> &'static str {
238 "ProjectionPruning"
239 }
240
241 fn apply(&self, stream: &mut LogicalStream) -> bool {
242 let ops = &mut stream.operations;
243 if ops.len() < 2 {
244 return false;
245 }
246
247 let mut changed = false;
248 let mut i = 0;
249
250 while i + 1 < ops.len() {
251 let both_project = matches!(ops[i], LogicalOp::Project(_))
252 && matches!(ops[i + 1], LogicalOp::Project(_));
253
254 if both_project {
255 ops.remove(i);
256 changed = true;
257 } else {
259 i += 1;
260 }
261 }
262
263 changed
264 }
265}
266
267fn references_aggregation_output(expr: &Expr) -> bool {
277 match expr {
278 Expr::Call { .. } => true,
279 Expr::Binary { left, right, .. } => {
280 references_aggregation_output(left) || references_aggregation_output(right)
281 }
282 Expr::Unary { expr, .. } => references_aggregation_output(expr),
283 Expr::Member { expr, .. } => references_aggregation_output(expr),
284 _ => false,
285 }
286}
287
288fn is_temporal_filter(expr: &Expr) -> bool {
290 match expr {
291 Expr::Binary { left, right, .. } => {
292 is_timestamp_reference(left) || is_timestamp_reference(right)
293 }
294 _ => false,
295 }
296}
297
298fn is_timestamp_reference(expr: &Expr) -> bool {
299 match expr {
300 Expr::Ident(name) => {
301 let lower = name.to_lowercase();
302 lower.contains("timestamp")
303 || lower.contains("time")
304 || lower == "ts"
305 || lower == "event_time"
306 }
307 Expr::Member { member, .. } => {
308 let lower = member.to_lowercase();
309 lower.contains("timestamp")
310 || lower.contains("time")
311 || lower == "ts"
312 || lower == "event_time"
313 }
314 _ => false,
315 }
316}
317
#[cfg(test)]
mod tests {
    use varpulis_core::ast::{BinOp, Expr, SelectItem, WindowArgs};
    use varpulis_core::plan::{LogicalOp, LogicalPlan, LogicalSource, LogicalStream};

    use super::*;

    /// Wraps `ops` in a plan that contains exactly one stream.
    fn make_plan(ops: Vec<LogicalOp>) -> LogicalPlan {
        let stream = LogicalStream {
            id: 0,
            name: "Test".to_string(),
            source: LogicalSource::EventType("E".to_string()),
            operations: ops,
            estimated_cardinality: None,
        };

        LogicalPlan {
            streams: vec![stream],
            functions: vec![],
            variables: vec![],
            connectors: vec![],
            patterns: vec![],
            events: vec![],
        }
    }

    /// Window with the given duration and every optional setting left unset.
    fn window_with(duration: Expr) -> LogicalOp {
        LogicalOp::Window(WindowArgs {
            duration,
            sliding: None,
            policy: None,
            session_gap: None,
        })
    }

    /// The default window used by most tests.
    fn window_op() -> LogicalOp {
        window_with(Expr::Duration(60_000_000_000))
    }

    /// Filter of the form `<name> > <rhs>`.
    fn gt_filter(name: &str, rhs: Expr) -> LogicalOp {
        LogicalOp::Filter(Expr::Binary {
            op: BinOp::Gt,
            left: Box::new(Expr::Ident(name.to_string())),
            right: Box::new(rhs),
        })
    }

    /// Filter of the form `<name> > 100`.
    fn filter_op(name: &str) -> LogicalOp {
        gt_filter(name, Expr::Int(100))
    }

    /// Optimizes a single-stream plan built from `ops` with the default rules
    /// and returns that stream's resulting operations plus the pass count.
    fn run_default(ops: Vec<LogicalOp>) -> (Vec<LogicalOp>, usize) {
        let (mut plan, passes) = Optimizer::default_rules().optimize(make_plan(ops));
        let result = std::mem::take(&mut plan.streams[0].operations);
        (result, passes)
    }

    #[test]
    fn test_filter_pushdown_before_window() {
        let (ops, passes) = run_default(vec![window_op(), filter_op("temperature")]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Filter should be pushed before Window"
        );
        assert!(
            matches!(ops[1], LogicalOp::Window(_)),
            "Window should come after Filter"
        );
        assert!(passes >= 1);
    }

    #[test]
    fn test_filter_not_pushed_past_non_window() {
        let (ops, _) = run_default(vec![
            LogicalOp::Aggregate(vec![]),
            filter_op("temperature"),
        ]);

        assert!(matches!(ops[0], LogicalOp::Aggregate(_)));
        assert!(matches!(ops[1], LogicalOp::Filter(_)));
    }

    #[test]
    fn test_window_merge() {
        let (ops, _) = run_default(vec![
            window_with(Expr::Duration(60_000_000_000)),
            window_with(Expr::Duration(60_000_000_000)),
        ]);

        assert_eq!(ops.len(), 1, "Two identical windows should merge into one");
    }

    #[test]
    fn test_window_no_merge_different_duration() {
        let (ops, _) = run_default(vec![
            window_with(Expr::Duration(60_000_000_000)),
            window_with(Expr::Duration(120_000_000_000)),
        ]);

        assert_eq!(
            ops.len(),
            2,
            "Windows with different durations should not merge"
        );
    }

    #[test]
    fn test_projection_pruning() {
        let narrow = LogicalOp::Project(vec![SelectItem::Field("a".to_string())]);
        let wide = LogicalOp::Project(vec![
            SelectItem::Field("a".to_string()),
            SelectItem::Field("b".to_string()),
        ]);

        let (ops, _) = run_default(vec![narrow, wide]);

        assert_eq!(ops.len(), 1, "Adjacent projections should be pruned to one");
        match &ops[0] {
            LogicalOp::Project(items) => {
                assert_eq!(items.len(), 2, "Should keep the second (final) projection");
            }
            _ => panic!("Expected Project op"),
        }
    }

    #[test]
    fn test_temporal_filter_pushdown() {
        let (ops, _) = run_default(vec![
            window_op(),
            gt_filter("timestamp", Expr::Int(1000)),
        ]);

        assert!(
            matches!(ops[0], LogicalOp::Filter(_)),
            "Temporal filter should be pushed before Window"
        );
    }

    #[test]
    fn test_no_changes_returns_single_pass() {
        // The filter is already ahead of the window, so no rule fires.
        let (_, passes) = run_default(vec![filter_op("temperature"), window_op()]);

        assert_eq!(passes, 1, "No changes should result in a single pass");
    }

    #[test]
    fn test_optimize_plan_convenience() {
        let plan = make_plan(vec![window_op(), filter_op("temperature")]);

        let optimized = optimize_plan(plan);
        assert!(matches!(
            optimized.streams[0].operations[0],
            LogicalOp::Filter(_)
        ));
    }
}