1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
use rand::Rng;
use std::time::{self, Duration};

use crate::goose::{GooseUser, GooseUserCommand, Scenario, TransactionFunction};
use crate::logger::GooseLog;
use crate::metrics::{GooseMetric, ScenarioMetric, TransactionMetric};

pub(crate) async fn user_main(
    thread_number: usize,
    thread_scenario: Scenario,
    mut thread_user: GooseUser,
    thread_receiver: flume::Receiver<GooseUserCommand>,
) {
    info!(
        "launching user {} from {}...",
        thread_number, thread_scenario.name
    );

    // User is starting, first invoke the weighted on_start transactions.
    if !thread_scenario.weighted_on_start_transactions.is_empty() {
        // Transactions are already weighted and scheduled, execute each in order.
        for (thread_transaction_index, thread_transaction_name) in
            &thread_scenario.weighted_on_start_transactions
        {
            // Determine which transaction we're going to run next.
            let function = &thread_scenario.transactions[*thread_transaction_index].function;
            debug!(
                "[user {}]: launching on_start {} transaction from {}",
                thread_number, thread_transaction_name, thread_scenario.name
            );
            // Invoke the transaction function.
            let _todo = invoke_transaction_function(
                function,
                &mut thread_user,
                *thread_transaction_index,
                thread_transaction_name,
            )
            .await;
        }
    }

    // If normal transactions are defined, loop launching transactions until parent tells us to stop.
    if !thread_scenario.weighted_transactions.is_empty() {
        'launch_transactions: loop {
            // Tracks the time it takes to loop through all Transactions when Coordinated Omission
            // Mitigation is enabled.
            thread_user.update_request_cadence(thread_number).await;
            let scenario_started = time::Instant::now();

            for (thread_transaction_index, thread_transaction_name) in
                &thread_scenario.weighted_transactions
            {
                // Determine which transaction we're going to run next.
                let function = &thread_scenario.transactions[*thread_transaction_index].function;
                debug!(
                    "[user {}]: launching {} transaction from {}",
                    thread_number, thread_transaction_name, thread_scenario.name
                );
                // Invoke the transaction function.
                let _todo = invoke_transaction_function(
                    function,
                    &mut thread_user,
                    *thread_transaction_index,
                    thread_transaction_name,
                )
                .await;

                if received_exit(&thread_receiver) {
                    break 'launch_transactions;
                }

                // If the transaction_wait is defined, wait for a random time between transaction.
                if let Some((min, max)) = thread_scenario.transaction_wait {
                    // Total time left to wait before running the next transaction.
                    let mut wait_time = rand::thread_rng().gen_range(min..=max).as_millis();
                    // Track the time slept for Coordinated Omission Mitigation.
                    let sleep_timer = time::Instant::now();
                    // Never sleep more than 500 milliseconds, allowing a sleeping transaction to shut
                    // down quickly when the load test ends.
                    let maximum_sleep_time = 500;

                    while wait_time > 0 {
                        // Exit immediately if message received from parent.
                        if received_exit(&thread_receiver) {
                            break 'launch_transactions;
                        }

                        // Wake regularly to detect if the load test has shut down.
                        let sleep_duration = if wait_time > maximum_sleep_time {
                            wait_time -= maximum_sleep_time;
                            Duration::from_millis(maximum_sleep_time as u64)
                        } else {
                            let sleep_duration = Duration::from_millis(wait_time as u64);
                            wait_time = 0;
                            sleep_duration
                        };

                        debug!(
                            "user {} from {} sleeping {:?} ...",
                            thread_number, thread_scenario.name, sleep_duration
                        );

                        tokio::time::sleep(sleep_duration).await;
                    }
                    // Track how much time the GooseUser sleeps during this loop through all Transactions,
                    // used by Coordinated Omission Mitigation.
                    thread_user.slept += (time::Instant::now() - sleep_timer).as_millis() as u64;
                }
            }
            // Record a complete iteration running this Scenario.
            thread_user.iterations += 1;

            // Send scenario metrics to parent and logger if enabled, ignoring errors.
            let _ = record_scenario(
                &thread_scenario,
                &thread_user,
                scenario_started.elapsed().as_millis(),
            )
            .await;

            // Check if configured to exit after a certain number of iterations, and exit if
            // that number of iterations have run.
            if thread_user.config.iterations > 0
                && thread_user.iterations >= thread_user.config.iterations
            {
                // Pluralize the word "iteration" if more than one iteration completed.
                let pluralize = if thread_user.iterations == 0 {
                    "iteration"
                } else {
                    "iterations"
                };
                // Provide visual indication that a GooseUSer has completed the confifgured
                // number of iterations.
                info!(
                    "user {} completed {} {} of {}...",
                    thread_number, thread_user.iterations, pluralize, thread_scenario.name,
                );
                // Attempt to notify the parent this thread is shutting down.
                if let Some(shutdown_channel) = thread_user.shutdown_channel.clone() {
                    let _ = shutdown_channel.send(thread_number);
                }
                break 'launch_transactions;
            }
        }
    }

    // User is exiting, first invoke the weighted on_stop transactions.
    if !thread_scenario.weighted_on_stop_transactions.is_empty() {
        // Transactions are already weighted and scheduled, execute each in order.
        for (thread_transaction_index, thread_transaction_name) in
            &thread_scenario.weighted_on_stop_transactions
        {
            // Determine which transaction we're going to run next.
            let function = &thread_scenario.transactions[*thread_transaction_index].function;
            debug!(
                "[user: {}]: launching on_stop {} transaction from {}",
                thread_number, thread_transaction_name, thread_scenario.name
            );
            // Invoke the transaction function.
            let _todo = invoke_transaction_function(
                function,
                &mut thread_user,
                *thread_transaction_index,
                thread_transaction_name,
            )
            .await;
        }
    }

    // Optional debug output when exiting.
    info!(
        "exiting user {} from {}...",
        thread_number, thread_scenario.name
    );
}

// Determine if the parent has sent a GooseUserCommand::Exit message.
fn received_exit(thread_receiver: &flume::Receiver<GooseUserCommand>) -> bool {
    let mut message = thread_receiver.try_recv();
    while message.is_ok() {
        match message.unwrap() {
            // GooseUserCommand::Exit received.
            GooseUserCommand::Exit => {
                return true;
            }
            command => {
                debug!("ignoring unexpected GooseUserCommand: {:?}", command);
            }
        }
        message = thread_receiver.try_recv();
    }
    // GooseUserCommand::Exit not received.
    false
}

// Send scenario metric to parent and logger when enabled.
async fn record_scenario(
    thread_scenario: &Scenario,
    thread_user: &GooseUser,
    run_time: u128,
) -> Result<(), flume::SendError<Option<GooseLog>>> {
    if !thread_user.config.no_scenario_metrics && !thread_user.config.no_metrics {
        let raw_scenario = ScenarioMetric::new(
            thread_user.started.elapsed().as_millis(),
            &thread_scenario.name,
            thread_user.scenarios_index,
            run_time,
            thread_user.weighted_users_index,
        );
        if let Some(metrics_channel) = thread_user.metrics_channel.clone() {
            // Best effort metrics.
            let _ = metrics_channel.send(GooseMetric::Scenario(raw_scenario.clone()));
        }
        // If transaction-log is enabled, send a copy of the raw transaction metric to the logger thread.
        if !thread_user.config.scenario_log.is_empty() {
            if let Some(logger) = thread_user.logger.as_ref() {
                logger.send(Some(GooseLog::Scenario(raw_scenario)))?;
            }
        }
    }
    Ok(())
}

// Invoke the transaction function, collecting transaction metrics.
async fn invoke_transaction_function(
    function: &TransactionFunction,
    thread_user: &mut GooseUser,
    thread_transaction_index: usize,
    thread_transaction_name: &str,
) -> Result<(), flume::SendError<Option<GooseLog>>> {
    let started = time::Instant::now();
    let mut raw_transaction = TransactionMetric::new(
        thread_user.started.elapsed().as_millis(),
        thread_user.scenarios_index,
        thread_transaction_index,
        thread_transaction_name.to_string(),
        thread_user.weighted_users_index,
    );
    if !thread_transaction_name.is_empty() {
        thread_user
            .transaction_name
            .replace(thread_transaction_name.to_string());
    } else {
        thread_user.transaction_name.take();
    }

    let success = function(thread_user).await.is_ok();
    raw_transaction.set_time(started.elapsed().as_millis(), success);

    // Exit if all metrics or transaction metrics are disabled.
    if thread_user.config.no_metrics || thread_user.config.no_transaction_metrics {
        return Ok(());
    }

    // If transaction-log is enabled, send a copy of the raw transaction metric to the logger thread.
    if !thread_user.config.transaction_log.is_empty() {
        if let Some(logger) = thread_user.logger.as_ref() {
            logger.send(Some(GooseLog::Transaction(raw_transaction.clone())))?;
        }
    }

    // Otherwise send metrics to parent.
    if let Some(metrics_channel) = thread_user.metrics_channel.clone() {
        // Best effort metrics.
        let _ = metrics_channel.send(GooseMetric::Transaction(raw_transaction));
    }

    Ok(())
}