use std::collections::HashMap;
use std::env;
use std::fmt;
use std::io;
use std::process;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use futures::{future, Future, IntoFuture, Stream};
use tokio_core::reactor::{Core, Handle};
use wait_timeout::ChildExt;
use error::{self, Result};
use de;
use ser;
use job::{Failure as JobFailure, Job, Status as JobStatus};
use rabbitmq::{Exchange, ExchangeBuilder, Queue, QueueBuilder, RabbitmqBroker, RabbitmqStream};
use task::{Perform, Task};
/// Signature of a job handler as stored (boxed) in the `handlers` maps of
/// `WorkerBuilder` and `Worker`: it is called with a reference to the job and
/// the context by value, and returns this crate's `Result<()>`.
type WorkerFn<Ctx> = Fn(&Job, Ctx) -> Result<()>;
/// Builder collecting everything needed to create a `Worker`.
///
/// `Ctx` is the type of the context value that is handed to job handlers.
pub struct WorkerBuilder<Ctx> {
/// URL passed to the RabbitMQ broker constructor; `new` defaults it to
/// `amqp://localhost/%2f`.
connection_url: String,
/// Context value given (by value) to the handler that processes a job.
context: Ctx,
/// Exchanges built from the `ExchangeBuilder`s given to `exchanges`.
exchanges: Vec<Exchange>,
/// Reactor handle; `build` fails with `ErrorKind::NoHandle` while this is `None`.
handle: Option<Handle>,
/// Registered job handlers, keyed by the name returned by `Task::name`.
handlers: HashMap<&'static str, Box<WorkerFn<Ctx>>>,
/// Queues built from the `QueueBuilder`s given to `queues`.
queues: Vec<Queue>,
}
impl<Ctx> fmt::Debug for WorkerBuilder<Ctx>
where
Ctx: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
write!(
f,
"WorkerBuilder {{ connection_url: {:?} context: {:?} exchanges: {:?} queues: {:?} }}",
self.connection_url, self.context, self.exchanges, self.queues
)
}
}
impl<Ctx> WorkerBuilder<Ctx> {
pub fn new(context: Ctx) -> Self {
WorkerBuilder {
context: context,
connection_url: "amqp://localhost/%2f".into(),
exchanges: Vec::new(),
queues: Vec::new(),
handle: None,
handlers: HashMap::new(),
}
}
pub fn connection_url(mut self, url: &str) -> Self {
self.connection_url = url.into();
self
}
pub fn exchanges<EIter>(mut self, exchanges: EIter) -> Self
where
EIter: IntoIterator<Item = ExchangeBuilder>,
{
self.exchanges
.extend(exchanges.into_iter().map(|e| e.build()));
self
}
pub fn queues<QIter>(mut self, queues: QIter) -> Self
where
QIter: IntoIterator<Item = QueueBuilder>,
{
self.queues.extend(queues.into_iter().map(|q| q.build()));
self
}
pub fn handle(mut self, handle: Handle) -> Self {
self.handle = Some(handle);
self
}
pub fn task<T>(mut self) -> Self
where
T: Task + Perform<Context = Ctx>,
{
self.handlers.insert(
T::name(),
Box::new(|job, ctx| -> Result<()> {
let task: T = de::from_slice(job.task()).unwrap();
Perform::perform(&task, ctx);
Ok(())
}),
);
self
}
pub fn build(self) -> Result<Worker<Ctx>> {
if self.handle.is_none() {
Err(error::ErrorKind::NoHandle)?;
}
Ok(Worker {
connection_url: self.connection_url,
context: self.context,
handle: self.handle.unwrap(),
handlers: self.handlers,
exchanges: self.exchanges,
queues: self.queues,
})
}
}
/// A fully configured worker, as produced by `WorkerBuilder::build`.
///
/// `Ctx` is the type of the context value that is handed to job handlers.
pub struct Worker<Ctx> {
/// URL passed to the RabbitMQ broker constructor.
connection_url: String,
/// Context value given (by value) to the handler that processes a job.
context: Ctx,
/// Reactor handle used to create the brokers and to spawn ack/reject futures.
handle: Handle,
/// Job handlers, looked up by the job's name when executing a job.
handlers: HashMap<&'static str, Box<WorkerFn<Ctx>>>,
/// Exchanges handed to the brokers on creation.
exchanges: Vec<Exchange>,
/// Queues handed to the brokers on creation.
queues: Vec<Queue>,
}
impl<Ctx> fmt::Debug for Worker<Ctx>
where
Ctx: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
write!(
f,
"Worker {{ connection_url: {:?} context: {:?} queues: {:?} }}",
self.connection_url, self.context, self.queues
)
}
}
impl<Ctx> Worker<Ctx> {
pub fn run(self) -> Box<Future<Item = (), Error = error::Error>> {
match env::var("BATCHRS_WORKER_IS_EXECUTOR") {
Ok(_) => Box::new(self.execute().into_future()),
Err(_) => self.supervise(),
}
}
fn supervise(self) -> Box<Future<Item = (), Error = error::Error>> {
let handle = self.handle;
let connection_url = self.connection_url;
let queues = self.queues;
let exchanges = self.exchanges;
let ctor = |e: Vec<Exchange>, q: Vec<Queue>, h: &Handle| {
RabbitmqBroker::new_with_handle(&connection_url, e, q, h.clone())
};
let task = ctor(exchanges.clone(), queues.clone(), &handle)
.join(ctor(exchanges, queues, &handle))
.and_then(|(consume_broker, publish_broker)| {
let publish_broker = Arc::new(publish_broker);
consume_broker.recv().and_then(move |consumer| {
future::loop_fn(consumer.into_future(), move |f| {
let publish_broker = Arc::clone(&publish_broker);
let handle = handle.clone();
f.and_then(move |(next, consumer)| {
let (uid, job) = match next {
Some((uid, job)) => (uid, job),
None => return Ok(future::Loop::Break(())),
};
let task = match spawn(&job) {
Err(e) => {
error!("[{}] Couldn't spawn child process: {}", job.uuid(), e);
reject(&consumer, publish_broker, uid, job)
}
Ok(status) => match status {
JobStatus::Success => {
debug!("[{}] Child execution succeeded", job.uuid());
consumer.ack(uid)
}
JobStatus::Failed(_) => {
debug!("[{}] Child execution failed", job.uuid());
reject(&consumer, publish_broker, uid, job)
}
_ => unreachable!(),
},
};
let task = task.map_err(move |e| {
error!("An error occured: {}", e);
});
handle.spawn(task);
Ok(future::Loop::Continue(consumer.into_future()))
}).or_else(|(e, consumer)| {
use failure::Fail;
let cause = match e.kind().cause() {
Some(cause) => format!(" Cause: {}", cause),
None => "".into(),
};
error!("Couldn't receive message from consumer: {}.{}", e, cause);
Ok(future::Loop::Continue(consumer.into_future()))
})
})
})
});
Box::new(task)
}
fn execute(self) -> Result<()> {
let job: Job = de::from_reader(io::stdin()).map_err(error::ErrorKind::Deserialization)?;
if let Some(handler) = self.handlers.get(job.name()) {
if let Err(e) = (*handler)(&job, self.context) {
error!("Couldn't process job: {}", e);
}
} else {
warn!("No handler registered for job: `{}'", job.name());
}
Ok(())
}
}
/// Rejects the delivery `uid` on `consumer` and, when the job can be retried,
/// publishes it again through `broker`.
///
/// `job.failed()` decides whether a retry happens: when it yields a job
/// (presumably the job updated for its next attempt -- confirm in the `job`
/// module), that job is sent once the rejection has completed. Otherwise only
/// the rejection future is returned.
fn reject(
    consumer: &RabbitmqStream,
    broker: Arc<RabbitmqBroker>,
    uid: u64,
    job: Job,
) -> Box<Future<Item = (), Error = error::Error>> {
    // The rejection future is created before the retry decision is made.
    let rejection = consumer.reject(uid);
    match job.failed() {
        Some(retry) => {
            debug!("[{}] Retry job after failure: {:?}", retry.uuid(), retry);
            Box::new(rejection.and_then(move |_| broker.send(&retry)))
        }
        None => rejection,
    }
}
/// Runs `job` in a child process and reports how that process ended.
///
/// The child is a new instance of the current executable, started with
/// `BATCHRS_WORKER_IS_EXECUTOR=1` in its environment and a piped stdin on
/// which the serialized job is written. This call blocks until the child has
/// exited (or has been killed after `job.timeout()` elapsed).
///
/// Returns `JobStatus::Success` on a successful exit,
/// `JobStatus::Failed(JobFailure::Error)` when the child exited with an error
/// code, `JobStatus::Failed(JobFailure::Crash)` when it ended without an exit
/// code (terminated by a signal), and `JobStatus::Failed(JobFailure::Timeout)`
/// when it had to be killed.
///
/// # Errors
///
/// `ErrorKind::Serialization` when the job cannot be serialized, and
/// `ErrorKind::SubProcessManagement` for any I/O failure while locating the
/// executable, spawning, feeding, waiting for or killing the child.
///
/// # Panics
///
/// Panics if the child has no stdin handle even though one was requested.
fn spawn(job: &Job) -> Result<JobStatus> {
    use std::io::Write;

    let current_exe = env::current_exe().map_err(error::ErrorKind::SubProcessManagement)?;
    // Serialize before spawning so that a serialization failure cannot leave
    // a started, never-awaited child process behind.
    let payload = ser::to_vec(&job).map_err(error::ErrorKind::Serialization)?;
    let mut child = process::Command::new(&current_exe)
        .env("BATCHRS_WORKER_IS_EXECUTOR", "1")
        .stdin(process::Stdio::piped())
        .spawn()
        .map_err(error::ErrorKind::SubProcessManagement)?;
    {
        // Taking the handle out of `child` closes the pipe at the end of this
        // scope. `wait_timeout` does not close stdin itself, and `Child::wait`
        // would close it anyway, so doing it here covers both paths.
        let mut stdin = child.stdin.take().expect("failed to get stdin");
        stdin
            .write_all(&payload)
            .map_err(error::ErrorKind::SubProcessManagement)?;
        stdin
            .flush()
            .map_err(error::ErrorKind::SubProcessManagement)?;
    }
    if let Some(duration) = job.timeout() {
        if let Some(status) = child
            .wait_timeout(duration)
            .map_err(error::ErrorKind::SubProcessManagement)?
        {
            if status.success() {
                Ok(JobStatus::Success)
            } else if status.unix_signal().is_some() {
                Ok(JobStatus::Failed(JobFailure::Crash))
            } else {
                Ok(JobStatus::Failed(JobFailure::Error))
            }
        } else {
            // The deadline passed: kill the child, then reap it.
            child
                .kill()
                .map_err(error::ErrorKind::SubProcessManagement)?;
            child
                .wait()
                .map_err(error::ErrorKind::SubProcessManagement)?;
            Ok(JobStatus::Failed(JobFailure::Timeout))
        }
    } else {
        let status = child
            .wait()
            .map_err(error::ErrorKind::SubProcessManagement)?;
        if status.success() {
            Ok(JobStatus::Success)
        } else if status.code().is_some() {
            Ok(JobStatus::Failed(JobFailure::Error))
        } else {
            // No exit code means the process was terminated by a signal.
            Ok(JobStatus::Failed(JobFailure::Crash))
        }
    }
}