use std::io::prelude::*;
use std::io;
use std::net::TcpStream;
use std::error::Error;
use proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use std::sync::{atomic, Arc};
use atomic_option::AtomicOption;
use fnv::FnvHashMap;
use proto::{Ack, Fail, Job};
// Per-worker status values. `Consumer::run` creates one `AtomicUsize` per worker thread holding
// one of these, and shares it with that thread.

/// Initial status of every worker; a worker thread keeps calling `run_one` while its status
/// still holds this value.
const STATUS_RUNNING: usize = 0;
/// Stored into every worker's status by `Consumer::run` when a heartbeat answers `Quiet` or
/// `Terminate`; since it differs from `STATUS_RUNNING`, workers stop after their current
/// iteration.
const STATUS_QUIET: usize = 1;
/// Stored by a worker thread when it leaves its loop (normally or with an error), and by
/// `Consumer::run` when it wants all workers to stop after a failure.
const STATUS_TERMINATING: usize = 2;
/// A job consumer: a `Client` connection plus the state it shares with its worker threads.
///
/// `S` is the underlying stream type and `F` is the type of the registered job handlers.
pub struct Consumer<S, F>
where
S: Read + Write,
{
/// Connection used to fetch jobs, report their outcome, and send heartbeats.
c: Client<S>,
/// One slot per worker. `run_one` stores the outcome of a job (`Ok(jid)` or `Err(Fail)`) here
/// before reporting it and clears the slot once the report went through; `run` re-sends any
/// outcome still left in a slot when it starts.
last_job_results: Arc<Vec<AtomicOption<Result<String, Fail>>>>,
/// One slot per worker, holding the id of the job that worker is currently processing.
/// `run` fails every job still listed here when the server asks it to terminate.
running_jobs: Arc<Vec<AtomicOption<String>>>,
/// Job handlers, keyed by job kind.
callbacks: Arc<FnvHashMap<String, F>>,
/// Set by `run` when it returns; `run` asserts that this is `false` on entry.
terminated: bool,
}
/// Collects the configuration and job handlers needed to construct a `Consumer`.
///
/// `F` is the type of the registered job handlers.
#[derive(Clone)]
pub struct ConsumerBuilder<F> {
/// Options handed to `Client::new` when connecting (hostname, wid, labels, password).
opts: ClientOptions,
/// Number of per-worker slots the resulting `Consumer` is created with; `Consumer::run`
/// spawns one thread per slot.
workers: usize,
/// Job handlers registered so far, keyed by job kind.
callbacks: FnvHashMap<String, F>,
}
impl<F> Default for ConsumerBuilder<F> {
fn default() -> Self {
ConsumerBuilder {
opts: ClientOptions::default(),
workers: 1,
callbacks: Default::default(),
}
}
}
impl<F> ConsumerBuilder<F> {
    /// Sets the hostname stored in the client options.
    ///
    /// Returns the builder so calls can be chained.
    pub fn hostname(&mut self, hn: String) -> &mut Self {
        let hostname = Some(hn);
        self.opts.hostname = hostname;
        self
    }

    /// Sets the worker id (`wid`) stored in the client options.
    ///
    /// Returns the builder so calls can be chained.
    pub fn wid(&mut self, wid: String) -> &mut Self {
        let worker_id = Some(wid);
        self.opts.wid = worker_id;
        self
    }

    /// Replaces the labels stored in the client options.
    ///
    /// Returns the builder so calls can be chained.
    pub fn labels(&mut self, labels: Vec<String>) -> &mut Self {
        self.opts.labels = labels;
        self
    }

    /// Sets how many workers the resulting `Consumer` is created with.
    ///
    /// Returns the builder so calls can be chained.
    pub fn workers(&mut self, w: usize) -> &mut Self {
        self.workers = w;
        self
    }
}
impl<F, E> ConsumerBuilder<F>
where
    F: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
    /// Registers `handler` as the callback for jobs of the given `kind`.
    ///
    /// The kind is stored as `kind.to_string()`; registering the same kind again replaces the
    /// earlier handler. Returns the builder so calls can be chained.
    pub fn register<K>(&mut self, kind: K, handler: F) -> &mut Self
    where
        K: ToString,
    {
        let key = kind.to_string();
        self.callbacks.insert(key, handler);
        self
    }

    /// Connects over TCP and builds a `Consumer`.
    ///
    /// When `url` is `None`, the URL is taken from `proto::get_env_url()`. The password
    /// component of the URL, if any, is passed on to `connect_with`.
    ///
    /// # Errors
    ///
    /// Returns an error if the URL cannot be parsed, the TCP connection cannot be opened, or
    /// `connect_with` fails.
    pub fn connect(self, url: Option<&str>) -> io::Result<Consumer<TcpStream, F>> {
        let parsed = match url {
            Some(explicit) => proto::url_parse(explicit)?,
            None => proto::url_parse(&proto::get_env_url())?,
        };
        let stream = TcpStream::connect(proto::host_from_url(&parsed))?;
        let password = parsed.password().map(|p| p.to_string());
        self.connect_with(stream, password)
    }

    /// Builds a `Consumer` on top of an already-established `stream`.
    ///
    /// `pwd` overwrites the password in the client options before the `Client` is created.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Client::new` reports.
    pub fn connect_with<S: Read + Write>(
        mut self,
        stream: S,
        pwd: Option<String>,
    ) -> io::Result<Consumer<S, F>> {
        self.opts.password = pwd;
        let client = Client::new(stream, self.opts)?;
        Ok(Consumer::new(client, self.workers, self.callbacks))
    }
}
/// Why `Consumer::run_job` could not complete a job.
enum Failed<E: Error> {
/// The registered handler ran and returned this error.
Application(E),
/// No handler is registered for the job's kind; carries that kind.
BadJobType(String),
}
impl<F, S: Read + Write> Consumer<S, F> {
    /// Wraps `c` and `callbacks` into a `Consumer` with `workers` empty per-worker slots for
    /// both the running-job and the last-job-result bookkeeping.
    fn new(c: Client<S>, workers: usize, callbacks: FnvHashMap<String, F>) -> Self {
        let callbacks = Arc::new(callbacks);
        let running_jobs = (0..workers)
            .map(|_| AtomicOption::empty())
            .collect::<Vec<_>>();
        let last_job_results = (0..workers)
            .map(|_| AtomicOption::empty())
            .collect::<Vec<_>>();

        Consumer {
            c,
            callbacks,
            running_jobs: Arc::new(running_jobs),
            last_job_results: Arc::new(last_job_results),
            terminated: false,
        }
    }
}
impl<F, S: Read + Write + Reconnect> Consumer<S, F> {
    /// Re-establishes the underlying client connection by delegating to the client's own
    /// `reconnect`.
    fn reconnect(&mut self) -> io::Result<()> {
        let client = &mut self.c;
        client.reconnect()
    }
}
impl<S, E, F> Consumer<S, F>
where
S: Read + Write,
E: Error,
F: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
match self.callbacks.get(&job.kind) {
Some(callback) => (callback)(job).map_err(Failed::Application),
None => {
Err(Failed::BadJobType(job.kind))
}
}
}
pub fn run_one<Q>(&mut self, worker: usize, queues: &[Q]) -> io::Result<bool>
where
Q: AsRef<str>,
{
let job = match self.c.fetch(queues)? {
Some(job) => job,
None => return Ok(false),
};
let jid = job.jid.clone();
self.running_jobs[worker].swap(Box::new(jid.clone()), atomic::Ordering::SeqCst);
let r = self.run_job(job);
match r {
Ok(_) => {
self.last_job_results[worker]
.swap(Box::new(Ok(jid.clone())), atomic::Ordering::SeqCst);
self.c.issue(Ack::new(jid))?.await_ok()?;
}
Err(e) => {
let fail = match e {
Failed::BadJobType(jt) => {
Fail::new(jid, "unknown", format!("No handler for {}", jt))
}
Failed::Application(e) => {
let mut f = Fail::new(jid, "unknown", format!("{}", e));
let mut root = e.cause();
let mut backtrace = Vec::new();
while let Some(r) = root.take() {
backtrace.push(format!("{}", r));
root = r.cause();
}
f.set_backtrace(backtrace);
f
}
};
let fail2 = fail.clone();
self.last_job_results[worker].swap(Box::new(Err(fail)), atomic::Ordering::SeqCst);
self.c.issue(&fail2)?.await_ok()?;
}
}
self.last_job_results[worker].take(atomic::Ordering::SeqCst);
self.running_jobs[worker].take(atomic::Ordering::SeqCst);
Ok(true)
}
#[cfg(test)]
pub(crate) fn run_n<Q>(&mut self, n: usize, queues: &[Q]) -> io::Result<()>
where
Q: AsRef<str>,
{
for _ in 0..n {
self.run_one(0, queues)?;
}
Ok(())
}
}
impl<S, E, F> Consumer<S, F>
where
    S: Read + Write + Reconnect + Send + 'static,
    E: Error,
    F: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
    /// Creates a sibling `Consumer` for a worker thread: it gets its own connection (via
    /// `connect_again`) but shares the callbacks and the per-worker bookkeeping slots.
    fn for_worker(&mut self) -> io::Result<Self> {
        Ok(Consumer {
            c: self.c.connect_again()?,
            callbacks: self.callbacks.clone(),
            running_jobs: self.running_jobs.clone(),
            last_job_results: self.last_job_results.clone(),
            terminated: self.terminated,
        })
    }

    /// Runs the consumer on `queues`: spawns one thread per worker slot, each repeatedly
    /// calling `run_one`, while this thread sends a heartbeat roughly every five seconds.
    ///
    /// Before starting, any job outcome left over in `last_job_results` (because reporting it
    /// failed earlier) is sent to the server again.
    ///
    /// Returns `Ok(n)` after the server answered a heartbeat with `Terminate`, where `n` is
    /// the number of jobs that were still marked as running (those are reported as failed
    /// with the message `"terminated"`); when `n` is non-zero the worker threads are not
    /// joined.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if a leftover outcome cannot be re-sent, if a worker connection
    /// cannot be established, if a worker thread failed, or if the heartbeat failed. After an
    /// `Err` the consumer is not marked as terminated, so `run` may be called again
    /// (typically after `reconnect`).
    ///
    /// # Panics
    ///
    /// Panics if called after a previous call ended because of a `Terminate` heartbeat, if
    /// worker threads from an earlier call still share `last_job_results`, or if a joined
    /// worker thread panicked.
    pub fn run<Q>(&mut self, queues: &[Q]) -> io::Result<usize>
    where
        Q: AsRef<str>,
    {
        use std::thread;
        use std::time;

        assert!(!self.terminated, "do not re-run a terminated worker");
        assert_eq!(Arc::strong_count(&self.last_job_results), 1);

        // Re-send outcomes that could not be reported last time. The assertion above shows
        // nobody else holds the slots, so taking and restoring them here cannot race.
        for last_job_result in self.last_job_results.iter() {
            if let Some(res) = last_job_result.take(atomic::Ordering::SeqCst) {
                let r = match *res {
                    Ok(ref jid) => self.c.issue(Ack::new(jid)),
                    Err(ref fail) => self.c.issue(fail),
                };

                let r = match r {
                    Ok(r) => r,
                    Err(e) => {
                        // Keep the outcome so the next call can try again.
                        last_job_result.swap(res, atomic::Ordering::SeqCst);
                        return Err(e);
                    }
                };

                if let Err(e) = r.await_ok() {
                    // An `InvalidInput` response is not treated as fatal here; any other
                    // error puts the outcome back and aborts.
                    if e.kind() != io::ErrorKind::InvalidInput {
                        last_job_result.swap(res, atomic::Ordering::SeqCst);
                        return Err(e);
                    }
                }
            }
        }

        // One status flag per worker slot, shared with that worker's thread.
        let status: Vec<_> = (0..self.running_jobs.len())
            .map(|_| Arc::new(atomic::AtomicUsize::new(STATUS_RUNNING)))
            .collect();

        // Stores `value` into every worker's status flag.
        let broadcast = |value: usize| {
            for s in &status {
                s.store(value, atomic::Ordering::SeqCst);
            }
        };

        // Start the worker threads.
        let mut workers = Vec::with_capacity(status.len());
        for (worker, worker_status) in status.iter().enumerate() {
            let mut w = match self.for_worker() {
                Ok(w) => w,
                Err(e) => {
                    // Do not leave already-started threads running detached: they would keep
                    // fetching jobs and keep `last_job_results` shared, which makes the next
                    // call to `run` panic. Ask them to stop and wait for them instead.
                    broadcast(STATUS_TERMINATING);
                    for handle in workers.drain(..) {
                        // The connection error is what gets reported; worker results are
                        // deliberately ignored here.
                        let _ = handle.join();
                    }
                    return Err(e);
                }
            };
            let worker_status = worker_status.clone();
            let owned_queues: Vec<String> =
                queues.iter().map(|q| q.as_ref().to_string()).collect();

            workers.push(thread::spawn(move || -> io::Result<()> {
                while worker_status.load(atomic::Ordering::SeqCst) == STATUS_RUNNING {
                    if let Err(e) = w.run_one(worker, &owned_queues[..]) {
                        worker_status.store(STATUS_TERMINATING, atomic::Ordering::SeqCst);
                        return Err(e);
                    }
                }
                worker_status.store(STATUS_TERMINATING, atomic::Ordering::SeqCst);
                Ok(())
            }));
        }

        // Supervise the workers and send heartbeats.
        //
        // `exit` is `Ok(true)` on a `Terminate` heartbeat, `Ok(false)` when a worker stopped
        // on its own (i.e. failed), and `Err` when the heartbeat itself failed.
        let mut target = STATUS_RUNNING;
        let exit = {
            let mut last = time::Instant::now();

            loop {
                thread::sleep(time::Duration::from_millis(100));

                // While workers are still supposed to be running, a worker in
                // `STATUS_TERMINATING` means it bailed out with an error.
                if target == STATUS_RUNNING
                    && status
                        .iter()
                        .any(|s| s.load(atomic::Ordering::SeqCst) == STATUS_TERMINATING)
                {
                    broadcast(STATUS_TERMINATING);
                    break Ok(false);
                }

                if last.elapsed().as_secs() < 5 {
                    // Not time for the next heartbeat yet.
                    continue;
                }

                match self.c.heartbeat() {
                    Ok(HeartbeatStatus::Ok) => {}
                    Ok(HeartbeatStatus::Quiet) => {
                        // Workers stop fetching; keep heartbeating until told to terminate.
                        broadcast(STATUS_QUIET);
                        target = STATUS_QUIET;
                    }
                    Ok(HeartbeatStatus::Terminate) => {
                        broadcast(STATUS_QUIET);
                        break Ok(true);
                    }
                    Err(e) => {
                        broadcast(STATUS_TERMINATING);
                        break Err(e);
                    }
                }
                last = time::Instant::now();
            }
        };

        // Only a `Terminate` heartbeat terminates the consumer for good. A failed worker
        // (`Ok(false)`) or a failed heartbeat (`Err`) makes this call return `Err`, after
        // which callers such as `run_to_completion` reconnect and call `run` again; marking
        // the consumer terminated in those cases would make that retry panic.
        self.terminated = match exit {
            Ok(true) => true,
            _ => false,
        };

        if let Ok(true) = exit {
            // Fail the jobs that are still in progress, even though their handlers may still
            // be running.
            let mut running = 0;
            for running_job in self.running_jobs.iter() {
                if let Some(jid) = running_job.take(atomic::Ordering::SeqCst) {
                    let f = Fail::new(jid, "unknown", "terminated");

                    // Best effort: a failure to report must not turn termination into `Err`.
                    let _ = self.c.issue(&f).and_then(|r| r.await_ok());

                    running += 1;
                }
            }

            if running != 0 {
                return Ok(running);
            }
        }

        // Surface the first worker error, if any; otherwise the heartbeat error, if any.
        let joined = workers
            .into_iter()
            .map(|w| w.join().unwrap())
            .collect::<Result<Vec<_>, _>>();
        match exit {
            Ok(_) => joined.map(|_| 0),
            Err(e) => joined.and_then(|_| Err(e)),
        }
    }

    /// Calls `run` until it returns `Ok`, reconnecting after every error, and then exits the
    /// process with status 0. The process also exits with status 0 if reconnecting fails.
    ///
    /// NOTE(review): the type parameter `U` is unused, so callers have to name it explicitly
    /// (e.g. `run_to_completion::<_, ()>(..)`). It is kept only to avoid breaking existing
    /// call sites — confirm whether it can be removed.
    pub fn run_to_completion<Q, U>(mut self, queues: &[Q]) -> !
    where
        Q: AsRef<str>,
    {
        use std::process;

        while self.run(queues).is_err() {
            if self.reconnect().is_err() {
                break;
            }
        }

        process::exit(0);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Enqueues one `foobar` job on `worker_test_1` and checks that a consumer picks it up
    /// and runs its handler. Ignored by default; it opens real connections via
    /// `Producer::connect(None)` and `ConsumerBuilder::connect(None)`.
    #[test]
    #[ignore]
    fn it_works() {
        use producer::Producer;
        use std::io;

        let mut producer = Producer::connect(None).unwrap();
        let mut job = Job::new("foobar", vec!["z"]);
        job.queue = "worker_test_1".to_string();
        producer.enqueue(job).unwrap();

        let mut builder = ConsumerBuilder::default();
        builder.register("foobar", |job| -> io::Result<()> {
            assert_eq!(job.args, vec!["z"]);
            Ok(())
        });
        let mut consumer = builder.connect(None).unwrap();

        let outcome = consumer.run_n(1, &["worker_test_1"]);
        if outcome.is_err() {
            println!("{:?}", outcome);
        }
        assert!(outcome.is_ok());
    }
}