use serde_json;
use uuid::Uuid;
use std::str;
use anyhow::Error;
use redis::{ aio::ConnectionManager, AsyncCommands };
use crate::job::{ Job, JobParams };
use super::{ AsyncTask, QueueOption, QueueType };
// Registers `Queue` as a `QueueType`. The impl body is empty, so `Queue`
// only gets whatever default items the trait itself provides.
// NOTE(review): `run` calls `self.parse_q_list`, which is not defined in the
// inherent impl visible in this file — presumably it is a default method of
// `QueueType`; confirm against the trait definition.
impl QueueType for Queue {}
/// A named job queue bound to a Redis connection.
///
/// `run` consumes the queue and loops forever, popping entries with `BRPOP`
/// and invoking `process` once for each popped entry.
pub struct Queue {
/// Redis connection handle. The methods in this file clone it before
/// issuing commands instead of borrowing it mutably.
client: ConnectionManager,
/// Identifier assigned in `new` via `Uuid::new_v4()`.
pub id: Uuid,
/// Queue name; `run` passes it to `parse_q_list` to build the list keys.
pub name: &'static str,
/// Caller-supplied options, stored as given. Not read by the code visible
/// in this file.
pub option: QueueOption,
/// Task factory called with no arguments in `run`; the returned `AsyncTask`
/// is awaited, and an `Err` result sends the job to the "failed" list.
pub process: Box<dyn (Fn() -> AsyncTask) + Send>,
}
impl Queue {
pub fn new(
name: &'static str,
client: ConnectionManager,
option: QueueOption,
process: Box<dyn (Fn() -> AsyncTask) + Send>
) -> Self {
Queue {
id: Uuid::new_v4(),
name: name,
client,
option,
process,
}
}
/// Loads the job stored under the Redis key `name`.
///
/// Issues `GET name`, deserializes the returned string as JSON into
/// `JobParams`, and wraps those parameters in a `Job` that uses a clone of
/// this queue's connection. The returned job's `error` is always `None`.
///
/// Takes `self` by value, so the queue is consumed by this call.
///
/// # Errors
/// Returns an error (instead of panicking) when the Redis command fails,
/// when the reply cannot be read as a string (for example a nil reply for a
/// key that does not exist), or when the stored value is not valid
/// `JobParams` JSON.
pub async fn get_job(self, name: &str) -> Result<Job, Error> {
    // One clone serves both the query and the returned `Job`.
    let mut client = self.client.clone();
    let raw: String = redis::cmd("GET").arg(name).query_async(&mut client).await?;
    let params: JobParams = serde_json::from_str(&raw)?;
    Ok(Job {
        id: params.id,
        name: params.name,
        qtype: params.qtype,
        queue: params.queue,
        client,
        option: params.option,
        error: None,
    })
}
/// Runs the worker loop for this queue. This function never returns.
///
/// Each iteration blocks on `BRPOP` over this queue's "initilizing" and
/// "failed" lists, decodes the popped element as JSON `JobParams`, and calls
/// `process`. When the task returns `Err`, the error text is stored on the
/// job and the job is handed to `move_queue_list` with the "failed" type.
///
/// Redis errors, undecodable payloads and failures to move a job are
/// reported on stderr and the loop carries on with the next entry; none of
/// them terminates the worker.
///
/// NOTE(review): `move_queue_list` pushes the job *name* onto its list, while
/// this loop parses list elements as serialized `JobParams`. If the "failed"
/// list read here is the one written there, those entries will be reported
/// as invalid payloads — confirm the intended list contents.
pub async fn run(self) {
    // The "initilizing" spelling is part of the Redis key name and has to
    // match whatever the producers use, so it is kept verbatim.
    let init_queue = self.parse_q_list(self.name, "initilizing");
    let failed_queue = self.parse_q_list(self.name, "failed");
    let mut client = self.client.clone();
    loop {
        // BRPOP's final argument is a mandatory timeout; 0 blocks until an
        // element is available. The reply is a (list key, element) pair, or
        // nil if a timeout expires.
        let popped: Result<Option<(String, String)>, redis::RedisError> = redis::cmd("BRPOP")
            .arg(&init_queue)
            .arg(&failed_queue)
            .arg(0)
            .query_async(&mut client)
            .await;
        let payload = match popped {
            Ok(Some((_list, payload))) => payload,
            Ok(None) => continue,
            Err(err) => {
                // NOTE(review): a persistent Redis failure makes this loop
                // retry immediately; no backoff is added because this file
                // has no timer/runtime in scope.
                eprintln!("queue {}: BRPOP failed: {}", self.name, err);
                continue;
            }
        };
        let mut job: JobParams = match serde_json::from_str(&payload) {
            Ok(job) => job,
            Err(err) => {
                // A single malformed entry must not take the worker down.
                eprintln!("queue {}: skipping invalid job payload: {}", self.name, err);
                continue;
            }
        };
        // Bind the result first so the task future itself is dropped before
        // the next await point.
        let outcome = (self.process)().await;
        if let Err(err) = outcome {
            job.error = Some(err.to_string());
            if let Err(move_err) = self.move_queue_list(job, "failed").await {
                eprintln!(
                    "queue {}: could not move job to the failed list: {}",
                    self.name, move_err
                );
            }
        }
    }
}
/// Files `job` under the `queue_type` list and persists its updated record.
///
/// In order, this:
/// 1. sets `job.qtype` to `queue_type` and serializes the job to JSON;
/// 2. removes every occurrence of the job's name from the Redis list
///    `jqueuers.{queue_type}` (`LREM` with count 0);
/// 3. appends the job's name to that list (`RPUSH`);
/// 4. stores the JSON under the key equal to the job's name (`SET`).
///
/// The three Redis commands are sent one after another, not as a
/// transaction, so a failure part-way leaves the earlier ones applied.
///
/// # Errors
/// Returns an error when the job cannot be serialized or when any of the
/// Redis commands fails.
pub async fn move_queue_list(
    &self,
    mut job: JobParams,
    queue_type: &'static str
) -> Result<(), Error> {
    job.qtype = queue_type.to_string();
    // Serialize before touching Redis so a serialization failure cannot
    // leave the list updated without a matching job record.
    let serialized_job = serde_json::to_string(&job)?;
    let name = &job.name;
    let list_name = format!("jqueuers.{}", queue_type);
    let mut con = self.client.clone();
    // The replies are not needed; typing them as `()` explicitly avoids
    // depending on the compiler's implicit fallback for the reply type.
    let _: () = con.lrem(&list_name, 0, name).await?;
    let _: () = con.rpush(&list_name, name).await?;
    let _: () = con.set(name, serialized_job).await?;
    Ok(())
}
}