mockforge_performance/
simulator.rs

1//! Performance Simulator
2//!
3//! Main orchestrator for performance mode, combining RPS control,
4//! bottleneck simulation, and latency recording.
5
6use crate::bottleneck::{BottleneckConfig, BottleneckSimulator};
7use crate::controller::{RpsController, RpsProfile};
8use crate::latency::{LatencyAnalyzer, LatencyRecorder};
9use crate::metrics::{PerformanceMetrics, PerformanceSnapshot};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::info;
14
15/// Simulator configuration
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SimulatorConfig {
18    /// Initial RPS
19    pub initial_rps: f64,
20    /// RPS profile
21    pub rps_profile: Option<RpsProfile>,
22    /// Bottleneck configurations
23    pub bottlenecks: Vec<BottleneckConfig>,
24    /// Maximum latency samples to keep
25    pub max_latency_samples: usize,
26    /// Maximum age of latency samples (seconds)
27    pub max_latency_age_seconds: u64,
28}
29
30impl SimulatorConfig {
31    /// Create a new simulator configuration
32    pub fn new(initial_rps: f64) -> Self {
33        Self {
34            initial_rps,
35            rps_profile: None,
36            bottlenecks: Vec::new(),
37            max_latency_samples: 10000,
38            max_latency_age_seconds: 300, // 5 minutes
39        }
40    }
41
42    /// Set RPS profile
43    pub fn with_rps_profile(mut self, profile: RpsProfile) -> Self {
44        self.rps_profile = Some(profile);
45        self
46    }
47
48    /// Add bottleneck
49    pub fn with_bottleneck(mut self, bottleneck: BottleneckConfig) -> Self {
50        self.bottlenecks.push(bottleneck);
51        self
52    }
53}
54
55/// Performance simulator
56///
57/// Orchestrates RPS control, bottleneck simulation, and latency recording.
58#[derive(Debug, Clone)]
59pub struct PerformanceSimulator {
60    /// RPS controller
61    rps_controller: Arc<RpsController>,
62    /// Bottleneck simulator
63    bottleneck_simulator: Arc<BottleneckSimulator>,
64    /// Latency recorder
65    latency_recorder: Arc<LatencyRecorder>,
66    /// Latency analyzer
67    latency_analyzer: Arc<LatencyAnalyzer>,
68    /// Configuration
69    config: Arc<RwLock<SimulatorConfig>>,
70    /// Is running
71    is_running: Arc<RwLock<bool>>,
72}
73
74impl PerformanceSimulator {
75    /// Create a new performance simulator
76    pub fn new(config: SimulatorConfig) -> Self {
77        let rps_controller = Arc::new(RpsController::new(config.initial_rps));
78        let bottleneck_simulator = Arc::new(BottleneckSimulator::new());
79        let latency_recorder = Arc::new(LatencyRecorder::new(
80            config.max_latency_samples,
81            config.max_latency_age_seconds,
82        ));
83        let latency_analyzer = Arc::new(LatencyAnalyzer::new(latency_recorder.clone()));
84
85        // Set up bottlenecks
86        for bottleneck in &config.bottlenecks {
87            let simulator = bottleneck_simulator.clone();
88            let bottleneck = bottleneck.clone();
89            tokio::spawn(async move {
90                simulator.add_bottleneck(bottleneck).await;
91            });
92        }
93
94        Self {
95            rps_controller,
96            bottleneck_simulator,
97            latency_recorder,
98            latency_analyzer,
99            config: Arc::new(RwLock::new(config)),
100            is_running: Arc::new(RwLock::new(false)),
101        }
102    }
103
104    /// Start the simulator
105    pub async fn start(&self) {
106        let mut is_running = self.is_running.write().await;
107        *is_running = true;
108        drop(is_running);
109
110        info!("Performance simulator started");
111
112        // Start RPS profile execution if configured
113        let config = self.config.read().await;
114        if let Some(ref profile) = config.rps_profile {
115            let controller = self.rps_controller.clone();
116            let profile = profile.clone();
117            tokio::spawn(async move {
118                Self::execute_rps_profile(controller, profile).await;
119            });
120        }
121    }
122
123    /// Stop the simulator
124    pub async fn stop(&self) {
125        let mut is_running = self.is_running.write().await;
126        *is_running = false;
127        drop(is_running);
128
129        self.bottleneck_simulator.clear_bottlenecks().await;
130        info!("Performance simulator stopped");
131    }
132
133    /// Check if simulator is running
134    pub async fn is_running(&self) -> bool {
135        *self.is_running.read().await
136    }
137
138    /// Execute RPS profile
139    async fn execute_rps_profile(controller: Arc<RpsController>, profile: RpsProfile) {
140        info!("Executing RPS profile: {}", profile.name);
141
142        for stage in profile.stages {
143            if let Some(ref name) = stage.name {
144                info!(
145                    "RPS stage: {} - {} RPS for {}s",
146                    name, stage.target_rps, stage.duration_secs
147                );
148            }
149
150            controller.set_target_rps(stage.target_rps).await;
151
152            if stage.duration_secs > 0 {
153                tokio::time::sleep(tokio::time::Duration::from_secs(stage.duration_secs)).await;
154            } else {
155                // Infinite duration - wait until stopped
156                loop {
157                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
158                }
159            }
160        }
161    }
162
163    /// Process a request through the simulator
164    ///
165    /// This should be called for each request to apply RPS control,
166    /// bottleneck simulation, and latency recording.
167    pub async fn process_request(&self, endpoint: &str, method: &str) -> Result<(), anyhow::Error> {
168        // Wait for RPS slot
169        self.rps_controller.wait_for_slot().await;
170
171        // Apply bottlenecks
172        let bottleneck_delay_ms = self.bottleneck_simulator.apply_bottlenecks(endpoint).await;
173
174        // Record request
175        self.rps_controller.record_request().await;
176
177        // Record latency (will be updated when response is received)
178        // For now, we record the bottleneck delay
179        if bottleneck_delay_ms > 0 {
180            self.latency_recorder
181                .record(
182                    bottleneck_delay_ms,
183                    Some(endpoint.to_string()),
184                    Some(method.to_string()),
185                    None,
186                    None,
187                )
188                .await;
189        }
190
191        Ok(())
192    }
193
194    /// Record request completion with latency
195    pub async fn record_completion(
196        &self,
197        endpoint: &str,
198        method: &str,
199        latency_ms: u64,
200        status_code: u16,
201        error: Option<String>,
202    ) {
203        self.latency_recorder
204            .record(
205                latency_ms,
206                Some(endpoint.to_string()),
207                Some(method.to_string()),
208                Some(status_code),
209                error,
210            )
211            .await;
212    }
213
214    /// Get current performance snapshot
215    pub async fn get_snapshot(&self) -> PerformanceSnapshot {
216        let stats = self.latency_analyzer.calculate_stats().await;
217        let current_rps = self.rps_controller.get_current_rps().await;
218        let target_rps = self.rps_controller.get_target_rps().await;
219
220        let mut metrics = PerformanceMetrics::new();
221        metrics.update_from_latency_stats(&stats, current_rps, target_rps);
222
223        // Get active bottlenecks
224        let bottlenecks = self.bottleneck_simulator.get_bottlenecks().await;
225        let active_bottlenecks: Vec<String> =
226            bottlenecks.iter().map(|b| format!("{:?}", b.bottleneck_type)).collect();
227
228        PerformanceSnapshot {
229            id: uuid::Uuid::new_v4().to_string(),
230            timestamp: chrono::Utc::now(),
231            metrics,
232            active_bottlenecks,
233        }
234    }
235
236    /// Get RPS controller
237    pub fn rps_controller(&self) -> &Arc<RpsController> {
238        &self.rps_controller
239    }
240
241    /// Get bottleneck simulator
242    pub fn bottleneck_simulator(&self) -> &Arc<BottleneckSimulator> {
243        &self.bottleneck_simulator
244    }
245
246    /// Get latency analyzer
247    pub fn latency_analyzer(&self) -> &Arc<LatencyAnalyzer> {
248        &self.latency_analyzer
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255
256    #[tokio::test]
257    async fn test_performance_simulator() {
258        let config = SimulatorConfig::new(10.0);
259        let simulator = PerformanceSimulator::new(config);
260
261        simulator.start().await;
262        assert!(simulator.is_running().await);
263
264        simulator.process_request("/api/users", "GET").await.unwrap();
265
266        let snapshot = simulator.get_snapshot().await;
267        // total_requests is u64, so >= 0 is always true - removed redundant assertion
268
269        simulator.stop().await;
270        assert!(!simulator.is_running().await);
271    }
272}