Skip to main content

rusmes_loadtest/
scenarios.rs

1//! Load test scenarios
2
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10use tokio::time::sleep;
11
12use crate::config::LoadTestConfig;
13use crate::generators::MessageGenerator;
14use crate::metrics::LoadTestMetrics;
15use crate::protocols::{ImapClient, SmtpClient};
16
17/// Available test scenarios
18#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
19pub enum ScenarioType {
20    /// SMTP throughput test
21    SmtpThroughput,
22    /// Concurrent connections test
23    ConcurrentConnections,
24    /// Mixed protocol test
25    MixedProtocol,
26    /// Sustained load test
27    SustainedLoad,
28}
29
30impl ScenarioType {
31    /// Create a scenario runner
32    pub fn create_runner(
33        &self,
34        config: &LoadTestConfig,
35    ) -> Box<dyn LoadTestScenario + Send + Sync> {
36        match self {
37            ScenarioType::SmtpThroughput => Box::new(SmtpThroughputScenario::new(config.clone())),
38            ScenarioType::ConcurrentConnections => {
39                Box::new(ConcurrentConnectionsScenario::new(config.clone()))
40            }
41            ScenarioType::MixedProtocol => Box::new(MixedProtocolScenario::new(config.clone())),
42            ScenarioType::SustainedLoad => Box::new(SustainedLoadScenario::new(config.clone())),
43        }
44    }
45}
46
47/// Trait for load test scenarios
48pub trait LoadTestScenario {
49    /// Run the scenario
50    fn run(
51        &self,
52        metrics: Arc<RwLock<LoadTestMetrics>>,
53    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
54}
55
56/// SMTP throughput scenario
57pub struct SmtpThroughputScenario {
58    config: LoadTestConfig,
59}
60
61impl SmtpThroughputScenario {
62    pub fn new(config: LoadTestConfig) -> Self {
63        Self { config }
64    }
65}
66
67impl LoadTestScenario for SmtpThroughputScenario {
68    fn run(
69        &self,
70        metrics: Arc<RwLock<LoadTestMetrics>>,
71    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
72        Box::pin(async move {
73            metrics.write().await.mark_started();
74
75            let (min_size, max_size) = self.config.message_size_range();
76            let generator = MessageGenerator::with_content_type(
77                min_size,
78                max_size,
79                self.config.message_content,
80            );
81
82            let duration = Duration::from_secs(self.config.duration_secs);
83            let start = Instant::now();
84
85            let mut tasks = vec![];
86
87            for _ in 0..self.config.concurrency {
88                let metrics = metrics.clone();
89                let config = self.config.clone();
90                let generator = generator.clone();
91
92                let task = tokio::spawn(async move {
93                    while start.elapsed() < duration {
94                        let message = generator.generate();
95                        let request_start = Instant::now();
96
97                        match SmtpClient::send_message(
98                            &config.target_host,
99                            config.target_port,
100                            &message,
101                        )
102                        .await
103                        {
104                            Ok(bytes_received) => {
105                                let latency = request_start.elapsed();
106                                metrics.write().await.record_success(
107                                    latency,
108                                    message.len(),
109                                    bytes_received,
110                                );
111                            }
112                            Err(e) => {
113                                metrics.write().await.record_failure(e.to_string());
114                            }
115                        }
116
117                        // Rate limiting
118                        let delay = Duration::from_millis(1000 / config.message_rate);
119                        sleep(delay).await;
120                    }
121                });
122
123                tasks.push(task);
124            }
125
126            for task in tasks {
127                let _ = task.await;
128            }
129
130            metrics.write().await.mark_completed();
131            Ok(())
132        })
133    }
134}
135
136/// Concurrent connections scenario
137pub struct ConcurrentConnectionsScenario {
138    config: LoadTestConfig,
139}
140
141impl ConcurrentConnectionsScenario {
142    pub fn new(config: LoadTestConfig) -> Self {
143        Self { config }
144    }
145}
146
147impl LoadTestScenario for ConcurrentConnectionsScenario {
148    fn run(
149        &self,
150        metrics: Arc<RwLock<LoadTestMetrics>>,
151    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
152        Box::pin(async move {
153            metrics.write().await.mark_started();
154
155            let (min_size, max_size) = self.config.message_size_range();
156            let generator = MessageGenerator::with_content_type(
157                min_size,
158                max_size,
159                self.config.message_content,
160            );
161
162            let mut tasks = vec![];
163
164            // Create many concurrent connections
165            for _ in 0..self.config.concurrency {
166                let metrics = metrics.clone();
167                let config = self.config.clone();
168                let generator = generator.clone();
169
170                let task = tokio::spawn(async move {
171                    let message = generator.generate();
172                    let request_start = Instant::now();
173
174                    match SmtpClient::send_message(
175                        &config.target_host,
176                        config.target_port,
177                        &message,
178                    )
179                    .await
180                    {
181                        Ok(bytes_received) => {
182                            let latency = request_start.elapsed();
183                            metrics.write().await.record_success(
184                                latency,
185                                message.len(),
186                                bytes_received,
187                            );
188                        }
189                        Err(e) => {
190                            metrics.write().await.record_failure(e.to_string());
191                        }
192                    }
193                });
194
195                tasks.push(task);
196            }
197
198            for task in tasks {
199                let _ = task.await;
200            }
201
202            metrics.write().await.mark_completed();
203            Ok(())
204        })
205    }
206}
207
208/// Mixed protocol scenario
209pub struct MixedProtocolScenario {
210    config: LoadTestConfig,
211}
212
213impl MixedProtocolScenario {
214    pub fn new(config: LoadTestConfig) -> Self {
215        Self { config }
216    }
217}
218
219impl LoadTestScenario for MixedProtocolScenario {
220    fn run(
221        &self,
222        metrics: Arc<RwLock<LoadTestMetrics>>,
223    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
224        Box::pin(async move {
225            metrics.write().await.mark_started();
226
227            let (min_size, max_size) = self.config.message_size_range();
228            let generator = MessageGenerator::with_content_type(
229                min_size,
230                max_size,
231                self.config.message_content,
232            );
233
234            let duration = Duration::from_secs(self.config.duration_secs);
235            let start = Instant::now();
236
237            let mut tasks = vec![];
238
239            for i in 0..self.config.concurrency {
240                let metrics = metrics.clone();
241                let config = self.config.clone();
242                let generator = generator.clone();
243
244                let task = tokio::spawn(async move {
245                    while start.elapsed() < duration {
246                        if i % 2 == 0 {
247                            // SMTP
248                            let message = generator.generate();
249                            let request_start = Instant::now();
250
251                            match SmtpClient::send_message(
252                                &config.target_host,
253                                config.target_port,
254                                &message,
255                            )
256                            .await
257                            {
258                                Ok(bytes_received) => {
259                                    let latency = request_start.elapsed();
260                                    metrics.write().await.record_success(
261                                        latency,
262                                        message.len(),
263                                        bytes_received,
264                                    );
265                                }
266                                Err(e) => {
267                                    metrics.write().await.record_failure(e.to_string());
268                                }
269                            }
270                        } else {
271                            // IMAP
272                            let request_start = Instant::now();
273
274                            match ImapClient::fetch_messages(
275                                &config.target_host,
276                                config.target_port + 100,
277                            )
278                            .await
279                            {
280                                Ok(bytes_received) => {
281                                    let latency = request_start.elapsed();
282                                    metrics.write().await.record_success(
283                                        latency,
284                                        0,
285                                        bytes_received,
286                                    );
287                                }
288                                Err(e) => {
289                                    metrics.write().await.record_failure(e.to_string());
290                                }
291                            }
292                        }
293
294                        sleep(Duration::from_millis(100)).await;
295                    }
296                });
297
298                tasks.push(task);
299            }
300
301            for task in tasks {
302                let _ = task.await;
303            }
304
305            metrics.write().await.mark_completed();
306            Ok(())
307        })
308    }
309}
310
311/// Sustained load scenario
312pub struct SustainedLoadScenario {
313    config: LoadTestConfig,
314}
315
316impl SustainedLoadScenario {
317    pub fn new(config: LoadTestConfig) -> Self {
318        Self { config }
319    }
320}
321
322impl LoadTestScenario for SustainedLoadScenario {
323    fn run(
324        &self,
325        metrics: Arc<RwLock<LoadTestMetrics>>,
326    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
327        Box::pin(async move {
328            metrics.write().await.mark_started();
329
330            let (min_size, max_size) = self.config.message_size_range();
331            let generator = MessageGenerator::with_content_type(
332                min_size,
333                max_size,
334                self.config.message_content,
335            );
336
337            let duration = Duration::from_secs(self.config.duration_secs);
338            let start = Instant::now();
339
340            let mut tasks = vec![];
341
342            for _ in 0..self.config.concurrency {
343                let metrics = metrics.clone();
344                let config = self.config.clone();
345                let generator = generator.clone();
346
347                let task = tokio::spawn(async move {
348                    while start.elapsed() < duration {
349                        let message = generator.generate();
350                        let request_start = Instant::now();
351
352                        match SmtpClient::send_message(
353                            &config.target_host,
354                            config.target_port,
355                            &message,
356                        )
357                        .await
358                        {
359                            Ok(bytes_received) => {
360                                let latency = request_start.elapsed();
361                                metrics.write().await.record_success(
362                                    latency,
363                                    message.len(),
364                                    bytes_received,
365                                );
366                            }
367                            Err(e) => {
368                                metrics.write().await.record_failure(e.to_string());
369                            }
370                        }
371
372                        // Constant rate
373                        let delay = Duration::from_millis(1000);
374                        sleep(delay).await;
375                    }
376                });
377
378                tasks.push(task);
379            }
380
381            for task in tasks {
382                let _ = task.await;
383            }
384
385            metrics.write().await.mark_completed();
386            Ok(())
387        })
388    }
389}
390
391pub type ScenarioRunner = Box<dyn LoadTestScenario + Send + Sync>;
392
393#[cfg(test)]
394mod tests {
395    use super::*;
396
397    #[test]
398    fn test_scenario_creation() {
399        let config = LoadTestConfig::default();
400        let _runner = ScenarioType::SmtpThroughput.create_runner(&config);
401        let _runner = ScenarioType::ConcurrentConnections.create_runner(&config);
402        let _runner = ScenarioType::MixedProtocol.create_runner(&config);
403        let _runner = ScenarioType::SustainedLoad.create_runner(&config);
404    }
405}