use rand::Rng;
use std::time::{self, Duration};
use crate::goose::{GooseUser, GooseUserCommand, Scenario, TransactionFunction};
use crate::logger::GooseLog;
use crate::metrics::{GooseMetric, ScenarioMetric, TransactionMetric};
pub(crate) async fn user_main(
thread_number: usize,
thread_scenario: Scenario,
mut thread_user: GooseUser,
thread_receiver: flume::Receiver<GooseUserCommand>,
) {
info!(
"launching user {} from {}...",
thread_number, thread_scenario.name
);
if !thread_scenario.weighted_on_start_transactions.is_empty() {
for (thread_transaction_index, thread_transaction_name) in
&thread_scenario.weighted_on_start_transactions
{
let function = &thread_scenario.transactions[*thread_transaction_index].function;
debug!(
"[user {}]: launching on_start {} transaction from {}",
thread_number, thread_transaction_name, thread_scenario.name
);
let _todo = invoke_transaction_function(
function,
&mut thread_user,
*thread_transaction_index,
thread_transaction_name,
)
.await;
}
}
if !thread_scenario.weighted_transactions.is_empty() {
'launch_transactions: loop {
thread_user.update_request_cadence(thread_number).await;
let scenario_started = time::Instant::now();
for (thread_transaction_index, thread_transaction_name) in
&thread_scenario.weighted_transactions
{
let function = &thread_scenario.transactions[*thread_transaction_index].function;
debug!(
"[user {}]: launching {} transaction from {}",
thread_number, thread_transaction_name, thread_scenario.name
);
let _todo = invoke_transaction_function(
function,
&mut thread_user,
*thread_transaction_index,
thread_transaction_name,
)
.await;
if received_exit(&thread_receiver) {
break 'launch_transactions;
}
if let Some((min, max)) = thread_scenario.transaction_wait {
let mut wait_time = rand::thread_rng().gen_range(min..=max).as_millis();
let sleep_timer = time::Instant::now();
let maximum_sleep_time = 500;
while wait_time > 0 {
if received_exit(&thread_receiver) {
break 'launch_transactions;
}
let sleep_duration = if wait_time > maximum_sleep_time {
wait_time -= maximum_sleep_time;
Duration::from_millis(maximum_sleep_time as u64)
} else {
let sleep_duration = Duration::from_millis(wait_time as u64);
wait_time = 0;
sleep_duration
};
debug!(
"user {} from {} sleeping {:?} ...",
thread_number, thread_scenario.name, sleep_duration
);
tokio::time::sleep(sleep_duration).await;
}
thread_user.slept += (time::Instant::now() - sleep_timer).as_millis() as u64;
}
}
thread_user.iterations += 1;
let _ = record_scenario(
&thread_scenario,
&thread_user,
scenario_started.elapsed().as_millis(),
)
.await;
if thread_user.config.iterations > 0
&& thread_user.iterations >= thread_user.config.iterations
{
let pluralize = if thread_user.iterations == 0 {
"iteration"
} else {
"iterations"
};
info!(
"user {} completed {} {} of {}...",
thread_number, thread_user.iterations, pluralize, thread_scenario.name,
);
if let Some(shutdown_channel) = thread_user.shutdown_channel.clone() {
let _ = shutdown_channel.send(thread_number);
}
break 'launch_transactions;
}
}
}
if !thread_scenario.weighted_on_stop_transactions.is_empty() {
for (thread_transaction_index, thread_transaction_name) in
&thread_scenario.weighted_on_stop_transactions
{
let function = &thread_scenario.transactions[*thread_transaction_index].function;
debug!(
"[user: {}]: launching on_stop {} transaction from {}",
thread_number, thread_transaction_name, thread_scenario.name
);
let _todo = invoke_transaction_function(
function,
&mut thread_user,
*thread_transaction_index,
thread_transaction_name,
)
.await;
}
}
info!(
"exiting user {} from {}...",
thread_number, thread_scenario.name
);
}
fn received_exit(thread_receiver: &flume::Receiver<GooseUserCommand>) -> bool {
let mut message = thread_receiver.try_recv();
while message.is_ok() {
match message.unwrap() {
GooseUserCommand::Exit => {
return true;
}
command => {
debug!("ignoring unexpected GooseUserCommand: {:?}", command);
}
}
message = thread_receiver.try_recv();
}
false
}
async fn record_scenario(
thread_scenario: &Scenario,
thread_user: &GooseUser,
run_time: u128,
) -> Result<(), flume::SendError<Option<GooseLog>>> {
if !thread_user.config.no_scenario_metrics && !thread_user.config.no_metrics {
let raw_scenario = ScenarioMetric::new(
thread_user.started.elapsed().as_millis(),
&thread_scenario.name,
thread_user.scenarios_index,
run_time,
thread_user.weighted_users_index,
);
if let Some(metrics_channel) = thread_user.metrics_channel.clone() {
let _ = metrics_channel.send(GooseMetric::Scenario(raw_scenario.clone()));
}
if !thread_user.config.scenario_log.is_empty() {
if let Some(logger) = thread_user.logger.as_ref() {
logger.send(Some(GooseLog::Scenario(raw_scenario)))?;
}
}
}
Ok(())
}
async fn invoke_transaction_function(
function: &TransactionFunction,
thread_user: &mut GooseUser,
thread_transaction_index: usize,
thread_transaction_name: &str,
) -> Result<(), flume::SendError<Option<GooseLog>>> {
let started = time::Instant::now();
let mut raw_transaction = TransactionMetric::new(
thread_user.started.elapsed().as_millis(),
thread_user.scenarios_index,
thread_transaction_index,
thread_transaction_name.to_string(),
thread_user.weighted_users_index,
);
if !thread_transaction_name.is_empty() {
thread_user
.transaction_name
.replace(thread_transaction_name.to_string());
} else {
thread_user.transaction_name.take();
}
let success = function(thread_user).await.is_ok();
raw_transaction.set_time(started.elapsed().as_millis(), success);
if thread_user.config.no_metrics || thread_user.config.no_transaction_metrics {
return Ok(());
}
if !thread_user.config.transaction_log.is_empty() {
if let Some(logger) = thread_user.logger.as_ref() {
logger.send(Some(GooseLog::Transaction(raw_transaction.clone())))?;
}
}
if let Some(metrics_channel) = thread_user.metrics_channel.clone() {
let _ = metrics_channel.send(GooseMetric::Transaction(raw_transaction));
}
Ok(())
}