oxirs_stream/performance_optimizer/
mod.rs

1//! # Advanced Performance Optimizer
2//!
3//! High-performance optimizations for achieving 100K+ events/second throughput
4//! with <10ms latency. Implements advanced batching, memory pooling, zero-copy operations,
5//! and parallel processing optimizations.
6//!
7//! ## Modules
8//!
9//! - `config`: Configuration structures for performance optimization
10//! - `memory`: Memory management and pooling
11//! - `batching`: Adaptive batching for throughput optimization
12//! - `ml`: Machine learning components for performance prediction
13//! - `compression`: Compression and bandwidth optimization
14//! - `parallel`: Parallel processing optimizations
15//!
16//! ## Performance Targets
17//!
18//! - **Throughput**: 100K+ events/second sustained
19//! - **Latency**: P99 <10ms for real-time processing
20//! - **Memory**: Efficient memory usage with pooling
21//! - **CPU**: Optimal CPU utilization with parallel processing
22
23pub mod batching;
24pub mod config;
25pub mod memory;
26pub mod ml;
27
28// Re-export commonly used types
29pub use batching::{AdaptiveBatcher, BatchPerformancePoint, BatchSizePredictor, BatchingStats};
30pub use config::{
31    BatchConfig, CompressionAlgorithm, CompressionConfig, EnhancedMLConfig, LoadBalancingStrategy,
32    MemoryPoolConfig, ParallelConfig, PerformanceConfig,
33};
34pub use memory::{AllocationStrategy, MemoryHandle, MemoryPool, MemoryPoolStats};
35pub use ml::{
36    ConfigParams, LinearRegressionModel, ModelStats, PerformanceMetrics, PerformancePredictor,
37};
38
39// TODO: The following modules would be created in subsequent refactoring steps:
40// pub mod compression;  // Compression and bandwidth optimization
41// pub mod parallel;     // Parallel processing optimizations
42
43// Re-export types that are currently in the original file but would be moved
44// to appropriate modules in a complete refactoring:
45
46use crate::StreamEvent;
47use anyhow::Result;
48use serde::{Deserialize, Serialize};
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::time::{Duration, Instant};
51
52/// Processing result for batch operations
/// Outcome summary of a single batch-processing run.
///
/// Serializable so a result can be logged as structured data or reported
/// over the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingResult {
    /// Number of events processed in this batch
    pub events_processed: usize,
    /// Wall-clock time spent processing the batch, in milliseconds
    pub processing_time_ms: u64,
    /// Fraction of events processed successfully — presumably in 0.0..=1.0; confirm with producers
    pub success_rate: f64,
    /// Human-readable descriptions of errors encountered during the batch
    pub errors: Vec<String>,
}
64
/// Cumulative processing statistics.
///
/// Counter fields are `AtomicU64` so they can be bumped from concurrent
/// workers without a lock. NOTE(review): `success_rate` is a plain `f64`
/// (there is no stable atomic f64), so it is only safe to update while the
/// stats are exclusively borrowed — confirm all writers take `&mut self`.
#[derive(Debug)]
pub struct ProcessingStats {
    /// Total events processed
    pub total_events: AtomicU64,
    /// Total processing time across all events, in milliseconds
    pub total_processing_time_ms: AtomicU64,
    /// Average processing time per event, in milliseconds
    pub avg_processing_time_ms: AtomicU64,
    /// Current throughput in events per second
    pub throughput_eps: AtomicU64,
    /// Highest throughput observed, in events per second
    pub peak_throughput_eps: AtomicU64,
    /// Number of processing errors encountered
    pub error_count: AtomicU64,
    /// Success rate (non-atomic; see struct-level note)
    pub success_rate: f64,
}
83
84impl Default for ProcessingStats {
85    fn default() -> Self {
86        Self {
87            total_events: AtomicU64::new(0),
88            total_processing_time_ms: AtomicU64::new(0),
89            avg_processing_time_ms: AtomicU64::new(0),
90            throughput_eps: AtomicU64::new(0),
91            peak_throughput_eps: AtomicU64::new(0),
92            error_count: AtomicU64::new(0),
93            success_rate: 1.0,
94        }
95    }
96}
97
/// Lifecycle state of a processing operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessingStatus {
    /// No work in progress
    Idle,
    /// A batch is currently being processed
    Processing,
    /// The most recent batch finished successfully
    Completed,
    /// Processing failed; the payload carries a human-readable reason
    Failed(String),
}
106
/// Zero-copy event wrapper for efficient processing.
///
/// Bundles a `StreamEvent` with processing bookkeeping (processed flag and
/// processing start time). Consumers read the payload by reference via
/// `event()`, avoiding a copy on the read path. NOTE(review): the derived
/// `Clone` still deep-clones the wrapped event — "zero-copy" applies to
/// access, not to cloning.
#[derive(Debug, Clone)]
pub struct ZeroCopyEvent {
    // Wrapped event payload
    event: StreamEvent,
    // Set to true by mark_processed()
    processed: bool,
    // Set by mark_processing(); None until processing begins
    processing_start: Option<Instant>,
}
114
115impl ZeroCopyEvent {
116    /// Create a new zero-copy event wrapper
117    pub fn new(event: StreamEvent) -> Self {
118        Self {
119            event,
120            processed: false,
121            processing_start: None,
122        }
123    }
124
125    /// Mark event as being processed
126    pub fn mark_processing(&mut self) {
127        self.processing_start = Some(Instant::now());
128    }
129
130    /// Mark event as processed
131    pub fn mark_processed(&mut self) {
132        self.processed = true;
133    }
134
135    /// Get processing duration
136    pub fn processing_duration(&self) -> Option<Duration> {
137        self.processing_start.map(|start| start.elapsed())
138    }
139
140    /// Get the underlying event
141    pub fn event(&self) -> &StreamEvent {
142        &self.event
143    }
144
145    /// Check if event is processed
146    pub fn is_processed(&self) -> bool {
147        self.processed
148    }
149}
150
/// Aggregation function applied during batch processing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationFunction {
    /// Count of events in the window
    Count,
    /// Sum of the aggregated values
    Sum,
    /// Arithmetic mean of the aggregated values
    Average,
    /// Minimum value observed
    Min,
    /// Maximum value observed
    Max,
    /// Count of distinct values
    Distinct,
}
161
/// A single parameter-change proposal produced by the auto-tuner.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TuningDecision {
    /// Name of the parameter being tuned (e.g. "max_batch_size")
    pub parameter: String,
    /// Value before the change
    pub old_value: f64,
    /// Proposed value after the change
    pub new_value: f64,
    /// Human-readable justification for the change
    pub reason: String,
    /// Expected improvement — presumably a fraction (0.2 = +20%); confirm with consumers
    pub expected_improvement: f64,
    /// Confidence in the decision — presumably in 0.0..=1.0
    pub confidence: f64,
}
178
/// Auto-tuner for performance parameters.
///
/// Collects `ProcessingStats` samples over time and, at most once per
/// tuning interval, analyzes the recent samples to propose parameter
/// adjustments as `TuningDecision`s.
pub struct AutoTuner {
    // Performance configuration being tuned (owned; no external setter in this file)
    config: PerformanceConfig,
    // Recorded samples; bounded by record_performance()
    performance_history: Vec<ProcessingStats>,
    // When tune() last ran; None before the first pass
    last_tuning: Option<Instant>,
    // Minimum time between tuning passes
    tuning_interval: Duration,
}
186
187impl AutoTuner {
188    /// Create a new auto-tuner
189    pub fn new(config: PerformanceConfig) -> Self {
190        Self {
191            config,
192            performance_history: Vec::new(),
193            last_tuning: None,
194            tuning_interval: Duration::from_secs(60), // 1 minute
195        }
196    }
197
198    /// Record performance data
199    pub fn record_performance(&mut self, stats: ProcessingStats) {
200        self.performance_history.push(stats);
201
202        // Keep only recent history
203        if self.performance_history.len() > 100 {
204            self.performance_history.drain(0..50);
205        }
206    }
207
208    /// Check if tuning is needed
209    pub fn needs_tuning(&self) -> bool {
210        match self.last_tuning {
211            Some(last) => last.elapsed() >= self.tuning_interval,
212            None => true,
213        }
214    }
215
216    /// Perform auto-tuning
217    pub fn tune(&mut self) -> Result<Vec<TuningDecision>> {
218        if self.performance_history.is_empty() {
219            return Ok(Vec::new());
220        }
221
222        let mut decisions = Vec::new();
223
224        // Analyze recent performance
225        let recent_stats: Vec<_> = self.performance_history.iter().rev().take(10).collect();
226        let avg_throughput: f64 = recent_stats
227            .iter()
228            .map(|s| s.throughput_eps.load(Ordering::Relaxed) as f64)
229            .sum::<f64>()
230            / recent_stats.len() as f64;
231
232        let avg_latency: f64 = recent_stats
233            .iter()
234            .map(|s| s.avg_processing_time_ms.load(Ordering::Relaxed) as f64)
235            .sum::<f64>()
236            / recent_stats.len() as f64;
237
238        // Tune batch size if throughput is low
239        if avg_throughput < 50000.0 && self.config.max_batch_size < 2000 {
240            let old_batch_size = self.config.max_batch_size as f64;
241            let new_batch_size = (old_batch_size * 1.2).min(2000.0);
242
243            decisions.push(TuningDecision {
244                parameter: "max_batch_size".to_string(),
245                old_value: old_batch_size,
246                new_value: new_batch_size,
247                reason: "Low throughput detected".to_string(),
248                expected_improvement: 0.2,
249                confidence: 0.8,
250            });
251        }
252
253        // Tune parallel workers if latency is high
254        if avg_latency > 20.0 && self.config.parallel_workers < num_cpus::get() * 2 {
255            let old_workers = self.config.parallel_workers as f64;
256            let new_workers = (old_workers + 1.0).min(num_cpus::get() as f64 * 2.0);
257
258            decisions.push(TuningDecision {
259                parameter: "parallel_workers".to_string(),
260                old_value: old_workers,
261                new_value: new_workers,
262                reason: "High latency detected".to_string(),
263                expected_improvement: 0.15,
264                confidence: 0.7,
265            });
266        }
267
268        Ok(decisions)
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// A freshly wrapped event is unprocessed; after mark_processing +
    /// mark_processed the processed flag reads true.
    #[test]
    fn test_zero_copy_event() {
        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        let mut zero_copy = ZeroCopyEvent::new(event);
        assert!(!zero_copy.is_processed());

        zero_copy.mark_processing();
        zero_copy.mark_processed();
        assert!(zero_copy.is_processed());
    }

    /// A new tuner needs tuning immediately; a default stats sample has
    /// zero throughput, which trips the low-throughput rule, so tune()
    /// is expected to yield at least one decision.
    #[test]
    fn test_auto_tuner() {
        let config = PerformanceConfig::default();
        let mut tuner = AutoTuner::new(config);

        assert!(tuner.needs_tuning());

        let stats = ProcessingStats::default();
        tuner.record_performance(stats);

        let decisions = tuner.tune().unwrap();
        assert!(!decisions.is_empty());
    }

    /// ProcessingResult round-trips its field values unchanged.
    #[test]
    fn test_processing_result() {
        let result = ProcessingResult {
            events_processed: 100,
            processing_time_ms: 50,
            success_rate: 0.95,
            errors: vec!["test error".to_string()],
        };

        assert_eq!(result.events_processed, 100);
        assert_eq!(result.processing_time_ms, 50);
        assert_eq!(result.success_rate, 0.95);
        assert_eq!(result.errors.len(), 1);
    }
}