#[macro_use]
extern crate log;
extern crate structopt;
mod client;
pub mod goose;
mod stats;
mod util;
use std::collections::{BTreeMap, HashMap};
use std::f32;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};
use rand::thread_rng;
use rand::seq::SliceRandom;
use simplelog::*;
use structopt::StructOpt;
use url::Url;
use goose::{GooseTaskSets, GooseTaskSet, GooseTask, GooseClient, GooseClientMode, GooseClientCommand, GooseRequest};
#[derive(Debug, Clone)]
pub struct GooseState {
configuration: Configuration,
number_of_cpus: usize,
run_time: usize,
clients: usize,
active_clients: usize,
}
impl GooseState {
fn new(configuration: Configuration) -> GooseState {
GooseState {
configuration: configuration,
number_of_cpus: num_cpus::get(),
run_time: 0,
clients: 0,
active_clients: 0,
}
}
}
#[derive(StructOpt, Debug, Clone)]
#[structopt(name = "client")]
pub struct Configuration {
#[structopt(short = "H", long, required=false, default_value="")]
host: String,
#[structopt(short, long)]
clients: Option<usize>,
#[structopt(short = "r", long)]
hatch_rate: Option<usize>,
#[structopt(short = "t", long, required=false, default_value="")]
run_time: String,
#[structopt(long)]
print_stats: bool,
#[structopt(long)]
status_codes: bool,
#[structopt(long)]
only_summary: bool,
#[structopt(long)]
reset_stats: bool,
#[structopt(short, long)]
list: bool,
#[structopt(short = "v", long, parse(from_occurrences))]
verbose: u8,
#[structopt(short = "g", long, parse(from_occurrences))]
log_level: u8,
#[structopt(long, default_value="goose.log")]
log_file: String,
}
fn weight_task_set_clients(task_sets: &GooseTaskSets, clients: usize, state: &GooseState) -> Vec<GooseClient> {
trace!("weight_task_set_clients");
let mut u: usize = 0;
let mut v: usize;
for task_set in &task_sets.task_sets {
if u == 0 {
u = task_set.weight;
}
else {
v = task_set.weight;
trace!("calculating greatest common denominator of {} and {}", u, v);
u = util::gcd(u, v);
trace!("inner gcd: {}", u);
}
}
debug!("gcd: {}", u);
let mut weighted_task_sets = Vec::new();
for (index, task_set) in task_sets.task_sets.iter().enumerate() {
let weight = task_set.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", index, task_set.name, task_set.weight, weight);
let mut weighted_sets = vec![index; weight];
weighted_task_sets.append(&mut weighted_sets);
}
weighted_task_sets.shuffle(&mut thread_rng());
let mut weighted_clients = Vec::new();
let mut client_count = 0;
let config = state.configuration.clone();
loop {
for task_sets_index in &weighted_task_sets {
let task_set_host = task_sets.task_sets[*task_sets_index].host.clone();
weighted_clients.push(GooseClient::new(
task_sets.task_sets[*task_sets_index].task_sets_index,
task_set_host,
task_sets.task_sets[*task_sets_index].min_wait,
task_sets.task_sets[*task_sets_index].max_wait,
&config
));
client_count += 1;
if client_count >= clients {
trace!("created {} weighted_clients", client_count);
return weighted_clients;
}
}
}
}
fn weight_tasks(task_set: &GooseTaskSet) -> (Vec<Vec<usize>>, Vec<Vec<usize>>, Vec<Vec<usize>>) {
trace!("weight_tasks for {}", task_set.name);
let mut sequenced_tasks: BTreeMap <usize, Vec<GooseTask>> = BTreeMap::new();
let mut sequenced_on_start_tasks: BTreeMap <usize, Vec<GooseTask>> = BTreeMap::new();
let mut sequenced_on_stop_tasks: BTreeMap <usize, Vec<GooseTask>> = BTreeMap::new();
let mut unsequenced_tasks: Vec<GooseTask> = Vec::new();
let mut unsequenced_on_start_tasks: Vec<GooseTask> = Vec::new();
let mut unsequenced_on_stop_tasks: Vec<GooseTask> = Vec::new();
let mut u: usize = 0;
let mut v: usize;
for task in &task_set.tasks {
if task.sequence > 0 {
if task.on_start {
if let Some(sequence) = sequenced_on_start_tasks.get_mut(&task.sequence) {
sequence.push(task.clone());
}
else {
sequenced_on_start_tasks.insert(task.sequence, vec![task.clone()]);
}
}
if task.on_stop {
if let Some(sequence) = sequenced_on_stop_tasks.get_mut(&task.sequence) {
sequence.push(task.clone());
}
else {
sequenced_on_stop_tasks.insert(task.sequence, vec![task.clone()]);
}
}
if !task.on_start && !task.on_stop {
if let Some(sequence) = sequenced_tasks.get_mut(&task.sequence) {
sequence.push(task.clone());
}
else {
sequenced_tasks.insert(task.sequence, vec![task.clone()]);
}
}
}
else {
if task.on_start {
unsequenced_on_start_tasks.push(task.clone());
}
if task.on_stop {
unsequenced_on_stop_tasks.push(task.clone());
}
if !task.on_start && !task.on_stop {
unsequenced_tasks.push(task.clone());
}
}
if u == 0 {
u = task.weight;
}
else {
v = task.weight;
trace!("calculating greatest common denominator of {} and {}", u, v);
u = util::gcd(u, v);
trace!("inner gcd: {}", u);
}
}
debug!("gcd: {}", u);
let mut weighted_tasks: Vec<Vec<usize>> = Vec::new();
for (_sequence, tasks) in sequenced_tasks.iter() {
let mut sequence_weighted_tasks = Vec::new();
for task in tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
sequence_weighted_tasks.append(&mut tasks);
}
weighted_tasks.push(sequence_weighted_tasks);
}
trace!("created weighted_tasks: {:?}", weighted_tasks);
let mut weighted_unsequenced_tasks = Vec::new();
for task in unsequenced_tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
weighted_unsequenced_tasks.append(&mut tasks);
}
weighted_tasks.push(weighted_unsequenced_tasks);
let mut weighted_on_start_tasks: Vec<Vec<usize>> = Vec::new();
for (_sequence, tasks) in sequenced_on_start_tasks.iter() {
let mut sequence_on_start_weighted_tasks = Vec::new();
for task in tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
sequence_on_start_weighted_tasks.append(&mut tasks);
}
weighted_on_start_tasks.push(sequence_on_start_weighted_tasks);
}
trace!("created weighted_on_start_tasks: {:?}", weighted_tasks);
let mut weighted_on_start_unsequenced_tasks = Vec::new();
for task in unsequenced_on_start_tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
weighted_on_start_unsequenced_tasks.append(&mut tasks);
}
weighted_on_start_tasks.push(weighted_on_start_unsequenced_tasks);
let mut weighted_on_stop_tasks: Vec<Vec<usize>> = Vec::new();
for (_sequence, tasks) in sequenced_on_stop_tasks.iter() {
let mut sequence_on_stop_weighted_tasks = Vec::new();
for task in tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
sequence_on_stop_weighted_tasks.append(&mut tasks);
}
weighted_on_stop_tasks.push(sequence_on_stop_weighted_tasks);
}
trace!("created weighted_on_stop_tasks: {:?}", weighted_tasks);
let mut weighted_on_stop_unsequenced_tasks = Vec::new();
for task in unsequenced_on_stop_tasks {
let weight = task.weight / u;
trace!("{}: {} has weight of {} (reduced with gcd to {})", task.tasks_index, task.name, task.weight, weight);
let mut tasks = vec![task.tasks_index; weight];
weighted_on_stop_unsequenced_tasks.append(&mut tasks);
}
weighted_on_stop_tasks.push(weighted_on_stop_unsequenced_tasks);
(weighted_on_start_tasks, weighted_tasks, weighted_on_stop_tasks)
}
fn is_valid_host(host: &str) -> bool {
match Url::parse(host) {
Ok(_) => true,
Err(e) => {
error!("invalid host '{}': {}", host, e);
std::process::exit(1);
}
}
}
fn timer_expired(started: time::Instant, run_time: usize) -> bool {
if run_time > 0 && started.elapsed().as_secs() >= run_time as u64 {
true
}
else {
false
}
}
fn merge_from_client(
parent_request: &GooseRequest,
client_request: &GooseRequest,
config: &Configuration,
) -> GooseRequest {
let mut merged_request = parent_request.clone();
merged_request.response_times.extend_from_slice(&client_request.response_times);
merged_request.success_count += &client_request.success_count;
merged_request.fail_count += &client_request.fail_count;
if config.status_codes {
for (status_code, count) in &client_request.status_code_counts {
let new_count;
if let Some(existing_status_code_count) = merged_request.status_code_counts.get(&status_code) {
new_count = *existing_status_code_count + *count;
}
else {
new_count = *count;
}
merged_request.status_code_counts.insert(*status_code, new_count);
}
}
merged_request
}
pub fn goose_init() -> GooseState {
let mut goose_state = GooseState::new(Configuration::from_args());
let debug_level;
match goose_state.configuration.verbose {
0 => debug_level = LevelFilter::Warn,
1 => debug_level = LevelFilter::Info,
2 => debug_level = LevelFilter::Debug,
_ => debug_level = LevelFilter::Trace,
}
let log_level;
match goose_state.configuration.log_level {
0 => log_level = LevelFilter::Info,
1 => log_level = LevelFilter::Debug,
_ => log_level = LevelFilter::Trace,
}
let log_file = PathBuf::from(&goose_state.configuration.log_file);
CombinedLogger::init(vec![
TermLogger::new(
debug_level,
Config::default(),
TerminalMode::Mixed).unwrap(),
WriteLogger::new(
log_level,
Config::default(),
File::create(&log_file).unwrap(),
)]).unwrap();
info!("Output verbosity level: {}", debug_level);
info!("Logfile verbosity level: {}", log_level);
info!("Writing to log file: {}", log_file.display());
if goose_state.configuration.status_codes && !goose_state.configuration.print_stats {
error!("You must enable --print-stats to enable --status-codes.");
std::process::exit(1);
}
if goose_state.configuration.only_summary && !goose_state.configuration.print_stats {
error!("You must enable --print-stats to enable --only-summary.");
std::process::exit(1);
}
if goose_state.configuration.run_time != "" {
goose_state.run_time = util::parse_timespan(&goose_state.configuration.run_time);
}
else {
goose_state.run_time = 0;
}
info!("run_time = {}", goose_state.run_time);
goose_state.clients = match goose_state.configuration.clients {
Some(c) => {
if c == 0 {
error!("At least 1 client is required.");
std::process::exit(1);
}
else {
c
}
}
None => {
let c = goose_state.number_of_cpus;
info!("concurrent clients defaulted to {} (number of CPUs)", c);
c
}
};
debug!("clients = {}", goose_state.clients);
goose_state
}
pub fn goose_launch(mut goose_state: GooseState, mut goose_task_sets: GooseTaskSets) {
if goose_task_sets.task_sets.len() <= 0 {
error!("No task sets defined in goosefile.");
std::process::exit(1);
}
if goose_state.configuration.list {
println!("Available tasks:");
for task_set in goose_task_sets.task_sets {
println!(" - {} (weight: {})", task_set.name, task_set.weight);
for task in task_set.tasks {
println!(" o {} (weight: {})", task.name, task.weight);
}
}
std::process::exit(0);
}
let hatch_rate = match goose_state.configuration.hatch_rate {
Some(h) => {
if h == 0 {
error!("The hatch_rate must be greater than 0, and generally should be no more than 100 * NUM_CORES.");
std::process::exit(1);
}
else {
h
}
}
None => {
let h = goose_state.number_of_cpus;
info!("hatch_rate defaulted to {} (number of CPUs)", h);
h
}
};
debug!("hatch_rate = {}", hatch_rate);
if goose_state.configuration.host.len() == 0 {
for task_set in &goose_task_sets.task_sets {
match &task_set.host {
Some(h) => {
if is_valid_host(h) {
info!("host for {} configured: {}", task_set.name, h);
}
}
None => {
error!("Host must be defined globally or per-TaskSet. No host defined for {}.", task_set.name);
std::process::exit(1);
}
}
}
}
else {
if is_valid_host(&goose_state.configuration.host) {
info!("global host configured: {}", goose_state.configuration.host);
}
}
for task_set in &mut goose_task_sets.task_sets {
let (weighted_on_start_tasks, weighted_tasks, weighted_on_stop_tasks) = weight_tasks(&task_set);
task_set.weighted_on_start_tasks = weighted_on_start_tasks;
task_set.weighted_tasks = weighted_tasks;
task_set.weighted_on_stop_tasks = weighted_on_stop_tasks;
debug!("weighted {} on_start: {:?} tasks: {:?} on_stop: {:?}", task_set.name, task_set.weighted_on_start_tasks, task_set.weighted_tasks, task_set.weighted_on_stop_tasks);
}
goose_task_sets.weighted_clients = weight_task_set_clients(&goose_task_sets, goose_state.clients, &goose_state);
let mut started = time::Instant::now();
let sleep_float = 1.0 / hatch_rate as f32;
let sleep_duration = time::Duration::from_secs_f32(sleep_float);
let mut clients = vec![];
let mut client_channels = vec![];
let (all_threads_sender, parent_receiver): (mpsc::Sender<GooseClient>, mpsc::Receiver<GooseClient>) = mpsc::channel();
for mut thread_client in goose_task_sets.weighted_clients.clone() {
if timer_expired(started, goose_state.run_time) {
break;
}
thread_client.weighted_tasks = goose_task_sets.task_sets[thread_client.task_sets_index].weighted_tasks.clone();
thread_client.weighted_on_start_tasks = goose_task_sets.task_sets[thread_client.task_sets_index].weighted_on_start_tasks.clone();
thread_client.weighted_on_stop_tasks = goose_task_sets.task_sets[thread_client.task_sets_index].weighted_on_stop_tasks.clone();
thread_client.weighted_clients_index = goose_state.active_clients;
let (parent_sender, thread_receiver): (mpsc::Sender<GooseClientCommand>, mpsc::Receiver<GooseClientCommand>) = mpsc::channel();
client_channels.push(parent_sender);
if thread_client.weighted_tasks.len() > 0 {
let thread_sender = all_threads_sender.clone();
thread_client.set_mode(GooseClientMode::HATCHING);
thread_sender.send(thread_client.clone()).unwrap();
let thread_task_set = goose_task_sets.task_sets[thread_client.task_sets_index].clone();
let thread_number = goose_state.active_clients + 1;
let client = thread::spawn(move || {
client::client_main(thread_number, thread_task_set, thread_client, thread_receiver, thread_sender)
});
clients.push(client);
goose_state.active_clients += 1;
debug!("sleeping {:?} milliseconds...", sleep_duration);
thread::sleep(sleep_duration);
}
}
started = time::Instant::now();
info!("launched {} clients...", goose_state.active_clients);
if goose_state.configuration.print_stats && !goose_state.configuration.only_summary {
for (index, send_to_client) in client_channels.iter().enumerate() {
send_to_client.send(GooseClientCommand::SYNC).unwrap();
debug!("telling client {} to sync stats", index);
}
}
let mut statistics_reset: bool = false;
let canceled = Arc::new(AtomicBool::new(false));
let caught_ctrlc = canceled.clone();
ctrlc::set_handler(move || {
if caught_ctrlc.load(Ordering::SeqCst) {
error!("caught another ctrl-c, exiting immediately...");
std::process::exit(1);
}
else {
warn!("caught ctrl-c, stopping...");
caught_ctrlc.store(true, Ordering::SeqCst);
}
}).expect("Failed to set Ctrl-C signal handler.");
let mut statistics_timer = time::Instant::now();
let mut display_running_statistics = false;
let mut run_time = goose_state.run_time;
loop {
if goose_state.configuration.print_stats {
if timer_expired(statistics_timer, 15) {
statistics_timer = time::Instant::now();
for (index, send_to_client) in client_channels.iter().enumerate() {
send_to_client.send(GooseClientCommand::SYNC).unwrap();
debug!("telling client {} to sync stats", index);
}
if !goose_state.configuration.only_summary {
display_running_statistics = true;
let pause = time::Duration::from_millis(100);
thread::sleep(pause);
}
}
let mut message = parent_receiver.try_recv();
while message.is_ok() {
let unwrapped_message = message.unwrap();
let weighted_clients_index = unwrapped_message.weighted_clients_index;
goose_task_sets.weighted_clients[weighted_clients_index].weighted_bucket = unwrapped_message.weighted_bucket;
goose_task_sets.weighted_clients[weighted_clients_index].weighted_bucket_position = unwrapped_message.weighted_bucket_position;
goose_task_sets.weighted_clients[weighted_clients_index].mode = unwrapped_message.mode;
if goose_task_sets.weighted_clients[weighted_clients_index].weighted_tasks.len() == 0 {
goose_task_sets.weighted_clients[weighted_clients_index].weighted_clients_index = unwrapped_message.weighted_clients_index;
goose_task_sets.weighted_clients[weighted_clients_index].weighted_tasks = unwrapped_message.weighted_tasks.clone();
}
for (request_key, request) in unwrapped_message.requests {
trace!("request_key: {}", request_key);
let merged_request;
if let Some(parent_request) = goose_task_sets.weighted_clients[weighted_clients_index].requests.get(&request_key) {
merged_request = merge_from_client(parent_request, &request, &goose_state.configuration);
}
else {
merged_request = request.clone();
}
goose_task_sets.weighted_clients[weighted_clients_index].requests.insert(request_key.to_string(), merged_request);
}
message = parent_receiver.try_recv();
}
if goose_state.configuration.reset_stats && !statistics_reset {
info!("statistics reset...");
for (client_index, client) in goose_task_sets.weighted_clients.clone().iter().enumerate() {
let mut reset_client = client.clone();
reset_client.requests = HashMap::new();
goose_task_sets.weighted_clients[client_index] = reset_client;
}
statistics_reset = true;
}
}
if timer_expired(started, run_time) || canceled.load(Ordering::SeqCst) {
run_time = started.elapsed().as_secs() as usize;
info!("stopping after {} seconds...", run_time);
for (index, send_to_client) in client_channels.iter().enumerate() {
send_to_client.send(GooseClientCommand::EXIT).unwrap();
debug!("telling client {} to sync stats", index);
}
info!("waiting for clients to exit");
for client in clients {
let _ = client.join();
}
debug!("all clients exited");
if goose_state.configuration.print_stats {
let mut message = parent_receiver.try_recv();
while message.is_ok() {
let unwrapped_message = message.unwrap();
let weighted_clients_index = unwrapped_message.weighted_clients_index;
goose_task_sets.weighted_clients[weighted_clients_index].mode = unwrapped_message.mode;
for (request_key, request) in unwrapped_message.requests {
trace!("request_key: {}", request_key);
let merged_request;
if let Some(parent_request) = goose_task_sets.weighted_clients[weighted_clients_index].requests.get(&request_key) {
merged_request = merge_from_client(parent_request, &request, &goose_state.configuration);
}
else {
merged_request = request.clone();
}
goose_task_sets.weighted_clients[weighted_clients_index].requests.insert(request_key.to_string(), merged_request);
}
message = parent_receiver.try_recv();
}
}
break;
}
if display_running_statistics {
display_running_statistics = false;
stats::print_running_stats(&goose_state.configuration, &goose_task_sets, started.elapsed().as_secs() as usize);
}
let one_second = time::Duration::from_secs(1);
thread::sleep(one_second);
}
if goose_state.configuration.print_stats {
stats::print_final_stats(&goose_state.configuration, &goose_task_sets, started.elapsed().as_secs() as usize);
}
}