atomic_bomb_engine/core/
run_batch.rs

1use tokio::sync::mpsc;
2
3use crate::core::batch;
4use crate::models::api_endpoint::ApiEndpoint;
5use crate::models::result::BatchResult;
6use crate::models::setup::SetupApiEndpoint;
7use crate::models::step_option::StepOption;
8use futures::stream::{BoxStream, StreamExt};
9
10pub async fn run_batch(
11    test_duration_secs: u64,
12    concurrent_requests: usize,
13    timeout_secs: u64,
14    cookie_store_enable: bool,
15    verbose: bool,
16    should_prevent: bool,
17    api_endpoints: Vec<ApiEndpoint>,
18    step_option: Option<StepOption>,
19    setup_options: Option<Vec<SetupApiEndpoint>>,
20    assert_channel_buffer_size: usize,
21    ema_alpha: f64,
22) -> BoxStream<'static, Result<Option<BatchResult>, anyhow::Error>> {
23    let (sender, receiver) = mpsc::channel(1024);
24
25    tokio::spawn(async move {
26        let res = batch::batch(
27            sender.clone(),
28            test_duration_secs,
29            concurrent_requests,
30            timeout_secs,
31            cookie_store_enable,
32            verbose,
33            should_prevent,
34            api_endpoints,
35            step_option,
36            setup_options,
37            assert_channel_buffer_size,
38            ema_alpha,
39        )
40        .await;
41
42        match res {
43            Ok(r) => {
44                if let Err(_) = sender.send(Some(r)).await {
45                    eprintln!("压测结束,但是发送结果失败");
46                }
47                if let Err(_) = sender.send(None).await {
48                    eprintln!("发送结束信号失败");
49                }
50            }
51            Err(e) => {
52                eprintln!("Error: {:?}", e.to_string());
53            }
54        }
55    });
56
57    let stream = futures::stream::unfold(receiver, |mut receiver| async move {
58        match receiver.recv().await {
59            Some(Some(batch_result)) => Some((Ok(Some(batch_result)), receiver)),
60            Some(None) => Some((Ok(None), receiver)),
61            None => None,
62        }
63    });
64
65    stream.boxed()
66}
67
#[cfg(test)]
mod tests {
    use serde_json::{json, Value};

    use crate::models::api_endpoint::ApiEndpoint;
    use crate::models::assert_option::AssertOption;
    use crate::models::setup::{JsonpathExtract, SetupApiEndpoint};
    use crate::models::step_option::StepOption;

    use super::*;

    /// Smoke test: runs `run_batch` against `http://127.0.0.1:8080/ran_sleep` and prints
    /// every item the stream yields until the `Ok(None)` marker arrives or the stream ends.
    ///
    /// It makes no assertions. Presumably a server must be listening on that address for
    /// the run to produce meaningful results — confirm before relying on it in CI.
    #[tokio::test]
    async fn test_run_batch() {
        let assertions = vec![AssertOption {
            jsonpath: "$.code".to_string(),
            reference_object: Value::from(2000000),
        }];

        let endpoints = vec![ApiEndpoint {
            name: "test-1".to_string(),
            url: "http://127.0.0.1:8080/ran_sleep".to_string(),
            method: "POST".to_string(),
            weight: 100,
            json: Some(json!({"name": "test","number": 10086})),
            headers: None,
            cookies: None,
            form_data: None,
            assert_options: Some(assertions),
            think_time_option: None,
            setup_options: None,
            multipart_options: None,
        }];

        let extracts = vec![
            JsonpathExtract {
                key: "test-code".to_string(),
                jsonpath: "$.code".to_string(),
            },
            JsonpathExtract {
                key: "test-msg".to_string(),
                jsonpath: "$.msg".to_string(),
            },
        ];

        // Setup fixture is built but not handed to `run_batch` (it receives `None` below).
        let _setup = vec![SetupApiEndpoint {
            name: "初始化-1".to_string(),
            url: "https://ooooo.run/api/short/v1/list".to_string(),
            method: "get".to_string(),
            json: None,
            form_data: None,
            multipart_options: None,
            headers: None,
            cookies: None,
            jsonpath_extract: Some(extracts),
        }];

        let step = Some(StepOption {
            increase_step: 1,
            increase_interval: 2,
        });

        let mut batch_stream = run_batch(
            30,    // test_duration_secs
            5,     // concurrent_requests
            10,    // timeout_secs
            true,  // cookie_store_enable
            false, // verbose
            true,  // should_prevent
            endpoints,
            step,
            None, // setup_options
            4096, // assert_channel_buffer_size
            0.0,  // ema_alpha
        )
        .await;

        while let Some(item) = batch_stream.next().await {
            match item {
                Err(e) => println!("Error: {:?}", e),
                Ok(Some(batch_result)) => {
                    println!("Received batch result: {:?}", batch_result)
                }
                Ok(None) => {
                    println!("No more results.");
                    break;
                }
            }
        }
    }
}