use std::fmt;
use std::result::Result as StdResult;
use std::time::Duration;
use futures::Future;
use uuid::Uuid;
use client::Client;
use error::{self, Error, Result};
use rabbitmq::Exchange;
use task::Task;
use ser;
pub struct Query<T>
where
T: Task,
{
task: T,
exchange: String,
routing_key: String,
timeout: Option<Duration>,
retries: u32,
}
impl<T> fmt::Debug for Query<T>
where
T: Task,
{
fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
write!(
f,
"Query {{ exchange: {:?} routing_key: {:?} timeout: {:?} retries: {:?} }}",
self.exchange, self.routing_key, self.timeout, self.retries
)
}
}
impl<T> Query<T>
where
T: Task,
{
pub fn new(task: T) -> Self {
Query {
task,
exchange: T::exchange().into(),
routing_key: T::routing_key().into(),
timeout: T::timeout(),
retries: T::retries(),
}
}
pub fn exchange(mut self, exchange: &str) -> Self {
self.exchange = exchange.into();
self
}
pub fn routing_key(mut self, routing_key: &str) -> Self {
self.routing_key = routing_key.into();
self
}
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
pub fn retries(mut self, retries: u32) -> Self {
self.retries = retries;
self
}
pub fn send(self, client: &Client) -> Box<Future<Item = (), Error = Error>> {
let serialized = ser::to_vec(&self.task)
.map_err(error::ErrorKind::Serialization)
.unwrap();
let job = Job {
uuid: Uuid::new_v4(),
name: String::from(T::name()),
queue: self.routing_key,
task: serialized,
timeout: self.timeout,
retries: self.retries,
};
client.send(&job)
}
}
pub fn job<T>(task: T) -> Query<T>
where
T: Task,
{
Query::new(task)
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Job {
uuid: Uuid,
name: String,
queue: String,
task: Vec<u8>,
timeout: Option<Duration>,
retries: u32,
}
impl Job {
pub fn uuid(&self) -> &Uuid {
&self.uuid
}
pub fn name(&self) -> &str {
&self.name
}
pub fn queue(&self) -> &str {
&self.queue
}
pub fn task(&self) -> &[u8] {
&self.task
}
pub fn timeout(&self) -> Option<Duration> {
self.timeout
}
pub fn retries(&self) -> u32 {
self.retries
}
pub fn failed(self) -> Option<Job> {
if self.retries() == 0 {
None
} else {
Some(Job {
retries: self.retries() - 1,
..self
})
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum Status {
Pending,
Started,
Success,
Failed(Failure),
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum Failure {
Error,
Timeout,
Crash,
}