// rust_rule_engine/streaming/join_optimizer.rs

1use crate::rete::stream_join_node::{JoinStrategy, JoinType};
2use std::time::Duration;
3
/// Join optimization strategies that the join optimizer may recommend.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinOptimization {
    /// Build the hash table from the smaller stream (for hash joins).
    ///
    /// The variant does not record *which* side is smaller; that is only
    /// reported in the plan's human-readable explanation.
    BuildSmaller,
    /// Pre-partition both streams by join key into `partition_count` partitions.
    PrePartition { partition_count: usize },
    /// Use a bloom filter to skip non-matching events early.
    ///
    /// `false_positive_rate` is the target false-positive probability;
    /// presumably expected in `(0, 1)` — it is not validated here.
    BloomFilter { false_positive_rate: f64 },
    /// Index the join key for faster lookups.
    IndexJoinKey,
    /// Merge overlapping time windows.
    MergeWindows,
}
18
/// Statistics about a stream, used by the join optimizer for optimization decisions.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamStats {
    /// Name the statistics are registered and looked up under.
    pub stream_name: String,
    /// Estimated event rate, in events per second.
    pub estimated_event_rate: f64,
    /// Estimated number of unique join-key values.
    pub estimated_cardinality: usize,
    /// Average event size, in bytes.
    pub average_event_size: usize,
}
27
28/// Join plan with optimization recommendations
29#[derive(Debug, Clone)]
30pub struct OptimizedJoinPlan {
31    pub left_stream: String,
32    pub right_stream: String,
33    pub join_type: JoinType,
34    pub join_strategy: JoinStrategy,
35    pub optimizations: Vec<JoinOptimization>,
36    pub estimated_cost: f64,
37    pub explanation: String,
38}
39
40/// Join optimizer - analyzes stream characteristics and suggests optimizations
41pub struct JoinOptimizer {
42    /// Statistics for known streams
43    stream_stats: Vec<StreamStats>,
44}
45
46impl JoinOptimizer {
47    /// Create a new join optimizer
48    pub fn new() -> Self {
49        Self {
50            stream_stats: Vec::new(),
51        }
52    }
53
54    /// Register stream statistics for optimization
55    pub fn register_stream_stats(&mut self, stats: StreamStats) {
56        // Remove existing stats for this stream
57        self.stream_stats
58            .retain(|s| s.stream_name != stats.stream_name);
59        self.stream_stats.push(stats);
60    }
61
62    /// Optimize a join plan based on stream characteristics
63    pub fn optimize_join(
64        &self,
65        left_stream: &str,
66        right_stream: &str,
67        join_type: JoinType,
68        join_strategy: JoinStrategy,
69    ) -> OptimizedJoinPlan {
70        let left_stats = self
71            .stream_stats
72            .iter()
73            .find(|s| s.stream_name == left_stream);
74        let right_stats = self
75            .stream_stats
76            .iter()
77            .find(|s| s.stream_name == right_stream);
78
79        let mut optimizations = Vec::new();
80        let mut explanation = String::new();
81        let mut estimated_cost = 1.0;
82
83        // Optimization 1: Build smaller stream
84        if let (Some(left), Some(right)) = (left_stats, right_stats) {
85            let left_size = left.estimated_event_rate * left.average_event_size as f64;
86            let right_size = right.estimated_event_rate * right.average_event_size as f64;
87
88            if left_size < right_size * 0.7 {
89                optimizations.push(JoinOptimization::BuildSmaller);
90                explanation.push_str("Using left stream as build side (smaller). ");
91                estimated_cost *= 0.8;
92            } else if right_size < left_size * 0.7 {
93                optimizations.push(JoinOptimization::BuildSmaller);
94                explanation.push_str("Using right stream as build side (smaller). ");
95                estimated_cost *= 0.8;
96            }
97        }
98
99        // Optimization 2: Pre-partitioning for high-cardinality joins
100        if let (Some(left), Some(right)) = (left_stats, right_stats) {
101            let max_cardinality = left.estimated_cardinality.max(right.estimated_cardinality);
102            if max_cardinality > 1000 {
103                let partition_count = (max_cardinality / 100).min(32);
104                optimizations.push(JoinOptimization::PrePartition { partition_count });
105                explanation.push_str(&format!(
106                    "Pre-partitioning into {} partitions for high cardinality. ",
107                    partition_count
108                ));
109                estimated_cost *= 0.7;
110            }
111        }
112
113        // Optimization 3: Bloom filter for sparse joins
114        if let (Some(left), Some(right)) = (left_stats, right_stats) {
115            let join_selectivity = (left.estimated_cardinality.min(right.estimated_cardinality)
116                as f64)
117                / (left.estimated_cardinality.max(right.estimated_cardinality) as f64);
118
119            if join_selectivity < 0.1 {
120                // Very sparse join
121                optimizations.push(JoinOptimization::BloomFilter {
122                    false_positive_rate: 0.01,
123                });
124                explanation.push_str("Using bloom filter for sparse join (< 10% selectivity). ");
125                estimated_cost *= 0.6;
126            }
127        }
128
129        // Optimization 4: Index join key for frequent lookups
130        if let (Some(left), Some(right)) = (left_stats, right_stats) {
131            if left.estimated_event_rate > 100.0 || right.estimated_event_rate > 100.0 {
132                optimizations.push(JoinOptimization::IndexJoinKey);
133                explanation.push_str("Indexing join key for high-frequency streams. ");
134                estimated_cost *= 0.85;
135            }
136        }
137
138        // Optimization 5: Window merging for tumbling windows
139        if let JoinStrategy::TimeWindow { duration } = &join_strategy {
140            if duration.as_secs() >= 60 {
141                optimizations.push(JoinOptimization::MergeWindows);
142                explanation.push_str("Merging adjacent windows for efficiency. ");
143                estimated_cost *= 0.9;
144            }
145        }
146
147        // Adjust cost based on join type
148        match join_type {
149            JoinType::Inner => {
150                explanation.push_str("Inner join - most efficient. ");
151            }
152            JoinType::LeftOuter | JoinType::RightOuter => {
153                explanation.push_str("Outer join - tracking unmatched events. ");
154                estimated_cost *= 1.2;
155            }
156            JoinType::FullOuter => {
157                explanation.push_str("Full outer join - tracking all unmatched events. ");
158                estimated_cost *= 1.4;
159            }
160        }
161
162        if optimizations.is_empty() {
163            explanation.push_str("No specific optimizations recommended - using default strategy.");
164        }
165
166        OptimizedJoinPlan {
167            left_stream: left_stream.to_string(),
168            right_stream: right_stream.to_string(),
169            join_type,
170            join_strategy,
171            optimizations,
172            estimated_cost,
173            explanation: explanation.trim().to_string(),
174        }
175    }
176
177    /// Estimate memory usage for a join
178    pub fn estimate_memory_usage(
179        &self,
180        left_stream: &str,
181        right_stream: &str,
182        window_duration: Duration,
183    ) -> usize {
184        let left_stats = self
185            .stream_stats
186            .iter()
187            .find(|s| s.stream_name == left_stream);
188        let right_stats = self
189            .stream_stats
190            .iter()
191            .find(|s| s.stream_name == right_stream);
192
193        let mut total_memory = 0;
194
195        if let Some(left) = left_stats {
196            let events_in_window =
197                (left.estimated_event_rate * window_duration.as_secs_f64()) as usize;
198            total_memory += events_in_window * left.average_event_size;
199        }
200
201        if let Some(right) = right_stats {
202            let events_in_window =
203                (right.estimated_event_rate * window_duration.as_secs_f64()) as usize;
204            total_memory += events_in_window * right.average_event_size;
205        }
206
207        // Add overhead for hash tables and tracking structures (roughly 50%)
208        (total_memory as f64 * 1.5) as usize
209    }
210
211    /// Recommend optimal window size based on stream characteristics
212    pub fn recommend_window_size(
213        &self,
214        left_stream: &str,
215        right_stream: &str,
216        max_memory_bytes: usize,
217    ) -> Duration {
218        let left_stats = self
219            .stream_stats
220            .iter()
221            .find(|s| s.stream_name == left_stream);
222        let right_stats = self
223            .stream_stats
224            .iter()
225            .find(|s| s.stream_name == right_stream);
226
227        // Default to 5 minutes if no stats
228        let default_window = Duration::from_secs(300);
229
230        match (left_stats, right_stats) {
231            (Some(left), Some(right)) => {
232                let combined_rate = left.estimated_event_rate + right.estimated_event_rate;
233                let avg_event_size = (left.average_event_size + right.average_event_size) / 2;
234
235                // Calculate maximum window duration that fits in memory
236                let max_events = max_memory_bytes / avg_event_size;
237                let max_duration_secs = (max_events as f64 / combined_rate) as u64;
238
239                // Use 80% of max to leave buffer
240                let recommended_secs = (max_duration_secs as f64 * 0.8) as u64;
241
242                // Clamp between 10 seconds and 1 hour
243                let clamped_secs = recommended_secs.clamp(10, 3600);
244
245                Duration::from_secs(clamped_secs)
246            }
247            _ => default_window,
248        }
249    }
250}
251
252impl Default for JoinOptimizer {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture helper: statistics for a stream named `name`.
    fn stats(name: &str, rate: f64, cardinality: usize, event_size: usize) -> StreamStats {
        StreamStats {
            stream_name: name.to_string(),
            estimated_event_rate: rate,
            estimated_cardinality: cardinality,
            average_event_size: event_size,
        }
    }

    /// Fixture helper: an optimizer with the given streams registered in order.
    fn optimizer_with(streams: Vec<StreamStats>) -> JoinOptimizer {
        let mut optimizer = JoinOptimizer::new();
        streams
            .into_iter()
            .for_each(|s| optimizer.register_stream_stats(s));
        optimizer
    }

    /// The 60-second time-window strategy shared by the plan tests.
    fn minute_window() -> JoinStrategy {
        JoinStrategy::TimeWindow {
            duration: Duration::from_secs(60),
        }
    }

    #[test]
    fn test_build_smaller_optimization() {
        let optimizer = optimizer_with(vec![
            stats("small", 10.0, 100, 100),
            stats("large", 100.0, 1000, 100),
        ]);

        let plan = optimizer.optimize_join("small", "large", JoinType::Inner, minute_window());

        let recommends_build_smaller = plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::BuildSmaller));
        assert!(recommends_build_smaller);
        // Recommended optimizations must pull the cost below the 1.0 baseline.
        assert!(plan.estimated_cost < 1.0);
    }

    #[test]
    fn test_pre_partition_optimization() {
        let optimizer = optimizer_with(vec![
            stats("high_cardinality", 50.0, 5000, 200),
            stats("other", 50.0, 100, 200),
        ]);

        let plan = optimizer.optimize_join(
            "high_cardinality",
            "other",
            JoinType::Inner,
            minute_window(),
        );

        let recommends_partitioning = plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::PrePartition { .. }));
        assert!(recommends_partitioning);
    }

    #[test]
    fn test_bloom_filter_for_sparse_join() {
        let optimizer = optimizer_with(vec![
            stats("sparse_left", 50.0, 50, 200),
            stats("sparse_right", 50.0, 1000, 200),
        ]);

        let plan = optimizer.optimize_join(
            "sparse_left",
            "sparse_right",
            JoinType::Inner,
            minute_window(),
        );

        let recommends_bloom = plan
            .optimizations
            .iter()
            .any(|opt| matches!(opt, JoinOptimization::BloomFilter { .. }));
        assert!(recommends_bloom);
    }

    #[test]
    fn test_memory_estimation() {
        // Two identical streams: 100 events/sec at 1 KB per event.
        let optimizer = optimizer_with(vec![
            stats("stream1", 100.0, 100, 1000),
            stats("stream2", 100.0, 100, 1000),
        ]);

        let memory =
            optimizer.estimate_memory_usage("stream1", "stream2", Duration::from_secs(10));

        // 100 ev/s * 10 s * 1 KB * 2 streams * 1.5 overhead = 3 MB.
        assert!(memory > 2_000_000 && memory < 4_000_000);
    }

    #[test]
    fn test_window_size_recommendation() {
        let optimizer = optimizer_with(vec![
            stats("stream1", 100.0, 100, 1000),
            stats("stream2", 100.0, 100, 1000),
        ]);

        // 10 MB memory budget.
        let window = optimizer.recommend_window_size("stream1", "stream2", 10_000_000);

        // 200 events/sec at 1 KB fills 10 MB in ~50 s; 80% of that is ~40 s.
        assert!((30..=50).contains(&window.as_secs()));
    }

    #[test]
    fn test_outer_join_cost_adjustment() {
        // No stats are registered and both plans share the same window, so the
        // join type is the only thing that differs between them.
        let optimizer = JoinOptimizer::new();

        let inner = optimizer.optimize_join("left", "right", JoinType::Inner, minute_window());
        let full_outer =
            optimizer.optimize_join("left", "right", JoinType::FullOuter, minute_window());

        // A full outer join must cost more than an inner join.
        assert!(full_outer.estimated_cost > inner.estimated_cost);
    }
}