mockforge_performance/
controller.rs1use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9use tokio::time::sleep;
10use tracing::debug;
11
12#[derive(Debug, Clone)]
16pub struct RpsController {
17 target_rps: Arc<RwLock<f64>>,
19 current_rps: Arc<RwLock<f64>>,
21 request_count: Arc<RwLock<u64>>,
23 last_reset: Arc<RwLock<Instant>>,
25 min_interval: Arc<RwLock<Duration>>,
27}
28
29impl RpsController {
30 pub fn new(target_rps: f64) -> Self {
32 let min_interval = if target_rps > 0.0 {
33 Duration::from_secs_f64(1.0 / target_rps)
34 } else {
35 Duration::from_secs(0)
36 };
37
38 Self {
39 target_rps: Arc::new(RwLock::new(target_rps)),
40 current_rps: Arc::new(RwLock::new(0.0)),
41 request_count: Arc::new(RwLock::new(0)),
42 last_reset: Arc::new(RwLock::new(Instant::now())),
43 min_interval: Arc::new(RwLock::new(min_interval)),
44 }
45 }
46
47 pub async fn set_target_rps(&self, rps: f64) {
49 let mut target = self.target_rps.write().await;
50 *target = rps;
51
52 let min_interval = if rps > 0.0 {
53 Duration::from_secs_f64(1.0 / rps)
54 } else {
55 Duration::from_secs(0)
56 };
57
58 let mut interval = self.min_interval.write().await;
59 *interval = min_interval;
60
61 debug!("RPS controller: target RPS set to {}", rps);
62 }
63
64 pub async fn get_target_rps(&self) -> f64 {
66 *self.target_rps.read().await
67 }
68
69 pub async fn get_current_rps(&self) -> f64 {
71 *self.current_rps.read().await
72 }
73
74 pub async fn wait_for_slot(&self) {
78 let min_interval = *self.min_interval.read().await;
79 if min_interval.is_zero() {
80 return; }
82
83 sleep(min_interval).await;
85 }
86
87 pub async fn record_request(&self) {
91 let mut count = self.request_count.write().await;
92 *count += 1;
93
94 let now = Instant::now();
96 let mut last_reset = self.last_reset.write().await;
97 let elapsed = now.duration_since(*last_reset);
98
99 if elapsed >= Duration::from_secs(1) {
100 let rps = *count as f64 / elapsed.as_secs_f64();
102 let mut current = self.current_rps.write().await;
103 *current = rps;
104
105 *count = 0;
106 *last_reset = now;
107
108 debug!("RPS controller: current RPS = {:.2}", rps);
109 }
110 }
111
112 pub async fn get_request_count(&self) -> u64 {
114 *self.request_count.read().await
115 }
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct RpsProfile {
121 pub name: String,
123 pub stages: Vec<RpsStage>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct RpsStage {
130 pub duration_secs: u64,
132 pub target_rps: f64,
134 pub name: Option<String>,
136}
137
138impl RpsProfile {
139 pub fn constant(rps: f64) -> Self {
141 Self {
142 name: format!("Constant {} RPS", rps),
143 stages: vec![RpsStage {
144 duration_secs: 0, target_rps: rps,
146 name: Some("Constant".to_string()),
147 }],
148 }
149 }
150
151 pub fn ramp_up(start_rps: f64, end_rps: f64, duration_secs: u64) -> Self {
153 let steps = (duration_secs / 10).max(1); let rps_step = (end_rps - start_rps) / steps as f64;
155
156 let mut stages = Vec::new();
157 for i in 0..steps {
158 let current_rps = start_rps + (i as f64 * rps_step);
159 stages.push(RpsStage {
160 duration_secs: 10,
161 target_rps: current_rps,
162 name: Some(format!("Ramp {} -> {}", current_rps, current_rps + rps_step)),
163 });
164 }
165
166 Self {
167 name: format!("Ramp up {} -> {} RPS", start_rps, end_rps),
168 stages,
169 }
170 }
171
172 pub fn spike(base_rps: f64, spike_rps: f64, spike_duration_secs: u64) -> Self {
174 Self {
175 name: format!("Spike {} -> {} RPS", base_rps, spike_rps),
176 stages: vec![
177 RpsStage {
178 duration_secs: 30,
179 target_rps: base_rps,
180 name: Some("Base".to_string()),
181 },
182 RpsStage {
183 duration_secs: spike_duration_secs,
184 target_rps: spike_rps,
185 name: Some("Spike".to_string()),
186 },
187 RpsStage {
188 duration_secs: 30,
189 target_rps: base_rps,
190 name: Some("Recovery".to_string()),
191 },
192 ],
193 }
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// The configured target is reported back and recorded requests show up
    /// in the request counter.
    #[tokio::test]
    async fn test_rps_controller() {
        let rps_controller = RpsController::new(10.0);
        let configured = rps_controller.get_target_rps().await;
        assert_eq!(configured, 10.0);

        let mut remaining = 5;
        while remaining > 0 {
            rps_controller.record_request().await;
            remaining -= 1;
        }

        let recorded = rps_controller.get_request_count().await;
        assert!(recorded > 0);
    }

    /// A constant profile consists of exactly one stage at the given rate.
    #[test]
    fn test_rps_profile_constant() {
        let RpsProfile { stages, .. } = RpsProfile::constant(100.0);
        assert_eq!(stages.len(), 1);
        assert_eq!(stages[0].target_rps, 100.0);
    }

    /// A ramp has at least one stage and begins at the starting rate.
    #[test]
    fn test_rps_profile_ramp_up() {
        let RpsProfile { stages, .. } = RpsProfile::ramp_up(10.0, 100.0, 60);
        let first = stages.first().expect("ramp profile must contain a stage");
        assert_eq!(first.target_rps, 10.0);
    }

    /// A spike profile runs base -> spike -> base across three stages.
    #[test]
    fn test_rps_profile_spike() {
        let RpsProfile { stages, .. } = RpsProfile::spike(50.0, 200.0, 10);
        let targets: Vec<f64> = stages.iter().map(|stage| stage.target_rps).collect();
        assert_eq!(targets, vec![50.0, 200.0, 50.0]);
    }
}