use uuid::Uuid;
use serde::{ Serialize, Deserialize };
use std::str;
use serde_json;
use anyhow::Error;
use redis::aio::ConnectionManager;
use std::collections::HashMap;
mod queue;
use queue::Queue;
use std::future::Future;
use std::pin::Pin;
mod job;
use job::{ JobParams, Job };
use std::sync::atomic::{ AtomicBool, Ordering };
use std::sync::Arc;
use tokio::signal;
#[derive(Deserialize, Serialize, Clone, Copy)]
pub struct QueueOption {}
#[derive(Deserialize, Serialize, Clone, Copy)]
pub struct QueueParams {
pub name: &'static str,
pub option: QueueOption,
}
trait QueueType {
fn queue_types(&self) -> Vec<String> {
vec![
"initializing".to_string(),
"active".to_string(),
"failed".to_string(),
"completed".to_string()
]
}
fn parse_q_list(&self, queue_name: &str, queue_type: &str) -> String {
format!("jqueuers.{}.{}", queue_name, queue_type)
}
fn parse_q_name(&self, queue_name: &str) -> String {
format!("jqueuers.{}", queue_name)
}
}
impl QueueType for App {}
pub type AsyncTask = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
#[derive(Clone)]
pub struct AppOption {
client: ConnectionManager,
}
#[derive(Clone)]
pub struct Option {
pub url: &'static str,
}
pub struct App {
option: AppOption,
data: HashMap<&'static str, Queue>,
}
impl App {
pub async fn add(self, params: JobParams) {
let job = Job::new(
params.name.clone(),
params.queue.clone(),
self.option.client.clone(),
params.option
);
let serialized_job = serde_json::to_string(¶ms).unwrap();
let queue: String = redis
::cmd("SET")
.arg(self.parse_q_name(&job.queue))
.query_async(&mut self.option.client.clone()).await
.unwrap();
if queue.is_empty() {
panic!("Queue Not Found");
}
let _: String = redis
::cmd("SET")
.arg(&job.id.to_string())
.arg(&serialized_job)
.query_async(&mut self.option.client.clone()).await
.unwrap();
let queue_list_name = self.parse_q_list(&job.queue, &job.qtype);
let _: String = redis
::cmd("RPUSH")
.arg(&queue_list_name)
.arg(&job.name)
.query_async(&mut self.option.client.clone()).await
.unwrap();
}
pub async fn define(
&mut self,
params: QueueParams,
process: Box<dyn (Fn() -> AsyncTask) + Send>
) {
let queue = Queue::new(params.name, self.option.client.clone(), params.option, process);
let queue_name: String = self.parse_q_name(&queue.name);
let serialized_queue = serde_json::to_string(¶ms).unwrap();
let _: String = redis
::cmd("SET")
.arg(&queue_name)
.arg(&serialized_queue)
.query_async(&mut self.option.client.clone()).await
.unwrap();
self.data.insert(queue.name, queue);
}
pub async fn init(app_params: Option) -> Result<Self, Error> {
let client = redis::Client::open(app_params.url)?;
let con = client.get_tokio_connection_manager().await?;
let option: AppOption = AppOption {
client: con,
};
Ok(App { option, data: HashMap::new() })
}
pub async fn process(self, queue: Queue) -> () {
let init_queue = self.parse_q_list(queue.name, "initilizing");
let failed_queue = self.parse_q_list(queue.name, "failed");
loop {
let query: Result<String, redis::RedisError> = redis
::cmd("BRPOP")
.arg(&init_queue)
.arg(&failed_queue)
.query_async(&mut self.option.client.clone()).await;
match query {
Ok(string_job) => {
let mut job: JobParams = serde_json::from_str(&string_job).unwrap();
let processed_queue = (queue.process)().await;
match processed_queue {
Ok(_) => {
let _ = queue.move_queue_list(job, "completed").await;
}
Err(err) => {
job.error = Some(err.to_string());
let _ = queue.move_queue_list(job, "failed").await;
}
}
}
Err(_) => {}
}
}
}
pub async fn run(self) -> Result<(), Error> {
let queues = self.data;
for (key, queue) in queues {
println!("Queue: {:?}", key);
let _ = queue.run();
}
let termination_flag = Arc::new(AtomicBool::new(false));
let termination_flag_clone = Arc::clone(&termination_flag);
let signal_task = tokio::spawn(async move {
signal::ctrl_c().await.expect("Failed to listen for termination signal");
println!("Received termination signal. Stopping the loop.");
termination_flag_clone.store(true, Ordering::SeqCst);
});
while !termination_flag.load(Ordering::SeqCst) {}
signal_task.await.expect("Signal task failed");
Ok(())
}
}