//! A small job queue backed by Redis.
//!
//! Jobs are serialized to JSON and stored under `<queue name>:<job uuid>`
//! keys, while the uuids of pending jobs are kept in the `<queue name>:uuids`
//! list. A `Queue` is used both to enqueue jobs and to run the worker loop
//! that executes them.

// Every public item must be documented; an undocumented one is a hard error.
#![deny(missing_docs)]
// Provides the `Serialize` / `Deserialize` derives used on the job types.
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate redis;
extern crate uuid;
use std::error::Error;
use std::thread;
use std::sync::mpsc::channel;
use std::time::Duration;
use std::thread::sleep;
use std::marker::{Send, Sync};
use std::sync::Arc;
use redis::{Commands, Client};
use uuid::Uuid;
/// Lifecycle state of a job stored in the queue.
///
/// The variant names are part of the JSON representation of a job, so they
/// must not be renamed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Status {
    /// The job has been created and is waiting to be picked up by a worker.
    QUEUED,
    /// A worker has picked the job up and is currently executing it.
    RUNNING,
    /// The worker did not obtain a result for the job within its timeout.
    LOST,
    /// The job function returned successfully; its output is stored as the result.
    FINISHED,
    /// The job function returned an error.
    FAILED,
}
// A single job as it is stored in Redis (serialized to JSON with serde).
// Field names and order are part of the stored representation.
#[derive(Debug, Serialize, Deserialize)]
struct Job {
// Unique identifier of the job; also used to build its Redis key.
uuid: String,
// Current lifecycle state of the job.
status: Status,
// Arguments handed to the worker function when the job is executed.
args: Vec<String>,
// Output of the worker function; `None` until the job has finished successfully.
result: Option<String>,
}
impl Job {
    /// Builds a brand-new job for `args`: it gets a freshly generated
    /// random (v4) uuid, starts out as `QUEUED` and has no result yet.
    fn new(args: Vec<String>) -> Job {
        let uuid = Uuid::new_v4().to_string();
        Job {
            uuid,
            status: Status::QUEUED,
            args,
            result: None,
        }
    }
}
/// Handle to a named job queue stored in Redis.
///
/// The handle only remembers where the queue lives; it holds no open
/// connection, so it is cheap to create and to clone.
#[derive(Debug, Clone)]
pub struct Queue {
    /// URL of the Redis server, as accepted by `redis::Client::open`.
    url: String,
    /// Queue name; used as the prefix of every Redis key the queue touches.
    name: String,
}
impl Queue {
pub fn new(url: &str, name: &str) -> Queue {
Queue {
url: url.to_string(),
name: name.to_string(),
}
}
pub fn drop(&self) -> Result<(), Box<Error>> {
let client = Client::open(self.url.as_str())?;
let conn = client.get_connection()?;
conn.del(format!("{}:uuids", self.name))?;
Ok(())
}
pub fn enqueue(&self, args: Vec<String>, expire: usize) -> Result<String, Box<Error>> {
let client = Client::open(self.url.as_str())?;
let conn = client.get_connection()?;
let job = Job::new(args);
conn.set_ex(format!("{}:{}", self.name, job.uuid),
serde_json::to_string(&job)?,
expire)?;
conn.rpush(format!("{}:uuids", self.name), &job.uuid)?;
Ok(job.uuid)
}
pub fn status(&self, uuid: &str) -> Result<Status, Box<Error>> {
let client = redis::Client::open(self.url.as_str())?;
let conn = client.get_connection()?;
let json: String = conn.get(format!("{}:{}", self.name, uuid))?;
let job: Job = serde_json::from_str(&json)?;
Ok(job.status)
}
pub fn work<F: Fn(String, Vec<String>) -> Result<String, Box<Error>> + Send + Sync + 'static>
(&self,
fun: F,
wait: Option<usize>,
timeout: Option<usize>,
freq: Option<usize>,
expire: Option<usize>,
fall: Option<bool>,
infinite: Option<bool>)
-> Result<(), Box<Error>> {
let wait = wait.unwrap_or(10);
let timeout = timeout.unwrap_or(30);
let freq = freq.unwrap_or(1);
let expire = expire.unwrap_or(30);
let fall = fall.unwrap_or(true);
let infinite = infinite.unwrap_or(true);
let client = redis::Client::open(self.url.as_str())?;
let conn = client.get_connection()?;
let afun = Arc::new(fun);
let uuids_key = format!("{}:uuids", self.name);
loop {
let uuids: Vec<String> = conn.blpop(&uuids_key, wait)?;
if uuids.len() < 2 {
if !infinite {
break;
}
continue;
}
let uuid = &uuids[1].to_string();
let key = format!("{}:{}", self.name, uuid);
let json: String = match conn.get(&key) {
Ok(o) => o,
Err(_) => {
if !infinite {
break;
}
continue;
}
};
let mut job: Job = serde_json::from_str(&json)?;
job.status = Status::RUNNING;
conn.set_ex(&key, serde_json::to_string(&job)?, timeout + expire)?;
let (tx, rx) = channel();
let cafun = afun.clone();
let cuuid = uuid.clone();
let cargs = job.args.clone();
thread::spawn(move || {
let r = match cafun(cuuid, cargs) {
Ok(o) => (Status::FINISHED, Some(o)),
Err(_) => (Status::FAILED, None),
};
tx.send(r).unwrap_or(())
});
for _ in 0..(timeout * freq) {
let (status, result) = rx.try_recv().unwrap_or((Status::RUNNING, None));
job.status = status;
job.result = result;
if job.status != Status::RUNNING {
break;
}
sleep(Duration::from_millis(1000 / freq as u64));
}
if job.status == Status::RUNNING {
job.status = Status::LOST;
}
conn.set_ex(&key, serde_json::to_string(&job)?, expire)?;
if fall && job.status == Status::LOST {
panic!("LOST");
}
if !infinite {
break;
}
}
Ok(())
}
pub fn result(&self, uuid: &str) -> Result<Option<String>, Box<Error>> {
let client = redis::Client::open(self.url.as_str())?;
let conn = client.get_connection()?;
let json: String = conn.get(format!("{}:{}", self.name, uuid))?;
let job: Job = serde_json::from_str(&json)?;
Ok(job.result)
}
}