mockforge_performance/
simulator.rs1use crate::bottleneck::{BottleneckConfig, BottleneckSimulator};
7use crate::controller::{RpsController, RpsProfile};
8use crate::latency::{LatencyAnalyzer, LatencyRecorder};
9use crate::metrics::{PerformanceMetrics, PerformanceSnapshot};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::info;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SimulatorConfig {
18 pub initial_rps: f64,
20 pub rps_profile: Option<RpsProfile>,
22 pub bottlenecks: Vec<BottleneckConfig>,
24 pub max_latency_samples: usize,
26 pub max_latency_age_seconds: u64,
28}
29
30impl SimulatorConfig {
31 pub fn new(initial_rps: f64) -> Self {
33 Self {
34 initial_rps,
35 rps_profile: None,
36 bottlenecks: Vec::new(),
37 max_latency_samples: 10000,
38 max_latency_age_seconds: 300, }
40 }
41
42 pub fn with_rps_profile(mut self, profile: RpsProfile) -> Self {
44 self.rps_profile = Some(profile);
45 self
46 }
47
48 pub fn with_bottleneck(mut self, bottleneck: BottleneckConfig) -> Self {
50 self.bottlenecks.push(bottleneck);
51 self
52 }
53}
54
55#[derive(Debug, Clone)]
59pub struct PerformanceSimulator {
60 rps_controller: Arc<RpsController>,
62 bottleneck_simulator: Arc<BottleneckSimulator>,
64 latency_recorder: Arc<LatencyRecorder>,
66 latency_analyzer: Arc<LatencyAnalyzer>,
68 config: Arc<RwLock<SimulatorConfig>>,
70 is_running: Arc<RwLock<bool>>,
72}
73
74impl PerformanceSimulator {
75 pub fn new(config: SimulatorConfig) -> Self {
77 let rps_controller = Arc::new(RpsController::new(config.initial_rps));
78 let bottleneck_simulator = Arc::new(BottleneckSimulator::new());
79 let latency_recorder = Arc::new(LatencyRecorder::new(
80 config.max_latency_samples,
81 config.max_latency_age_seconds,
82 ));
83 let latency_analyzer = Arc::new(LatencyAnalyzer::new(latency_recorder.clone()));
84
85 for bottleneck in &config.bottlenecks {
87 let simulator = bottleneck_simulator.clone();
88 let bottleneck = bottleneck.clone();
89 tokio::spawn(async move {
90 simulator.add_bottleneck(bottleneck).await;
91 });
92 }
93
94 Self {
95 rps_controller,
96 bottleneck_simulator,
97 latency_recorder,
98 latency_analyzer,
99 config: Arc::new(RwLock::new(config)),
100 is_running: Arc::new(RwLock::new(false)),
101 }
102 }
103
104 pub async fn start(&self) {
106 let mut is_running = self.is_running.write().await;
107 *is_running = true;
108 drop(is_running);
109
110 info!("Performance simulator started");
111
112 let config = self.config.read().await;
114 if let Some(ref profile) = config.rps_profile {
115 let controller = self.rps_controller.clone();
116 let profile = profile.clone();
117 tokio::spawn(async move {
118 Self::execute_rps_profile(controller, profile).await;
119 });
120 }
121 }
122
123 pub async fn stop(&self) {
125 let mut is_running = self.is_running.write().await;
126 *is_running = false;
127 drop(is_running);
128
129 self.bottleneck_simulator.clear_bottlenecks().await;
130 info!("Performance simulator stopped");
131 }
132
133 pub async fn is_running(&self) -> bool {
135 *self.is_running.read().await
136 }
137
138 async fn execute_rps_profile(controller: Arc<RpsController>, profile: RpsProfile) {
140 info!("Executing RPS profile: {}", profile.name);
141
142 for stage in profile.stages {
143 if let Some(ref name) = stage.name {
144 info!(
145 "RPS stage: {} - {} RPS for {}s",
146 name, stage.target_rps, stage.duration_secs
147 );
148 }
149
150 controller.set_target_rps(stage.target_rps).await;
151
152 if stage.duration_secs > 0 {
153 tokio::time::sleep(tokio::time::Duration::from_secs(stage.duration_secs)).await;
154 } else {
155 loop {
157 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
158 }
159 }
160 }
161 }
162
163 pub async fn process_request(&self, endpoint: &str, method: &str) -> Result<(), anyhow::Error> {
168 self.rps_controller.wait_for_slot().await;
170
171 let bottleneck_delay_ms = self.bottleneck_simulator.apply_bottlenecks(endpoint).await;
173
174 self.rps_controller.record_request().await;
176
177 if bottleneck_delay_ms > 0 {
180 self.latency_recorder
181 .record(
182 bottleneck_delay_ms,
183 Some(endpoint.to_string()),
184 Some(method.to_string()),
185 None,
186 None,
187 )
188 .await;
189 }
190
191 Ok(())
192 }
193
194 pub async fn record_completion(
196 &self,
197 endpoint: &str,
198 method: &str,
199 latency_ms: u64,
200 status_code: u16,
201 error: Option<String>,
202 ) {
203 self.latency_recorder
204 .record(
205 latency_ms,
206 Some(endpoint.to_string()),
207 Some(method.to_string()),
208 Some(status_code),
209 error,
210 )
211 .await;
212 }
213
214 pub async fn get_snapshot(&self) -> PerformanceSnapshot {
216 let stats = self.latency_analyzer.calculate_stats().await;
217 let current_rps = self.rps_controller.get_current_rps().await;
218 let target_rps = self.rps_controller.get_target_rps().await;
219
220 let mut metrics = PerformanceMetrics::new();
221 metrics.update_from_latency_stats(&stats, current_rps, target_rps);
222
223 let bottlenecks = self.bottleneck_simulator.get_bottlenecks().await;
225 let active_bottlenecks: Vec<String> =
226 bottlenecks.iter().map(|b| format!("{:?}", b.bottleneck_type)).collect();
227
228 PerformanceSnapshot {
229 id: uuid::Uuid::new_v4().to_string(),
230 timestamp: chrono::Utc::now(),
231 metrics,
232 active_bottlenecks,
233 }
234 }
235
236 pub fn rps_controller(&self) -> &Arc<RpsController> {
238 &self.rps_controller
239 }
240
241 pub fn bottleneck_simulator(&self) -> &Arc<BottleneckSimulator> {
243 &self.bottleneck_simulator
244 }
245
246 pub fn latency_analyzer(&self) -> &Arc<LatencyAnalyzer> {
248 &self.latency_analyzer
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255
256 #[tokio::test]
257 async fn test_performance_simulator() {
258 let config = SimulatorConfig::new(10.0);
259 let simulator = PerformanceSimulator::new(config);
260
261 simulator.start().await;
262 assert!(simulator.is_running().await);
263
264 simulator.process_request("/api/users", "GET").await.unwrap();
265
266 let snapshot = simulator.get_snapshot().await;
267 simulator.stop().await;
270 assert!(!simulator.is_running().await);
271 }
272}