rumeter_component/
group.rs1
2use std::{time::Duration, sync::{Arc, Mutex}};
3
4use crate::{record::RecordData, Output, Controller};
5use tracing::*;
6
7
/// Settings for one group of concurrent load-generating workers.
///
/// The value is small and `Copy`, so it can be passed around freely; the
/// fields are private and are set through `ThreadGroup::new`.
#[derive(Clone, Copy, Debug)]
pub struct ThreadGroup {
    /// How many workers are spawned for this group.
    thread_num: u32,
    /// Total ramp-up time; it is divided by `thread_num` to get the pause
    /// inserted after each worker is spawned.
    rampup: Duration,
    /// Iterations each worker performs when `duration` is `None`.
    loop_num: i32,
    /// When `Some`, workers loop until a stop signal is broadcast after this
    /// much time instead of counting iterations.
    duration: Option<Duration>,
}
15
16impl ThreadGroup {
17 pub fn new(thread_num: u32, rampup: Duration, loop_num: i32, duration: Option<Duration>) -> Self {
18 Self { thread_num, rampup, loop_num, duration}
19 }
20
21 pub async fn start<C>(self: &Self, controller: C, out: Arc<Mutex<impl Output+Send + 'static>>)
22 where
23 C: Controller + Send + Sync + Clone + 'static,
24 {
25 let (_test_record_tx, mut test_record_rx) = tokio::sync::mpsc::channel::<Vec<RecordData>>(self.thread_num.try_into().unwrap());
26 let it = self.clone().rampup / self.thread_num;
27 let thread_count = Arc::new(Mutex::new(0i32));
28 let (tx, _rx) = tokio::sync::broadcast::channel::<bool>(1);
29 match self.duration {
30 Some(d) => {
31
32 for t in 1..=self.thread_num {
33 let thread_count = Arc::clone(&thread_count);
34 let test_record_tx = _test_record_tx.clone();
35 let ctrl = controller.clone();
36 let mut receiver = tx.subscribe();
37
38 tokio::spawn(async move {
39 {
40 let mut tc = thread_count.lock().unwrap();
41 *tc += 1;
42 }
43 loop {
44 let mut re_vec = ctrl.run().await;
45 {
46 let tc = thread_count.lock().unwrap();
47 for re in &mut re_vec {
48 re.grp_threads((*tc).clone() as u32);
49 re.all_threads((*tc).clone() as u32);
50 re.thread_name(format!("Thread Group 1-{}", &t));
51 }
52 }
53 _ = test_record_tx.send(re_vec).await;
54 match receiver.try_recv() {
55 Ok(_) => {
56 info!("terminating thread-{}", &t);
57 break;
58 },
59 Err(_) => {},
60 }
61 }
62 {
63 let mut tc = thread_count.lock().unwrap();
64 *tc -= 1;
65 }
66
67 });
68
69 tokio::time::sleep(it).await;
70 }
71 let task1 = tokio::spawn(async move {
72 tokio::time::sleep(d).await;
73 tx.send(true).unwrap();
74 });
75 let task2 = tokio::spawn(async move {
76
77 while let Some(re_vec) = test_record_rx.recv().await {
78 for re in re_vec {
79 (*out).lock().unwrap().write(re);
80 }
81 }
82 });
83
84 drop(_test_record_tx);
85 _ = tokio::join!(task1, task2);
86
87 },
88 None => {
89 for t in 1..=self.thread_num {
90 let thread_count = Arc::clone(&thread_count);
91 let test_record_tx = _test_record_tx.clone();
93 let ctrl = controller.clone();
94 let loop_num = self.loop_num;
95 tokio::spawn(async move{
96 {
97 let mut tc = thread_count.lock().unwrap();
98 *tc += 1;
99 }
100 for _count in 0..loop_num {
101 let mut re_vec = ctrl.run().await;
102 {
103 let tc = thread_count.lock().unwrap();
104 for re in &mut re_vec {
105 re.grp_threads((*tc).clone() as u32);
106 re.all_threads((*tc).clone() as u32);
107 re.thread_name(format!("Thread Group 1-{}", &t));
108
109 }
110
111 }
112
113 _ = test_record_tx.send(re_vec).await;
114
115 }
116 {
117 let mut tc = thread_count.lock().unwrap();
118 *tc -= 1;
119 }
120
121 });
122 tokio::time::sleep(it).await;
123 }
124
125 drop(_test_record_tx);
126 while let Some(re_vec) = test_record_rx.recv().await {
127 for re in re_vec {
128 (*out).lock().unwrap().write(re);
129 }
130 }
131
132 },
133
134
135 }
136 }
137
138}