atomic_bomb_engine/core/
run_batch.rs1use tokio::sync::mpsc;
2
3use crate::core::batch;
4use crate::models::api_endpoint::ApiEndpoint;
5use crate::models::result::BatchResult;
6use crate::models::setup::SetupApiEndpoint;
7use crate::models::step_option::StepOption;
8use futures::stream::{BoxStream, StreamExt};
9
10pub async fn run_batch(
11 test_duration_secs: u64,
12 concurrent_requests: usize,
13 timeout_secs: u64,
14 cookie_store_enable: bool,
15 verbose: bool,
16 should_prevent: bool,
17 api_endpoints: Vec<ApiEndpoint>,
18 step_option: Option<StepOption>,
19 setup_options: Option<Vec<SetupApiEndpoint>>,
20 assert_channel_buffer_size: usize,
21 ema_alpha: f64,
22) -> BoxStream<'static, Result<Option<BatchResult>, anyhow::Error>> {
23 let (sender, receiver) = mpsc::channel(1024);
24
25 tokio::spawn(async move {
26 let res = batch::batch(
27 sender.clone(),
28 test_duration_secs,
29 concurrent_requests,
30 timeout_secs,
31 cookie_store_enable,
32 verbose,
33 should_prevent,
34 api_endpoints,
35 step_option,
36 setup_options,
37 assert_channel_buffer_size,
38 ema_alpha,
39 )
40 .await;
41
42 match res {
43 Ok(r) => {
44 if let Err(_) = sender.send(Some(r)).await {
45 eprintln!("压测结束,但是发送结果失败");
46 }
47 if let Err(_) = sender.send(None).await {
48 eprintln!("发送结束信号失败");
49 }
50 }
51 Err(e) => {
52 eprintln!("Error: {:?}", e.to_string());
53 }
54 }
55 });
56
57 let stream = futures::stream::unfold(receiver, |mut receiver| async move {
58 match receiver.recv().await {
59 Some(Some(batch_result)) => Some((Ok(Some(batch_result)), receiver)),
60 Some(None) => Some((Ok(None), receiver)),
61 None => None,
62 }
63 });
64
65 stream.boxed()
66}
67
68#[cfg(test)]
69mod tests {
70 use core::option::Option;
71 use serde_json::{json, Value};
72
73 use crate::models::api_endpoint::ApiEndpoint;
74 use crate::models::assert_option::AssertOption;
75 use crate::models::setup::JsonpathExtract;
76 use crate::models::setup::SetupApiEndpoint;
77 use crate::models::step_option::StepOption;
78
79 use super::*;
80
81 #[tokio::test]
82 async fn test_run_batch() {
83 let mut assert_vec: Vec<AssertOption> = Vec::new();
84 assert_vec.push(AssertOption {
85 jsonpath: "$.code".to_string(),
86 reference_object: Value::from(2000000),
87 });
88 let mut endpoints: Vec<ApiEndpoint> = Vec::new();
89 endpoints.push(ApiEndpoint {
90 name: "test-1".to_string(),
91 url: "http://127.0.0.1:8080/ran_sleep".to_string(),
92 method: "POST".to_string(),
93 weight: 100,
94 json: Some(json!({"name": "test","number": 10086})),
95 headers: None,
96 cookies: None,
97 form_data: None,
98 assert_options: Option::from(assert_vec),
99 think_time_option: None,
100 setup_options: None,
101 multipart_options: None,
102 });
103 let mut jsonpath_extracts: Vec<JsonpathExtract> = Vec::new();
104 jsonpath_extracts.push(JsonpathExtract {
105 key: "test-code".to_string(),
106 jsonpath: "$.code".to_string(),
107 });
108 jsonpath_extracts.push(JsonpathExtract {
109 key: "test-msg".to_string(),
110 jsonpath: "$.msg".to_string(),
111 });
112 let mut setup: Vec<SetupApiEndpoint> = Vec::new();
113 setup.push(SetupApiEndpoint {
114 name: "初始化-1".to_string(),
115 url: "https://ooooo.run/api/short/v1/list".to_string(),
116 method: "get".to_string(),
117 json: None,
118 form_data: None,
119 multipart_options: None,
120 headers: None,
121 cookies: None,
122 jsonpath_extract: Some(jsonpath_extracts),
123 });
124
125 let mut batch_stream = run_batch(
126 30,
127 5,
128 10,
129 true,
130 false,
131 true,
132 endpoints,
133 Option::from(StepOption {
134 increase_step: 1,
135 increase_interval: 2,
136 }),
137 None,
138 4096,
139 0f64,
140 )
141 .await;
142
143 while let Some(result) = batch_stream.next().await {
144 match result {
145 Ok(Some(batch_result)) => {
146 println!("Received batch result: {:?}", batch_result);
147 }
148 Ok(None) => {
149 println!("No more results.");
150 break;
151 }
152 Err(e) => {
153 println!("Error: {:?}", e);
154 }
155 }
156 }
157 }
158}