1use super::{
7 NeuralIntegrationError, NeuralResult, OperationHandle, OperationStats,
8 PerformanceDegradation, PerformanceMonitorTrait, PerformanceStats,
9};
10use std::collections::{HashMap, VecDeque};
11use std::sync::{Arc, Mutex, RwLock};
12use std::time::{Duration, Instant};
13
/// Real-time performance monitor: tracks in-flight operations, keeps a
/// bounded history of completed ones, and learns a per-operation baseline
/// used for degradation detection.
pub struct RealTimeMonitor {
    /// Operations currently in flight, keyed by the handle returned from
    /// `start_operation`.
    operations: Arc<RwLock<HashMap<OperationHandle, OngoingOperation>>>,
    /// Bounded record of completed operations plus per-name aggregates.
    history: Arc<Mutex<PerformanceHistory>>,
    /// Learned expected time/throughput per operation name.
    baselines: Arc<RwLock<HashMap<String, PerformanceBaseline>>>,
    /// Next handle value to hand out; 0 is reserved as the sentinel for
    /// operations skipped by sampling.
    next_handle: Arc<Mutex<u64>>,
    /// Tuning knobs (history size, degradation threshold, sampling rate).
    config: MonitorConfig,
}
22
/// Configuration knobs for [`RealTimeMonitor`].
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Maximum number of completed operations retained in history.
    pub history_size: usize,
    /// Number of samples used when establishing a performance baseline.
    pub baseline_window: usize,
    /// Slowdown factor (actual / expected) above which a degradation is reported.
    pub degradation_threshold: f64,
    /// Whether detected degradations should trigger automatic optimization.
    pub enable_auto_optimization: bool,
    /// Fraction of operations that are monitored (1.0 = every operation).
    pub sample_rate: f64,
}
32
/// Book-keeping for an operation that has started but not yet finished.
#[derive(Debug)]
struct OngoingOperation {
    // Operation name as passed to `start_operation`.
    name: String,
    // Wall-clock start of the operation.
    start_time: Instant,
    // GPU-side start timestamp, if captured (never set in this file).
    gpu_start: Option<Instant>,
    // Memory usage at start; always 0 here — tracking not implemented yet.
    memory_start: usize,
    // Baseline-derived expected duration, when a baseline exists.
    expected_duration: Option<Duration>,
}
42
/// Bounded record of completed operations plus per-name aggregate statistics.
struct PerformanceHistory {
    // Most recently completed operations; oldest entries are evicted first.
    operations: VecDeque<CompletedOperation>,
    // Aggregate statistics keyed by operation name.
    aggregated_stats: HashMap<String, AggregatedStats>,
    // Lifetime count of recorded operations (not capped by the window).
    total_operations: u64,
}
49
/// Snapshot of a single finished operation as recorded by `end_operation`.
#[derive(Debug, Clone)]
struct CompletedOperation {
    // Operation name.
    name: String,
    // Total wall-clock duration.
    execution_time: Duration,
    // Portion attributed to GPU work (estimated in `end_operation`).
    gpu_time: Duration,
    // Portion attributed to memory transfers (estimated in `end_operation`).
    memory_transfer_time: Duration,
    // Operations per second (1 / execution time).
    throughput: f64,
    // Completion time.
    timestamp: Instant,
    // Bytes used; always 0 here — tracking not implemented yet.
    memory_usage: usize,
    // Whether the operation succeeded; always true on this code path.
    success: bool,
}
62
/// Running aggregates for all completed operations sharing one name.
#[derive(Debug, Clone)]
struct AggregatedStats {
    // Number of samples folded in.
    count: u64,
    // Sum of execution times.
    total_time: Duration,
    // Fastest sample seen.
    min_time: Duration,
    // Slowest sample seen.
    max_time: Duration,
    // total_time / count, maintained on every insert.
    avg_time: Duration,
    // Std dev of execution times; recomputed over the retained window only.
    std_dev: f64,
    // Sum of per-operation throughputs.
    throughput_sum: f64,
    // Sum of per-operation memory usage.
    memory_usage_sum: usize,
    // Number of samples with success == false.
    failure_count: u64,
}
76
/// Learned expectation for one operation name, updated as an exponential
/// moving average by `update_baseline`.
#[derive(Debug, Clone)]
struct PerformanceBaseline {
    // Name this baseline describes.
    operation_name: String,
    // EMA of execution time.
    expected_time: Duration,
    // EMA of throughput.
    expected_throughput: f64,
    // 0.0..=1.0; ramps to 1.0 over the first 100 samples.
    confidence: f64,
    // Number of successful samples folded in.
    sample_count: u64,
    // When the baseline last absorbed a sample.
    last_updated: Instant,
}
87
/// Monitor implementation that records nothing; useful when monitoring is disabled.
pub struct NoOpMonitor;
90
91impl Default for NoOpMonitor {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl NoOpMonitor {
98 pub fn new() -> Self {
99 NoOpMonitor
100 }
101}
102
103impl Default for MonitorConfig {
104 fn default() -> Self {
105 Self {
106 history_size: 10000,
107 baseline_window: 100,
108 degradation_threshold: 1.5,
109 enable_auto_optimization: true,
110 sample_rate: 1.0,
111 }
112 }
113}
114
115impl RealTimeMonitor {
116 pub fn new() -> NeuralResult<Self> {
118 Self::with_config(MonitorConfig::default())
119 }
120
121 pub fn with_config(config: MonitorConfig) -> NeuralResult<Self> {
123 Ok(Self {
124 operations: Arc::new(RwLock::new(HashMap::new())),
125 history: Arc::new(Mutex::new(PerformanceHistory::new(config.history_size))),
126 baselines: Arc::new(RwLock::new(HashMap::new())),
127 next_handle: Arc::new(Mutex::new(1)),
128 config,
129 })
130 }
131
132 fn update_baseline(&self, operation: &CompletedOperation) {
134 if !operation.success {
135 return;
136 }
137
138 let mut baselines = self.baselines.write().unwrap();
139 let baseline = baselines.entry(operation.name.clone())
140 .or_insert_with(|| PerformanceBaseline {
141 operation_name: operation.name.clone(),
142 expected_time: operation.execution_time,
143 expected_throughput: operation.throughput,
144 confidence: 0.5,
145 sample_count: 0,
146 last_updated: Instant::now(),
147 });
148
149 let alpha = 0.1; let new_time_ms = operation.execution_time.as_secs_f64() * 1000.0;
152 let old_time_ms = baseline.expected_time.as_secs_f64() * 1000.0;
153 let updated_time_ms = alpha * new_time_ms + (1.0 - alpha) * old_time_ms;
154
155 baseline.expected_time = Duration::from_secs_f64(updated_time_ms / 1000.0);
156 baseline.expected_throughput = alpha * operation.throughput + (1.0 - alpha) * baseline.expected_throughput;
157 baseline.sample_count += 1;
158 baseline.last_updated = Instant::now();
159
160 baseline.confidence = (baseline.sample_count as f64 / 100.0).min(1.0);
162 }
163
164 fn check_degradation(&self, operation: &CompletedOperation) -> Option<PerformanceDegradation> {
166 let baselines = self.baselines.read().unwrap();
167
168 if let Some(baseline) = baselines.get(&operation.name) {
169 if baseline.confidence < 0.3 || baseline.sample_count < 10 {
170 return None; }
172
173 let actual_time = operation.execution_time.as_secs_f64();
174 let expected_time = baseline.expected_time.as_secs_f64();
175 let degradation_factor = actual_time / expected_time;
176
177 if degradation_factor > self.config.degradation_threshold {
178 return Some(PerformanceDegradation {
179 operation: operation.name.clone(),
180 expected_time,
181 actual_time,
182 degradation_factor,
183 suggested_action: self.generate_optimization_suggestion(operation, baseline),
184 });
185 }
186 }
187
188 None
189 }
190
191 fn generate_optimization_suggestion(
193 &self,
194 operation: &CompletedOperation,
195 baseline: &PerformanceBaseline,
196 ) -> String {
197 if operation.memory_transfer_time > operation.execution_time / 2 {
198 "Consider using memory pooling or batch operations to reduce transfer overhead".to_string()
199 } else if operation.gpu_time < operation.execution_time / 3 {
200 "GPU utilization is low, consider increasing batch size or workgroup size".to_string()
201 } else if operation.throughput < baseline.expected_throughput * 0.7 {
202 "Throughput is significantly below baseline, check for memory pressure or resource contention".to_string()
203 } else {
204 "Performance degradation detected, consider profiling individual kernels".to_string()
205 }
206 }
207
208 pub fn get_trends(&self, operation_name: &str, window_size: usize) -> Option<PerformanceTrend> {
210 let history = self.history.lock().unwrap();
211 let recent_ops: Vec<&CompletedOperation> = history.operations
212 .iter()
213 .rev()
214 .filter(|op| op.name == operation_name)
215 .take(window_size)
216 .collect();
217
218 if recent_ops.len() < 5 {
219 return None;
220 }
221
222 let times: Vec<f64> = recent_ops.iter()
223 .map(|op| op.execution_time.as_secs_f64())
224 .collect();
225
226 let trend_slope = calculate_trend_slope(×);
227 let volatility = calculate_volatility(×);
228
229 Some(PerformanceTrend {
230 operation_name: operation_name.to_string(),
231 trend_slope,
232 volatility,
233 sample_count: recent_ops.len(),
234 improving: trend_slope < -0.01, })
236 }
237
238 pub fn get_bottleneck_analysis(&self, operation_name: &str) -> Option<BottleneckAnalysis> {
240 let history = self.history.lock().unwrap();
241 if let Some(stats) = history.aggregated_stats.get(operation_name) {
242 let avg_execution = stats.avg_time.as_secs_f64();
243 let avg_memory_transfer = stats.total_time.as_secs_f64() / stats.count as f64;
244
245 let memory_ratio = avg_memory_transfer / avg_execution;
246 let gpu_ratio = 1.0 - memory_ratio; let bottleneck_type = if memory_ratio > 0.5 {
249 BottleneckType::MemoryTransfer
250 } else if gpu_ratio < 0.3 {
251 BottleneckType::GpuUnderutilization
252 } else if stats.failure_count as f64 / stats.count as f64 > 0.1 {
253 BottleneckType::ErrorRate
254 } else {
255 BottleneckType::Computation
256 };
257
258 Some(BottleneckAnalysis {
259 operation_name: operation_name.to_string(),
260 bottleneck_type,
261 memory_transfer_ratio: memory_ratio,
262 gpu_utilization_ratio: gpu_ratio,
263 error_rate: stats.failure_count as f64 / stats.count as f64,
264 recommendation: generate_bottleneck_recommendation(&bottleneck_type),
265 })
266 } else {
267 None
268 }
269 }
270}
271
272impl PerformanceMonitorTrait for RealTimeMonitor {
273 fn start_operation(&self, name: &str) -> OperationHandle {
274 if self.config.sample_rate < 1.0 && rand::random::<f64>() > self.config.sample_rate {
276 return OperationHandle(0); }
278
279 let mut next_handle = self.next_handle.lock().unwrap();
280 let handle = OperationHandle(*next_handle);
281 *next_handle += 1;
282
283 let operation = OngoingOperation {
284 name: name.to_string(),
285 start_time: Instant::now(),
286 gpu_start: None,
287 memory_start: 0, expected_duration: self.get_expected_duration(name),
289 };
290
291 let mut operations = self.operations.write().unwrap();
292 operations.insert(handle, operation);
293
294 handle
295 }
296
297 fn end_operation(&self, handle: OperationHandle) -> NeuralResult<OperationStats> {
298 if handle.0 == 0 {
299 return Ok(OperationStats {
301 name: "unmonitored".to_string(),
302 execution_time: 0.0,
303 gpu_time: 0.0,
304 memory_transfer_time: 0.0,
305 throughput: 0.0,
306 });
307 }
308
309 let mut operations = self.operations.write().unwrap();
310 let ongoing = operations.remove(&handle).ok_or_else(|| {
311 NeuralIntegrationError::PerformanceError("Invalid operation handle".to_string())
312 })?;
313
314 let end_time = Instant::now();
315 let execution_time = end_time.duration_since(ongoing.start_time);
316
317 let gpu_time = execution_time * 7 / 10; let memory_transfer_time = execution_time * 2 / 10; let throughput = 1.0 / execution_time.as_secs_f64(); let completed_op = CompletedOperation {
324 name: ongoing.name.clone(),
325 execution_time,
326 gpu_time,
327 memory_transfer_time,
328 throughput,
329 timestamp: end_time,
330 memory_usage: 0, success: true, };
333
334 {
336 let mut history = self.history.lock().unwrap();
337 history.add_operation(completed_op.clone());
338 }
339
340 self.update_baseline(&completed_op);
341
342 Ok(OperationStats {
343 name: ongoing.name,
344 execution_time: execution_time.as_secs_f64(),
345 gpu_time: gpu_time.as_secs_f64(),
346 memory_transfer_time: memory_transfer_time.as_secs_f64(),
347 throughput,
348 })
349 }
350
351 fn get_performance_summary(&self) -> PerformanceStats {
352 let history = self.history.lock().unwrap();
353
354 if history.total_operations == 0 {
355 return PerformanceStats {
356 total_operations: 0,
357 average_execution_time: 0.0,
358 gpu_utilization: 0.0,
359 memory_bandwidth: 0.0,
360 throughput: 0.0,
361 };
362 }
363
364 let total_time: Duration = history.operations.iter()
365 .map(|op| op.execution_time)
366 .sum();
367
368 let total_gpu_time: Duration = history.operations.iter()
369 .map(|op| op.gpu_time)
370 .sum();
371
372 let total_throughput: f64 = history.operations.iter()
373 .map(|op| op.throughput)
374 .sum();
375
376 PerformanceStats {
377 total_operations: history.total_operations,
378 average_execution_time: total_time.as_secs_f64() / history.total_operations as f64,
379 gpu_utilization: (total_gpu_time.as_secs_f64() / total_time.as_secs_f64()) as f32,
380 memory_bandwidth: 0.0, throughput: total_throughput / history.total_operations as f64,
382 }
383 }
384
385 fn detect_degradation(&self) -> Option<PerformanceDegradation> {
386 let history = self.history.lock().unwrap();
387
388 if let Some(recent_op) = history.operations.back() {
390 let recent_op_clone = recent_op.clone();
392 drop(history);
393 self.check_degradation(&recent_op_clone)
394 } else {
395 None
396 }
397 }
398}
399
400impl PerformanceHistory {
401 fn new(max_size: usize) -> Self {
402 Self {
403 operations: VecDeque::with_capacity(max_size),
404 aggregated_stats: HashMap::new(),
405 total_operations: 0,
406 }
407 }
408
409 fn add_operation(&mut self, operation: CompletedOperation) {
410 if self.operations.len() >= self.operations.capacity() {
412 self.operations.pop_front();
413 }
414 self.operations.push_back(operation.clone());
415 self.total_operations += 1;
416
417 let stats = self.aggregated_stats.entry(operation.name.clone())
419 .or_insert_with(|| AggregatedStats {
420 count: 0,
421 total_time: Duration::ZERO,
422 min_time: operation.execution_time,
423 max_time: operation.execution_time,
424 avg_time: Duration::ZERO,
425 std_dev: 0.0,
426 throughput_sum: 0.0,
427 memory_usage_sum: 0,
428 failure_count: 0,
429 });
430
431 stats.count += 1;
432 stats.total_time += operation.execution_time;
433 stats.min_time = stats.min_time.min(operation.execution_time);
434 stats.max_time = stats.max_time.max(operation.execution_time);
435 stats.avg_time = stats.total_time / stats.count as u32;
436 stats.throughput_sum += operation.throughput;
437 stats.memory_usage_sum += operation.memory_usage;
438
439 if !operation.success {
440 stats.failure_count += 1;
441 }
442
443 let times: Vec<f64> = self.operations.iter()
445 .filter(|op| op.name == operation.name)
446 .map(|op| op.execution_time.as_secs_f64())
447 .collect();
448
449 if times.len() > 1 {
450 stats.std_dev = calculate_std_dev(×);
451 }
452 }
453}
454
455impl RealTimeMonitor {
456 fn get_expected_duration(&self, name: &str) -> Option<Duration> {
457 let baselines = self.baselines.read().unwrap();
458 baselines.get(name).map(|b| b.expected_time)
459 }
460}
461
462impl PerformanceMonitorTrait for NoOpMonitor {
463 fn start_operation(&self, _name: &str) -> OperationHandle {
464 OperationHandle(0)
465 }
466
467 fn end_operation(&self, _handle: OperationHandle) -> NeuralResult<OperationStats> {
468 Ok(OperationStats {
469 name: "noop".to_string(),
470 execution_time: 0.0,
471 gpu_time: 0.0,
472 memory_transfer_time: 0.0,
473 throughput: 0.0,
474 })
475 }
476
477 fn get_performance_summary(&self) -> PerformanceStats {
478 PerformanceStats {
479 total_operations: 0,
480 average_execution_time: 0.0,
481 gpu_utilization: 0.0,
482 memory_bandwidth: 0.0,
483 throughput: 0.0,
484 }
485 }
486
487 fn detect_degradation(&self) -> Option<PerformanceDegradation> {
488 None
489 }
490}
491
/// Result of `RealTimeMonitor::get_trends`: a least-squares trend over recent
/// execution times for one operation name.
#[derive(Debug, Clone)]
pub struct PerformanceTrend {
    /// Name of the analyzed operation.
    pub operation_name: String,
    /// Least-squares slope of execution time over the sample window.
    pub trend_slope: f64,
    /// Standard deviation of the sampled execution times.
    pub volatility: f64,
    /// Number of samples the trend was computed from.
    pub sample_count: usize,
    /// True when the slope indicates execution times are shrinking.
    pub improving: bool,
}
501
/// Result of `RealTimeMonitor::get_bottleneck_analysis`: where one
/// operation's time appears to be going, plus a suggested remedy.
#[derive(Debug, Clone)]
pub struct BottleneckAnalysis {
    /// Name of the analyzed operation.
    pub operation_name: String,
    /// Dominant bottleneck classification.
    pub bottleneck_type: BottleneckType,
    /// Fraction of average execution time spent in memory transfers.
    pub memory_transfer_ratio: f64,
    /// Complement of the memory ratio, used as a GPU-utilization proxy.
    pub gpu_utilization_ratio: f64,
    /// failure_count / count over the aggregated samples.
    pub error_rate: f64,
    /// Human-readable advice for the detected bottleneck.
    pub recommendation: String,
}
512
/// Classification of where an operation's time is dominantly spent.
#[derive(Debug, Clone, Copy)]
pub enum BottleneckType {
    /// Memory transfers dominate the execution time.
    MemoryTransfer,
    /// Time is spent in actual computation (default classification).
    Computation,
    /// GPU appears mostly idle relative to total time.
    GpuUnderutilization,
    /// Failure rate is high enough to be the primary concern.
    ErrorRate,
}
521
/// Least-squares slope of `values` against their indices (0, 1, 2, ...).
/// Returns 0.0 for fewer than two samples, where no trend is defined.
fn calculate_trend_slope(values: &[f64]) -> f64 {
    if values.len() < 2 {
        return 0.0;
    }

    // Accumulate all four regression sums in a single pass.
    let mut x_sum = 0.0;
    let mut y_sum = 0.0;
    let mut xy_sum = 0.0;
    let mut x_sq_sum = 0.0;
    for (i, &y) in values.iter().enumerate() {
        let x = i as f64;
        x_sum += x;
        y_sum += y;
        xy_sum += x * y;
        x_sq_sum += x * x;
    }

    let n = values.len() as f64;
    (n * xy_sum - x_sum * y_sum) / (n * x_sq_sum - x_sum * x_sum)
}
538
/// Volatility is defined here as the (population) standard deviation of the samples.
fn calculate_volatility(values: &[f64]) -> f64 {
    calculate_std_dev(values)
}
543
/// Population standard deviation of `values` (variance divides by n, not
/// n - 1). Returns 0.0 for fewer than two samples.
fn calculate_std_dev(values: &[f64]) -> f64 {
    let count = values.len();
    if count < 2 {
        return 0.0;
    }

    let n = count as f64;
    let mean = values.iter().sum::<f64>() / n;
    let sum_sq_diff: f64 = values.iter()
        .map(|&v| {
            let d = v - mean;
            d * d
        })
        .sum();

    (sum_sq_diff / n).sqrt()
}
557
558fn generate_bottleneck_recommendation(bottleneck_type: &BottleneckType) -> String {
560 match bottleneck_type {
561 BottleneckType::MemoryTransfer => {
562 "Optimize memory transfers by using larger batch sizes, memory pooling, or reducing data precision".to_string()
563 }
564 BottleneckType::Computation => {
565 "Optimize computation by improving algorithm efficiency, using better GPU kernels, or increasing parallelism".to_string()
566 }
567 BottleneckType::GpuUnderutilization => {
568 "Increase GPU utilization by using larger workgroup sizes, higher occupancy, or more parallel work".to_string()
569 }
570 BottleneckType::ErrorRate => {
571 "Reduce error rate by improving input validation, handling edge cases, or fixing stability issues".to_string()
572 }
573 }
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh monitor reports zero lifetime operations.
    #[test]
    fn test_performance_monitor_creation() {
        let monitor = RealTimeMonitor::new().unwrap();
        let stats = monitor.get_performance_summary();
        assert_eq!(stats.total_operations, 0);
    }

    /// A tracked operation round-trips its name and measures nonzero time.
    #[test]
    fn test_operation_tracking() {
        let monitor = RealTimeMonitor::new().unwrap();

        let handle = monitor.start_operation("test_op");
        std::thread::sleep(Duration::from_millis(10));
        let stats = monitor.end_operation(handle).unwrap();

        assert_eq!(stats.name, "test_op");
        assert!(stats.execution_time > 0.0);
    }

    /// An upward-trending series yields a positive slope.
    #[test]
    fn test_trend_calculation() {
        let values = vec![1.0, 1.1, 1.2, 1.15, 1.3];
        let slope = calculate_trend_slope(&values);
        assert!(slope > 0.0);
    }

    #[test]
    fn test_std_dev_calculation() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let std_dev = calculate_std_dev(&values);
        // FIX: `calculate_std_dev` computes the *population* standard
        // deviation (variance divides by n): variance = 2.0, so the result
        // is sqrt(2) ~= 1.414. The old expectation of 1.58 was the *sample*
        // std dev (n - 1) and made this test fail against the implementation.
        assert!((std_dev - 2.0_f64.sqrt()).abs() < 0.01);
    }

    /// The no-op monitor accepts its own sentinel handle and returns the
    /// canonical "noop" stats.
    #[test]
    fn test_noop_monitor() {
        let monitor = NoOpMonitor;
        let handle = monitor.start_operation("test");
        let stats = monitor.end_operation(handle).unwrap();
        assert_eq!(stats.name, "noop");
    }
}