rust_rule_engine/streaming/
join_optimizer.rs1use crate::rete::stream_join_node::{JoinStrategy, JoinType};
2use std::time::Duration;
3
/// A single optimization that [`JoinOptimizer::optimize_join`] may recommend.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinOptimization {
    /// Use the smaller input stream as the build side.
    ///
    /// The variant does not record which side was chosen; the plan's
    /// `explanation` text states whether it is the left or right stream.
    BuildSmaller,
    /// Pre-partition the join input into `partition_count` partitions
    /// (recommended for high-cardinality streams).
    PrePartition { partition_count: usize },
    /// Use a bloom filter with the given target false-positive rate
    /// (recommended for sparse joins).
    BloomFilter { false_positive_rate: f64 },
    /// Index the join key (recommended for high-frequency streams).
    IndexJoinKey,
    /// Merge adjacent windows (recommended for long time windows).
    MergeWindows,
}
18
/// Estimated statistics for one input stream, consumed by [`JoinOptimizer`].
#[derive(Debug, Clone, PartialEq)]
pub struct StreamStats {
    /// Name the optimizer uses to look up these statistics.
    pub stream_name: String,
    /// Estimated number of events per second.
    pub estimated_event_rate: f64,
    /// Estimated cardinality of the stream (presumably distinct join-key
    /// values — confirm against producers of these stats).
    pub estimated_cardinality: usize,
    /// Average size of one event, in bytes.
    pub average_event_size: usize,
}
27
28#[derive(Debug, Clone)]
30pub struct OptimizedJoinPlan {
31 pub left_stream: String,
32 pub right_stream: String,
33 pub join_type: JoinType,
34 pub join_strategy: JoinStrategy,
35 pub optimizations: Vec<JoinOptimization>,
36 pub estimated_cost: f64,
37 pub explanation: String,
38}
39
40pub struct JoinOptimizer {
42 stream_stats: Vec<StreamStats>,
44}
45
46impl JoinOptimizer {
47 pub fn new() -> Self {
49 Self {
50 stream_stats: Vec::new(),
51 }
52 }
53
54 pub fn register_stream_stats(&mut self, stats: StreamStats) {
56 self.stream_stats
58 .retain(|s| s.stream_name != stats.stream_name);
59 self.stream_stats.push(stats);
60 }
61
62 pub fn optimize_join(
64 &self,
65 left_stream: &str,
66 right_stream: &str,
67 join_type: JoinType,
68 join_strategy: JoinStrategy,
69 ) -> OptimizedJoinPlan {
70 let left_stats = self
71 .stream_stats
72 .iter()
73 .find(|s| s.stream_name == left_stream);
74 let right_stats = self
75 .stream_stats
76 .iter()
77 .find(|s| s.stream_name == right_stream);
78
79 let mut optimizations = Vec::new();
80 let mut explanation = String::new();
81 let mut estimated_cost = 1.0;
82
83 if let (Some(left), Some(right)) = (left_stats, right_stats) {
85 let left_size = left.estimated_event_rate * left.average_event_size as f64;
86 let right_size = right.estimated_event_rate * right.average_event_size as f64;
87
88 if left_size < right_size * 0.7 {
89 optimizations.push(JoinOptimization::BuildSmaller);
90 explanation.push_str("Using left stream as build side (smaller). ");
91 estimated_cost *= 0.8;
92 } else if right_size < left_size * 0.7 {
93 optimizations.push(JoinOptimization::BuildSmaller);
94 explanation.push_str("Using right stream as build side (smaller). ");
95 estimated_cost *= 0.8;
96 }
97 }
98
99 if let (Some(left), Some(right)) = (left_stats, right_stats) {
101 let max_cardinality = left.estimated_cardinality.max(right.estimated_cardinality);
102 if max_cardinality > 1000 {
103 let partition_count = (max_cardinality / 100).min(32);
104 optimizations.push(JoinOptimization::PrePartition { partition_count });
105 explanation.push_str(&format!(
106 "Pre-partitioning into {} partitions for high cardinality. ",
107 partition_count
108 ));
109 estimated_cost *= 0.7;
110 }
111 }
112
113 if let (Some(left), Some(right)) = (left_stats, right_stats) {
115 let join_selectivity = (left.estimated_cardinality.min(right.estimated_cardinality)
116 as f64)
117 / (left.estimated_cardinality.max(right.estimated_cardinality) as f64);
118
119 if join_selectivity < 0.1 {
120 optimizations.push(JoinOptimization::BloomFilter {
122 false_positive_rate: 0.01,
123 });
124 explanation.push_str("Using bloom filter for sparse join (< 10% selectivity). ");
125 estimated_cost *= 0.6;
126 }
127 }
128
129 if let (Some(left), Some(right)) = (left_stats, right_stats) {
131 if left.estimated_event_rate > 100.0 || right.estimated_event_rate > 100.0 {
132 optimizations.push(JoinOptimization::IndexJoinKey);
133 explanation.push_str("Indexing join key for high-frequency streams. ");
134 estimated_cost *= 0.85;
135 }
136 }
137
138 if let JoinStrategy::TimeWindow { duration } = &join_strategy {
140 if duration.as_secs() >= 60 {
141 optimizations.push(JoinOptimization::MergeWindows);
142 explanation.push_str("Merging adjacent windows for efficiency. ");
143 estimated_cost *= 0.9;
144 }
145 }
146
147 match join_type {
149 JoinType::Inner => {
150 explanation.push_str("Inner join - most efficient. ");
151 }
152 JoinType::LeftOuter | JoinType::RightOuter => {
153 explanation.push_str("Outer join - tracking unmatched events. ");
154 estimated_cost *= 1.2;
155 }
156 JoinType::FullOuter => {
157 explanation.push_str("Full outer join - tracking all unmatched events. ");
158 estimated_cost *= 1.4;
159 }
160 }
161
162 if optimizations.is_empty() {
163 explanation.push_str("No specific optimizations recommended - using default strategy.");
164 }
165
166 OptimizedJoinPlan {
167 left_stream: left_stream.to_string(),
168 right_stream: right_stream.to_string(),
169 join_type,
170 join_strategy,
171 optimizations,
172 estimated_cost,
173 explanation: explanation.trim().to_string(),
174 }
175 }
176
177 pub fn estimate_memory_usage(
179 &self,
180 left_stream: &str,
181 right_stream: &str,
182 window_duration: Duration,
183 ) -> usize {
184 let left_stats = self
185 .stream_stats
186 .iter()
187 .find(|s| s.stream_name == left_stream);
188 let right_stats = self
189 .stream_stats
190 .iter()
191 .find(|s| s.stream_name == right_stream);
192
193 let mut total_memory = 0;
194
195 if let Some(left) = left_stats {
196 let events_in_window =
197 (left.estimated_event_rate * window_duration.as_secs_f64()) as usize;
198 total_memory += events_in_window * left.average_event_size;
199 }
200
201 if let Some(right) = right_stats {
202 let events_in_window =
203 (right.estimated_event_rate * window_duration.as_secs_f64()) as usize;
204 total_memory += events_in_window * right.average_event_size;
205 }
206
207 (total_memory as f64 * 1.5) as usize
209 }
210
211 pub fn recommend_window_size(
213 &self,
214 left_stream: &str,
215 right_stream: &str,
216 max_memory_bytes: usize,
217 ) -> Duration {
218 let left_stats = self
219 .stream_stats
220 .iter()
221 .find(|s| s.stream_name == left_stream);
222 let right_stats = self
223 .stream_stats
224 .iter()
225 .find(|s| s.stream_name == right_stream);
226
227 let default_window = Duration::from_secs(300);
229
230 match (left_stats, right_stats) {
231 (Some(left), Some(right)) => {
232 let combined_rate = left.estimated_event_rate + right.estimated_event_rate;
233 let avg_event_size = (left.average_event_size + right.average_event_size) / 2;
234
235 let max_events = max_memory_bytes / avg_event_size;
237 let max_duration_secs = (max_events as f64 / combined_rate) as u64;
238
239 let recommended_secs = (max_duration_secs as f64 * 0.8) as u64;
241
242 let clamped_secs = recommended_secs.max(10).min(3600);
244
245 Duration::from_secs(clamped_secs)
246 }
247 _ => default_window,
248 }
249 }
250}
251
252impl Default for JoinOptimizer {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StreamStats` fixture.
    fn stats(name: &str, rate: f64, cardinality: usize, event_size: usize) -> StreamStats {
        StreamStats {
            stream_name: name.to_string(),
            estimated_event_rate: rate,
            estimated_cardinality: cardinality,
            average_event_size: event_size,
        }
    }

    /// A time-window join strategy of `secs` seconds.
    fn time_window(secs: u64) -> JoinStrategy {
        JoinStrategy::TimeWindow {
            duration: Duration::from_secs(secs),
        }
    }

    /// An optimizer with each of `streams` registered in order.
    fn optimizer_with(streams: Vec<StreamStats>) -> JoinOptimizer {
        let mut optimizer = JoinOptimizer::new();
        for s in streams {
            optimizer.register_stream_stats(s);
        }
        optimizer
    }

    #[test]
    fn test_build_smaller_optimization() {
        let optimizer = optimizer_with(vec![
            stats("small", 10.0, 100, 100),
            stats("large", 100.0, 1000, 100),
        ]);

        let plan = optimizer.optimize_join("small", "large", JoinType::Inner, time_window(60));

        assert!(plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::BuildSmaller)));
        assert!(plan.explanation.contains("Using left stream as build side"));
        assert!(plan.estimated_cost < 1.0);
    }

    #[test]
    fn test_build_smaller_picks_right_side() {
        let optimizer = optimizer_with(vec![
            stats("large", 100.0, 1000, 100),
            stats("small", 10.0, 100, 100),
        ]);

        let plan = optimizer.optimize_join("large", "small", JoinType::Inner, time_window(60));

        assert!(plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::BuildSmaller)));
        assert!(plan.explanation.contains("Using right stream as build side"));
    }

    #[test]
    fn test_pre_partition_optimization() {
        let optimizer = optimizer_with(vec![
            stats("high_cardinality", 50.0, 5000, 200),
            stats("other", 50.0, 100, 200),
        ]);

        let plan = optimizer.optimize_join(
            "high_cardinality",
            "other",
            JoinType::Inner,
            time_window(60),
        );

        // 5000 / 100 = 50 partitions, capped at 32.
        assert!(plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::PrePartition { partition_count: 32 })));
    }

    #[test]
    fn test_bloom_filter_for_sparse_join() {
        let optimizer = optimizer_with(vec![
            stats("sparse_left", 50.0, 50, 200),
            stats("sparse_right", 50.0, 1000, 200),
        ]);

        let plan = optimizer.optimize_join(
            "sparse_left",
            "sparse_right",
            JoinType::Inner,
            time_window(60),
        );

        assert!(plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::BloomFilter { .. })));
    }

    #[test]
    fn test_no_optimizations_without_stats() {
        let optimizer = JoinOptimizer::new();

        // A 30 s window is below the merge threshold, and no stats are known.
        let plan = optimizer.optimize_join("left", "right", JoinType::Inner, time_window(30));

        assert!(plan.optimizations.is_empty());
        assert_eq!(plan.estimated_cost, 1.0);
        assert_eq!(
            plan.explanation,
            "Inner join - most efficient. No specific optimizations recommended - using default strategy."
        );
    }

    #[test]
    fn test_memory_estimation() {
        let optimizer = optimizer_with(vec![
            stats("stream1", 100.0, 100, 1000),
            stats("stream2", 100.0, 100, 1000),
        ]);

        // 2 streams x 100 ev/s x 10 s x 1000 B = 2 MB, x 1.5 overhead = 3 MB.
        let memory = optimizer.estimate_memory_usage("stream1", "stream2", Duration::from_secs(10));

        assert_eq!(memory, 3_000_000);
    }

    #[test]
    fn test_memory_estimation_unknown_streams_is_zero() {
        let optimizer = JoinOptimizer::new();

        assert_eq!(
            optimizer.estimate_memory_usage("a", "b", Duration::from_secs(60)),
            0
        );
    }

    #[test]
    fn test_register_replaces_existing_stats() {
        let optimizer = optimizer_with(vec![stats("s", 10.0, 1, 100), stats("s", 20.0, 1, 100)]);

        // Only the second registration should count: 20 x 10 x 100 x 1.5.
        let memory = optimizer.estimate_memory_usage("s", "missing", Duration::from_secs(10));

        assert_eq!(memory, 30_000);
    }

    #[test]
    fn test_window_size_recommendation() {
        let optimizer = optimizer_with(vec![
            stats("stream1", 100.0, 100, 1000),
            stats("stream2", 100.0, 100, 1000),
        ]);

        let window = optimizer.recommend_window_size("stream1", "stream2", 10_000_000);

        assert!(window.as_secs() >= 30 && window.as_secs() <= 50);
    }

    #[test]
    fn test_window_size_falls_back_to_default_without_stats() {
        let optimizer = JoinOptimizer::new();

        assert_eq!(
            optimizer.recommend_window_size("a", "b", 1_000_000),
            Duration::from_secs(300)
        );
    }

    #[test]
    fn test_window_size_is_clamped() {
        let optimizer = optimizer_with(vec![
            stats("stream1", 100.0, 100, 1000),
            stats("stream2", 100.0, 100, 1000),
        ]);

        assert_eq!(
            optimizer.recommend_window_size("stream1", "stream2", 1),
            Duration::from_secs(10)
        );
        assert_eq!(
            optimizer.recommend_window_size("stream1", "stream2", usize::MAX),
            Duration::from_secs(3600)
        );
    }

    #[test]
    fn test_outer_join_cost_adjustment() {
        let optimizer = JoinOptimizer::new();

        let inner_plan = optimizer.optimize_join("left", "right", JoinType::Inner, time_window(60));
        let full_outer_plan =
            optimizer.optimize_join("left", "right", JoinType::FullOuter, time_window(60));

        assert!(full_outer_plan.estimated_cost > inner_plan.estimated_cost);
    }
}