//! Main loop run by each Goose client thread: executes the client's weighted
//! tasks and exchanges state with the parent thread over `mpsc` channels.

use std::collections::HashMap;
use std::sync::mpsc;

use rand::thread_rng;
use rand::seq::SliceRandom;
use rand::Rng;
use std::{thread, time};

use crate::goose::{GooseTaskSet, GooseClient, GooseClientMode, GooseClientCommand};

/// Body of a single client thread: runs the client's tasks until the parent asks it to exit.
///
/// The function proceeds in three phases:
///
/// 1. Every sequence in `weighted_on_start_tasks` is run once; a sequence holding more
///    than one task is shuffled first.
/// 2. The buckets of `weighted_tasks` are walked over and over. When a bucket is
///    exhausted the next one (wrapping around to the first) is shuffled and walked.
///    After each task any pending [`GooseClientCommand`]s are drained from
///    `thread_receiver`, then the client optionally sleeps before the next task.
/// 3. Once `EXIT` has been received, every sequence in `weighted_on_stop_tasks` is run
///    and a final copy of the client state is sent to the parent.
///
/// # Arguments
///
/// * `thread_number` - number identifying this client, used only in log messages.
/// * `thread_task_set` - task set whose `tasks` are indexed by the client's weighted lists.
/// * `thread_client` - the client state that is handed to every task function.
/// * `thread_receiver` - channel on which the parent sends `SYNC` / `EXIT` commands.
/// * `thread_sender` - channel on which clones of the client state are sent to the parent.
///
/// # Panics
///
/// * If sending on `thread_sender` fails (the sends are unwrapped).
/// * If `weighted_tasks` is empty, the bucket being walked is empty, or a task index is
///   out of range for `thread_task_set.tasks` (all of these are plain indexing).
pub fn client_main(
    thread_number: usize,
    thread_task_set: GooseTaskSet,
    mut thread_client: GooseClient,
    thread_receiver: mpsc::Receiver<GooseClientCommand>,
    thread_sender: mpsc::Sender<GooseClient>,
) {
    info!("launching client {} from {}...", thread_number, thread_task_set.name);
    // Notify parent that our run mode has changed to Running.
    thread_client.set_mode(GooseClientMode::RUNNING);
    thread_sender.send(thread_client.clone()).unwrap();

    // Client is starting, first invoke the weighted on_start tasks.
    // The list is cloned so the client itself can be mutably borrowed by each task.
    for mut sequence in thread_client.weighted_on_start_tasks.clone() {
        if sequence.len() > 1 {
            sequence.shuffle(&mut thread_rng());
        }
        for task_index in &sequence {
            // Determine which task we're going to run next.
            let thread_task_name = &thread_task_set.tasks[*task_index].name;
            let function = thread_task_set.tasks[*task_index].function;
            debug!("launching on_start {} task from {}", thread_task_name, thread_task_set.name);
            // Always overwrite the request name so an unnamed task does not inherit
            // the name left behind by a previously run task.
            thread_client.task_request_name = if thread_task_name.is_empty() {
                None
            } else {
                Some(thread_task_name.to_string())
            };
            // Invoke the task function.
            function(&mut thread_client);
        }
    }

    // Repeatedly loop through all available tasks in a random order.
    let mut thread_continue = true;
    while thread_continue {
        // Weighted_tasks is divided into buckets of tasks sorted by sequence, and then all non-sequenced tasks.
        if thread_client.weighted_tasks[thread_client.weighted_bucket].len()
            <= thread_client.weighted_bucket_position
        {
            // This bucket is exhausted, move on to position 0 of the next bucket.
            thread_client.weighted_bucket_position = 0;
            thread_client.weighted_bucket += 1;
            if thread_client.weighted_tasks.len() <= thread_client.weighted_bucket {
                thread_client.weighted_bucket = 0;
            }
            // Shuffle new bucket before we walk through the tasks.
            thread_client.weighted_tasks[thread_client.weighted_bucket].shuffle(&mut thread_rng());
            debug!(
                "re-shuffled {} tasks: {:?}",
                &thread_task_set.name,
                thread_client.weighted_tasks[thread_client.weighted_bucket]
            );
        }

        // Determine which task we're going to run next.
        let thread_weighted_task =
            thread_client.weighted_tasks[thread_client.weighted_bucket][thread_client.weighted_bucket_position];
        let thread_task_name = &thread_task_set.tasks[thread_weighted_task].name;
        let function = thread_task_set.tasks[thread_weighted_task].function;
        debug!("launching {} task from {}", thread_task_name, thread_task_set.name);
        // If task name is set, it will be used for storing request statistics instead of the raw url.
        // Otherwise the name is cleared so statistics are not filed under a stale task name.
        thread_client.task_request_name = if thread_task_name.is_empty() {
            None
        } else {
            Some(thread_task_name.to_string())
        };
        // Invoke the task function.
        function(&mut thread_client);

        // Drain every message the parent thread has queued for us, without blocking.
        while let Ok(command) = thread_receiver.try_recv() {
            match command {
                // Sync our state to the parent.
                GooseClientCommand::SYNC => {
                    thread_sender.send(thread_client.clone()).unwrap();
                    // Reset per-thread counters, as totals have been sent to the parent
                    thread_client.requests = HashMap::new();
                }
                // Flag that we're exiting; the final sync happens after the on_stop tasks.
                GooseClientCommand::EXIT => {
                    thread_client.set_mode(GooseClientMode::EXITING);
                    // No need to reset per-thread counters, we're exiting and memory will be freed
                    thread_continue = false;
                }
            }
        }

        // Waiting is enabled whenever an upper bound is configured, so a range that
        // starts at 0 still sleeps.
        if thread_continue && thread_client.max_wait > 0 {
            // gen_range() panics unless low < high, so a fixed (or inverted) range
            // falls back to sleeping exactly min_wait.
            let wait_time = if thread_client.max_wait > thread_client.min_wait {
                rand::thread_rng().gen_range(thread_client.min_wait, thread_client.max_wait)
            } else {
                thread_client.min_wait
            };
            let sleep_duration = time::Duration::from_secs(wait_time as u64);
            debug!("client {} from {} sleeping {:?} seconds...", thread_number, thread_task_set.name, sleep_duration);
            thread::sleep(sleep_duration);
        }

        // Move to the next task in thread_client.weighted_tasks.
        thread_client.weighted_bucket_position += 1;
    }

    // Client is exiting, first invoke the weighted on_stop tasks.
    // The list is cloned so the client itself can be mutably borrowed by each task.
    for mut sequence in thread_client.weighted_on_stop_tasks.clone() {
        if sequence.len() > 1 {
            sequence.shuffle(&mut thread_rng());
        }
        for task_index in &sequence {
            // Determine which task we're going to run next.
            let thread_task_name = &thread_task_set.tasks[*task_index].name;
            let function = thread_task_set.tasks[*task_index].function;
            debug!("launching on_stop {} task from {}", thread_task_name, thread_task_set.name);
            // Always overwrite the request name so an unnamed task does not inherit
            // the name left behind by a previously run task.
            thread_client.task_request_name = if thread_task_name.is_empty() {
                None
            } else {
                Some(thread_task_name.to_string())
            };
            // Invoke the task function.
            function(&mut thread_client);
        }
    }

    // Do our final sync before we exit.
    thread_sender.send(thread_client.clone()).unwrap();
}