// oxirs_stream/performance_optimizer/mod.rs
pub mod batching;
24pub mod config;
25pub mod memory;
26pub mod ml;
27
28pub use batching::{AdaptiveBatcher, BatchPerformancePoint, BatchSizePredictor, BatchingStats};
30pub use config::{
31 BatchConfig, CompressionAlgorithm, CompressionConfig, EnhancedMLConfig, LoadBalancingStrategy,
32 MemoryPoolConfig, ParallelConfig, PerformanceConfig,
33};
34pub use memory::{AllocationStrategy, MemoryHandle, MemoryPool, MemoryPoolStats};
35pub use ml::{
36 ConfigParams, LinearRegressionModel, ModelStats, PerformanceMetrics, PerformancePredictor,
37};
38
39use crate::StreamEvent;
47use anyhow::Result;
48use serde::{Deserialize, Serialize};
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::time::{Duration, Instant};
51
/// Outcome summary of a single processing run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingResult {
    /// Number of events handled in this run.
    pub events_processed: usize,
    /// Wall-clock time spent processing, in milliseconds.
    pub processing_time_ms: u64,
    /// Fraction of events processed without error (0.0..=1.0).
    pub success_rate: f64,
    /// Human-readable messages for the errors encountered, if any.
    pub errors: Vec<String>,
}
64
/// Cumulative counters describing processor performance.
///
/// The counters are `AtomicU64` so they can be bumped from multiple threads
/// without a lock. `success_rate` is a plain `f64`, so it is not safe to
/// mutate concurrently — NOTE(review): confirm it is only written by a
/// single owner (e.g. while aggregating) before sharing this across threads.
#[derive(Debug)]
pub struct ProcessingStats {
    /// Total events processed since creation.
    pub total_events: AtomicU64,
    /// Sum of all processing time, in milliseconds.
    pub total_processing_time_ms: AtomicU64,
    /// Average processing time, in milliseconds.
    pub avg_processing_time_ms: AtomicU64,
    /// Current throughput, in events per second.
    pub throughput_eps: AtomicU64,
    /// Highest throughput observed so far, in events per second.
    pub peak_throughput_eps: AtomicU64,
    /// Number of processing errors observed.
    pub error_count: AtomicU64,
    /// Fraction of successful events (0.0..=1.0); starts at 1.0 (see `Default`).
    pub success_rate: f64,
}
83
84impl Default for ProcessingStats {
85 fn default() -> Self {
86 Self {
87 total_events: AtomicU64::new(0),
88 total_processing_time_ms: AtomicU64::new(0),
89 avg_processing_time_ms: AtomicU64::new(0),
90 throughput_eps: AtomicU64::new(0),
91 peak_throughput_eps: AtomicU64::new(0),
92 error_count: AtomicU64::new(0),
93 success_rate: 1.0,
94 }
95 }
96}
97
/// Lifecycle state of a processing operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessingStatus {
    /// No work is currently in flight.
    Idle,
    /// Work is actively being processed.
    Processing,
    /// The last operation finished successfully.
    Completed,
    /// The last operation failed; the payload carries the error message.
    Failed(String),
}
106
/// Wrapper that tracks processing state and timing alongside a `StreamEvent`.
///
/// NOTE(review): despite the name, this struct owns its event and derives
/// `Clone`, so cloning copies the event — confirm the "zero-copy" intent.
#[derive(Debug, Clone)]
pub struct ZeroCopyEvent {
    // The wrapped event; exposed read-only via `event()`.
    event: StreamEvent,
    // Set once `mark_processed` has been called.
    processed: bool,
    // Timestamp captured by `mark_processing`; `None` until then.
    processing_start: Option<Instant>,
}
114
115impl ZeroCopyEvent {
116 pub fn new(event: StreamEvent) -> Self {
118 Self {
119 event,
120 processed: false,
121 processing_start: None,
122 }
123 }
124
125 pub fn mark_processing(&mut self) {
127 self.processing_start = Some(Instant::now());
128 }
129
130 pub fn mark_processed(&mut self) {
132 self.processed = true;
133 }
134
135 pub fn processing_duration(&self) -> Option<Duration> {
137 self.processing_start.map(|start| start.elapsed())
138 }
139
140 pub fn event(&self) -> &StreamEvent {
142 &self.event
143 }
144
145 pub fn is_processed(&self) -> bool {
147 self.processed
148 }
149}
150
/// Aggregation operations that can be applied over groups of events.
/// NOTE(review): not referenced elsewhere in this module — presumably
/// consumed by windowing/aggregation code in a sibling module; verify.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationFunction {
    Count,
    Sum,
    Average,
    Min,
    Max,
    Distinct,
}
161
/// A single parameter change recommended by the auto-tuner.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TuningDecision {
    /// Name of the configuration parameter to change (e.g. "max_batch_size").
    pub parameter: String,
    /// Value of the parameter before tuning.
    pub old_value: f64,
    /// Recommended new value.
    pub new_value: f64,
    /// Human-readable justification for the change.
    pub reason: String,
    /// Expected relative improvement (e.g. 0.2 = +20%).
    pub expected_improvement: f64,
    /// Confidence in the recommendation (0.0..=1.0).
    pub confidence: f64,
}
178
/// Inspects recent performance history and suggests configuration changes.
pub struct AutoTuner {
    // Current performance configuration the tuner reasons about.
    config: PerformanceConfig,
    // Bounded history of recorded stats (trimmed once it exceeds 100 entries).
    performance_history: Vec<ProcessingStats>,
    // When a tuning pass last ran; `None` until the first pass.
    last_tuning: Option<Instant>,
    // Minimum interval between tuning passes (60 s, set in `new`).
    tuning_interval: Duration,
}
186
187impl AutoTuner {
188 pub fn new(config: PerformanceConfig) -> Self {
190 Self {
191 config,
192 performance_history: Vec::new(),
193 last_tuning: None,
194 tuning_interval: Duration::from_secs(60), }
196 }
197
198 pub fn record_performance(&mut self, stats: ProcessingStats) {
200 self.performance_history.push(stats);
201
202 if self.performance_history.len() > 100 {
204 self.performance_history.drain(0..50);
205 }
206 }
207
208 pub fn needs_tuning(&self) -> bool {
210 match self.last_tuning {
211 Some(last) => last.elapsed() >= self.tuning_interval,
212 None => true,
213 }
214 }
215
216 pub fn tune(&mut self) -> Result<Vec<TuningDecision>> {
218 if self.performance_history.is_empty() {
219 return Ok(Vec::new());
220 }
221
222 let mut decisions = Vec::new();
223
224 let recent_stats: Vec<_> = self.performance_history.iter().rev().take(10).collect();
226 let avg_throughput: f64 = recent_stats
227 .iter()
228 .map(|s| s.throughput_eps.load(Ordering::Relaxed) as f64)
229 .sum::<f64>()
230 / recent_stats.len() as f64;
231
232 let avg_latency: f64 = recent_stats
233 .iter()
234 .map(|s| s.avg_processing_time_ms.load(Ordering::Relaxed) as f64)
235 .sum::<f64>()
236 / recent_stats.len() as f64;
237
238 if avg_throughput < 50000.0 && self.config.max_batch_size < 2000 {
240 let old_batch_size = self.config.max_batch_size as f64;
241 let new_batch_size = (old_batch_size * 1.2).min(2000.0);
242
243 decisions.push(TuningDecision {
244 parameter: "max_batch_size".to_string(),
245 old_value: old_batch_size,
246 new_value: new_batch_size,
247 reason: "Low throughput detected".to_string(),
248 expected_improvement: 0.2,
249 confidence: 0.8,
250 });
251 }
252
253 if avg_latency > 20.0 && self.config.parallel_workers < num_cpus::get() * 2 {
255 let old_workers = self.config.parallel_workers as f64;
256 let new_workers = (old_workers + 1.0).min(num_cpus::get() as f64 * 2.0);
257
258 decisions.push(TuningDecision {
259 parameter: "parallel_workers".to_string(),
260 old_value: old_workers,
261 new_value: new_workers,
262 reason: "High latency detected".to_string(),
263 expected_improvement: 0.15,
264 confidence: 0.7,
265 });
266 }
267
268 Ok(decisions)
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// The processed flag must flip only after `mark_processed`.
    #[test]
    fn test_zero_copy_event() {
        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        let mut wrapped = ZeroCopyEvent::new(event);
        assert!(!wrapped.is_processed());

        wrapped.mark_processing();
        wrapped.mark_processed();
        assert!(wrapped.is_processed());
    }

    /// A fresh tuner wants tuning immediately and, given default (all-zero)
    /// stats, produces at least one recommendation.
    #[test]
    fn test_auto_tuner() {
        let mut tuner = AutoTuner::new(PerformanceConfig::default());
        assert!(tuner.needs_tuning());

        tuner.record_performance(ProcessingStats::default());

        let decisions = tuner.tune().unwrap();
        assert!(!decisions.is_empty());
    }

    /// Field round-trip for `ProcessingResult`.
    #[test]
    fn test_processing_result() {
        let errors = vec!["test error".to_string()];
        let result = ProcessingResult {
            events_processed: 100,
            processing_time_ms: 50,
            success_rate: 0.95,
            errors,
        };

        assert_eq!(result.events_processed, 100);
        assert_eq!(result.processing_time_ms, 50);
        assert_eq!(result.success_rate, 0.95);
        assert_eq!(result.errors.len(), 1);
    }
}