use clap::value_parser;
use clap::{Arg, Command};
use faktory::*;
use rand::prelude::*;
use std::io;
use std::process;
use std::sync::{self, atomic};
use std::thread;
use std::time;
const QUEUES: &[&str] = &["queue0", "queue1", "queue2", "queue3", "queue4"];
fn main() {
let matches = Command::new("My Super Program")
.version("0.1")
.about("Benchmark the performance of Rust Faktory consumers and producers")
.arg(
Arg::new("jobs")
.help("Number of jobs to run")
.value_parser(value_parser!(usize))
.index(1)
.default_value("30000"),
)
.arg(
Arg::new("threads")
.help("Number of consumers/producers to run")
.value_parser(value_parser!(usize))
.index(2)
.default_value("10"),
)
.get_matches();
let jobs: usize = *matches.get_one("jobs").expect("default_value is set");
let threads: usize = *matches.get_one("threads").expect("default_value is set");
println!(
"Running loadtest with {} jobs and {} threads",
jobs, threads
);
if let Err(e) = Producer::connect(None) {
println!("{}", e);
process::exit(1);
}
let pushed = sync::Arc::new(atomic::AtomicUsize::new(0));
let popped = sync::Arc::new(atomic::AtomicUsize::new(0));
let start = time::Instant::now();
let threads: Vec<thread::JoinHandle<Result<_, Error>>> = (0..threads)
.map(|_| {
let pushed = sync::Arc::clone(&pushed);
let popped = sync::Arc::clone(&popped);
thread::spawn(move || {
let mut p = Producer::connect(None).unwrap();
let mut c = ConsumerBuilder::default();
c.register("SomeJob", |_| {
let mut rng = rand::thread_rng();
if rng.gen_bool(0.01) {
Err(io::Error::new(io::ErrorKind::Other, "worker closed"))
} else {
Ok(())
}
});
let mut c = c.connect(None).unwrap();
let mut rng = rand::thread_rng();
let mut random_queues = Vec::from(QUEUES);
random_queues.shuffle(&mut rng);
for idx in 0..jobs {
if idx % 2 == 0 {
let mut job = Job::new(
"SomeJob",
vec![serde_json::Value::from(1), "string".into(), 3.into()],
);
job.priority = Some(rng.gen_range(1..10));
job.queue = QUEUES.choose(&mut rng).unwrap().to_string();
p.enqueue(job)?;
if pushed.fetch_add(1, atomic::Ordering::SeqCst) >= jobs {
return Ok(idx);
}
} else {
c.run_one(0, &random_queues[..])?;
if popped.fetch_add(1, atomic::Ordering::SeqCst) >= jobs {
return Ok(idx);
}
}
}
Ok(jobs)
})
})
.collect();
let _ops_count: Result<Vec<_>, _> = threads.into_iter().map(|jt| jt.join().unwrap()).collect();
let stop = start.elapsed();
let stop_secs = stop.as_secs() * 1_000_000_000 + u64::from(stop.subsec_nanos());
let stop_secs = stop_secs as f64 / 1_000_000_000.0;
println!(
"Processed {} pushes and {} pops in {:.2} seconds, rate: {} jobs/s",
pushed.load(atomic::Ordering::SeqCst),
popped.load(atomic::Ordering::SeqCst),
stop_secs,
jobs as f64 / stop_secs,
);
}