mockforge_performance/
controller.rs

1//! RPS (Requests Per Second) Controller
2//!
3//! Controls the rate of requests to simulate load at a specific RPS.
4
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9use tokio::time::sleep;
10use tracing::debug;
11
12/// RPS (Requests Per Second) controller
13///
14/// Controls request rate to maintain a target RPS.
15#[derive(Debug, Clone)]
16pub struct RpsController {
17    /// Target RPS
18    target_rps: Arc<RwLock<f64>>,
19    /// Current RPS (calculated)
20    current_rps: Arc<RwLock<f64>>,
21    /// Request counter
22    request_count: Arc<RwLock<u64>>,
23    /// Last reset time
24    last_reset: Arc<RwLock<Instant>>,
25    /// Minimum interval between requests (calculated from RPS)
26    min_interval: Arc<RwLock<Duration>>,
27}
28
29impl RpsController {
30    /// Create a new RPS controller
31    pub fn new(target_rps: f64) -> Self {
32        let min_interval = if target_rps > 0.0 {
33            Duration::from_secs_f64(1.0 / target_rps)
34        } else {
35            Duration::from_secs(0)
36        };
37
38        Self {
39            target_rps: Arc::new(RwLock::new(target_rps)),
40            current_rps: Arc::new(RwLock::new(0.0)),
41            request_count: Arc::new(RwLock::new(0)),
42            last_reset: Arc::new(RwLock::new(Instant::now())),
43            min_interval: Arc::new(RwLock::new(min_interval)),
44        }
45    }
46
47    /// Set target RPS
48    pub async fn set_target_rps(&self, rps: f64) {
49        let mut target = self.target_rps.write().await;
50        *target = rps;
51
52        let min_interval = if rps > 0.0 {
53            Duration::from_secs_f64(1.0 / rps)
54        } else {
55            Duration::from_secs(0)
56        };
57
58        let mut interval = self.min_interval.write().await;
59        *interval = min_interval;
60
61        debug!("RPS controller: target RPS set to {}", rps);
62    }
63
64    /// Get target RPS
65    pub async fn get_target_rps(&self) -> f64 {
66        *self.target_rps.read().await
67    }
68
69    /// Get current RPS (calculated over last second)
70    pub async fn get_current_rps(&self) -> f64 {
71        *self.current_rps.read().await
72    }
73
74    /// Wait for next request slot (rate limiting)
75    ///
76    /// This will sleep if necessary to maintain the target RPS.
77    pub async fn wait_for_slot(&self) {
78        let min_interval = *self.min_interval.read().await;
79        if min_interval.is_zero() {
80            return; // No rate limiting
81        }
82
83        // Simple rate limiting: sleep for minimum interval
84        sleep(min_interval).await;
85    }
86
87    /// Record a request
88    ///
89    /// Updates the current RPS calculation.
90    pub async fn record_request(&self) {
91        let mut count = self.request_count.write().await;
92        *count += 1;
93
94        // Calculate current RPS over last second
95        let now = Instant::now();
96        let mut last_reset = self.last_reset.write().await;
97        let elapsed = now.duration_since(*last_reset);
98
99        if elapsed >= Duration::from_secs(1) {
100            // Reset and calculate RPS
101            let rps = *count as f64 / elapsed.as_secs_f64();
102            let mut current = self.current_rps.write().await;
103            *current = rps;
104
105            *count = 0;
106            *last_reset = now;
107
108            debug!("RPS controller: current RPS = {:.2}", rps);
109        }
110    }
111
112    /// Get request count since last reset
113    pub async fn get_request_count(&self) -> u64 {
114        *self.request_count.read().await
115    }
116}
117
118/// RPS Profile for dynamic RPS changes
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct RpsProfile {
121    /// Profile name
122    pub name: String,
123    /// RPS stages (time in seconds, target RPS)
124    pub stages: Vec<RpsStage>,
125}
126
127/// RPS Stage
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct RpsStage {
130    /// Duration in seconds
131    pub duration_secs: u64,
132    /// Target RPS for this stage
133    pub target_rps: f64,
134    /// Stage name/description
135    pub name: Option<String>,
136}
137
138impl RpsProfile {
139    /// Create a simple constant RPS profile
140    pub fn constant(rps: f64) -> Self {
141        Self {
142            name: format!("Constant {} RPS", rps),
143            stages: vec![RpsStage {
144                duration_secs: 0, // Infinite
145                target_rps: rps,
146                name: Some("Constant".to_string()),
147            }],
148        }
149    }
150
151    /// Create a ramp-up profile
152    pub fn ramp_up(start_rps: f64, end_rps: f64, duration_secs: u64) -> Self {
153        let steps = (duration_secs / 10).max(1); // 10 second steps
154        let rps_step = (end_rps - start_rps) / steps as f64;
155
156        let mut stages = Vec::new();
157        for i in 0..steps {
158            let current_rps = start_rps + (i as f64 * rps_step);
159            stages.push(RpsStage {
160                duration_secs: 10,
161                target_rps: current_rps,
162                name: Some(format!("Ramp {} -> {}", current_rps, current_rps + rps_step)),
163            });
164        }
165
166        Self {
167            name: format!("Ramp up {} -> {} RPS", start_rps, end_rps),
168            stages,
169        }
170    }
171
172    /// Create a spike profile
173    pub fn spike(base_rps: f64, spike_rps: f64, spike_duration_secs: u64) -> Self {
174        Self {
175            name: format!("Spike {} -> {} RPS", base_rps, spike_rps),
176            stages: vec![
177                RpsStage {
178                    duration_secs: 30,
179                    target_rps: base_rps,
180                    name: Some("Base".to_string()),
181                },
182                RpsStage {
183                    duration_secs: spike_duration_secs,
184                    target_rps: spike_rps,
185                    name: Some("Spike".to_string()),
186                },
187                RpsStage {
188                    duration_secs: 30,
189                    target_rps: base_rps,
190                    name: Some("Recovery".to_string()),
191                },
192            ],
193        }
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[tokio::test]
202    async fn test_rps_controller() {
203        let controller = RpsController::new(10.0);
204        assert_eq!(controller.get_target_rps().await, 10.0);
205
206        // Record some requests
207        for _ in 0..5 {
208            controller.record_request().await;
209        }
210
211        // Should have recorded requests
212        assert!(controller.get_request_count().await > 0);
213    }
214
215    #[test]
216    fn test_rps_profile_constant() {
217        let profile = RpsProfile::constant(100.0);
218        assert_eq!(profile.stages.len(), 1);
219        assert_eq!(profile.stages[0].target_rps, 100.0);
220    }
221
222    #[test]
223    fn test_rps_profile_ramp_up() {
224        let profile = RpsProfile::ramp_up(10.0, 100.0, 60);
225        assert!(!profile.stages.is_empty());
226        assert_eq!(profile.stages[0].target_rps, 10.0);
227    }
228
229    #[test]
230    fn test_rps_profile_spike() {
231        let profile = RpsProfile::spike(50.0, 200.0, 10);
232        assert_eq!(profile.stages.len(), 3);
233        assert_eq!(profile.stages[0].target_rps, 50.0);
234        assert_eq!(profile.stages[1].target_rps, 200.0);
235        assert_eq!(profile.stages[2].target_rps, 50.0);
236    }
237}