rumeter_component/
group.rs

1
2use std::{time::Duration, sync::{Arc, Mutex}};
3
4use crate::{record::RecordData, Output, Controller};
5use tracing::*;
6
7
/// Settings for one group of concurrent workers driven by [`ThreadGroup::start`].
#[derive(Debug, Clone, Copy)]
pub struct ThreadGroup {
    /// Number of worker tasks that `start` spawns.
    thread_num: u32,
    /// Total start-up window: `start` pauses `rampup / thread_num` after
    /// spawning each worker.
    rampup: Duration,
    /// Iterations each worker performs when `duration` is `None`.
    loop_num: i32,
    /// When set, workers keep iterating until a stop signal is sent after this
    /// much time; `loop_num` is not consulted in that mode.
    duration: Option<Duration>,
}
15
16impl ThreadGroup {
17    pub fn new(thread_num: u32, rampup: Duration, loop_num: i32, duration: Option<Duration>) -> Self {
18        Self { thread_num, rampup, loop_num, duration}
19    }
20
21    pub async fn start<C>(self: &Self, controller: C, out: Arc<Mutex<impl Output+Send + 'static>>)
22    where
23        C: Controller + Send + Sync + Clone + 'static,
24    {
25        let (_test_record_tx, mut test_record_rx) = tokio::sync::mpsc::channel::<Vec<RecordData>>(self.thread_num.try_into().unwrap());
26        let it = self.clone().rampup / self.thread_num;
27        let thread_count = Arc::new(Mutex::new(0i32));
28        let (tx, _rx) = tokio::sync::broadcast::channel::<bool>(1);
29        match self.duration {
30            Some(d) => {
31                
32                for t in 1..=self.thread_num {
33                    let thread_count = Arc::clone(&thread_count);
34                    let test_record_tx = _test_record_tx.clone();
35                    let ctrl = controller.clone();
36                    let mut receiver = tx.subscribe();
37                    
38                    tokio::spawn(async move {
39                        {
40                            let mut tc = thread_count.lock().unwrap();
41                            *tc += 1;
42                        }
43                        loop {
44                            let mut re_vec = ctrl.run().await;
45                            {
46                                let tc = thread_count.lock().unwrap();
47                                for re in &mut re_vec {
48                                    re.grp_threads((*tc).clone() as u32);
49                                    re.all_threads((*tc).clone() as u32);
50                                    re.thread_name(format!("Thread Group 1-{}", &t));
51                                }
52                            }
53                            _ = test_record_tx.send(re_vec).await;
54                            match receiver.try_recv() {
55                                Ok(_) => {
56                                    info!("terminating thread-{}", &t);
57                                    break;
58                                },
59                                Err(_) => {},
60                            }
61                        }
62                        {
63                            let mut tc = thread_count.lock().unwrap();
64                            *tc -= 1;
65                        }
66                        
67                    });
68                    
69                    tokio::time::sleep(it).await;
70                }
71                let task1 = tokio::spawn(async move {
72                    tokio::time::sleep(d).await;
73                    tx.send(true).unwrap();
74                });
75                let task2 = tokio::spawn(async move {
76                    
77                    while let Some(re_vec) = test_record_rx.recv().await {
78                        for re in re_vec {
79                            (*out).lock().unwrap().write(re);
80                        }
81                    }
82                });
83                
84                drop(_test_record_tx);
85                _ = tokio::join!(task1, task2);
86                
87            },
88            None => {
89                for t in 1..=self.thread_num {
90                    let thread_count = Arc::clone(&thread_count);
91                    // let mut receiver = tx.subscribe();
92                    let test_record_tx = _test_record_tx.clone();
93                    let ctrl = controller.clone();
94                    let loop_num = self.loop_num;
95                    tokio::spawn(async move{
96                        {
97                            let mut tc = thread_count.lock().unwrap();
98                            *tc += 1;
99                        }
100                        for _count in 0..loop_num {
101                            let mut re_vec = ctrl.run().await;
102                            {
103                                let tc = thread_count.lock().unwrap();
104                                for re in &mut re_vec {
105                                    re.grp_threads((*tc).clone() as u32);
106                                    re.all_threads((*tc).clone() as u32);
107                                    re.thread_name(format!("Thread Group 1-{}", &t));
108                                    
109                                }
110                                
111                            }
112
113                            _ = test_record_tx.send(re_vec).await;
114                            
115                        }
116                        {
117                            let mut tc = thread_count.lock().unwrap();
118                            *tc -= 1;
119                        }
120
121                    });
122                    tokio::time::sleep(it).await;
123                }
124
125                drop(_test_record_tx);
126                while let Some(re_vec) = test_record_rx.recv().await {
127                    for re in re_vec {
128                        (*out).lock().unwrap().write(re);
129                    }
130                }
131
132            },
133
134            
135        }
136    }
137
138}