jqueuers 0.1.2

jobs queues manager
Documentation
//! # jqueuers
//!
//! A library for job queues management
use uuid::Uuid;
use serde::{ Serialize, Deserialize };
use std::str;
use serde_json;
use anyhow::Error;
use redis::aio::ConnectionManager;
use std::collections::HashMap;
mod queue;
use queue::Queue;
use std::future::Future;
use std::pin::Pin;

mod job;
use job::{ JobParams, Job };

use std::sync::atomic::{ AtomicBool, Ordering };
use std::sync::Arc;
use tokio::signal;

#[derive(Deserialize, Serialize, Clone, Copy)]
pub struct QueueOption {}

#[derive(Deserialize, Serialize, Clone, Copy)]
pub struct QueueParams {
    pub name: &'static str,
    pub option: QueueOption,
}

trait QueueType {
    fn queue_types(&self) -> Vec<String> {
        vec![
            "initializing".to_string(),
            "active".to_string(),
            "failed".to_string(),
            "completed".to_string()
        ]
    }

    fn parse_q_list(&self, queue_name: &str, queue_type: &str) -> String {
        format!("jqueuers.{}.{}", queue_name, queue_type)
    }

    fn parse_q_name(&self, queue_name: &str) -> String {
        format!("jqueuers.{}", queue_name)
    }
}
impl QueueType for App {}
pub type AsyncTask = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;

#[derive(Clone)]
pub struct AppOption {
    client: ConnectionManager,
}

#[derive(Clone)]
pub struct Option {
    pub url: &'static str,
}

pub struct App {
    option: AppOption,
    data: HashMap<&'static str, Queue>,
}

impl App {
    pub async fn add(self, params: JobParams) {
        let job = Job::new(
            params.name.clone(),
            params.queue.clone(),
            self.option.client.clone(),
            params.option
        );
        let serialized_job = serde_json::to_string(&params).unwrap();

        let queue: String = redis
            ::cmd("SET")
            .arg(self.parse_q_name(&job.queue))
            .query_async(&mut self.option.client.clone()).await
            .unwrap();

        if queue.is_empty() {
            panic!("Queue Not Found");
        }

        let _: String = redis
            ::cmd("SET")
            .arg(&job.id.to_string())
            .arg(&serialized_job)
            .query_async(&mut self.option.client.clone()).await
            .unwrap();

        let queue_list_name = self.parse_q_list(&job.queue, &job.qtype);
        let _: String = redis
            ::cmd("RPUSH")
            .arg(&queue_list_name)
            .arg(&job.name)
            .query_async(&mut self.option.client.clone()).await
            .unwrap();
    }

    pub async fn define(
        &mut self,
        params: QueueParams,
        process: Box<dyn (Fn() -> AsyncTask) + Send>
    ) {
        let queue = Queue::new(params.name, self.option.client.clone(), params.option, process);

        let queue_name: String = self.parse_q_name(&queue.name);
        let serialized_queue = serde_json::to_string(&params).unwrap();

        let _: String = redis
            ::cmd("SET")
            .arg(&queue_name)
            .arg(&serialized_queue)
            .query_async(&mut self.option.client.clone()).await
            .unwrap();

        self.data.insert(queue.name, queue);
    }

    pub async fn init(app_params: Option) -> Result<Self, Error> {
        let client = redis::Client::open(app_params.url)?;
        let con = client.get_tokio_connection_manager().await?;

        let option: AppOption = AppOption {
            client: con,
        };

        Ok(App { option, data: HashMap::new() })
    }

    pub async fn process(self, queue: Queue) -> () {
        let init_queue = self.parse_q_list(queue.name, "initilizing");
        let failed_queue = self.parse_q_list(queue.name, "failed");

        loop {
            let query: Result<String, redis::RedisError> = redis
                ::cmd("BRPOP")
                .arg(&init_queue)
                .arg(&failed_queue)
                .query_async(&mut self.option.client.clone()).await;

            match query {
                Ok(string_job) => {
                    let mut job: JobParams = serde_json::from_str(&string_job).unwrap();
                    let processed_queue = (queue.process)().await;
                    match processed_queue {
                        Ok(_) => {
                            let _ = queue.move_queue_list(job, "completed").await;
                        }
                        Err(err) => {
                            job.error = Some(err.to_string());
                            let _ = queue.move_queue_list(job, "failed").await;
                        }
                    }
                }
                Err(_) => {}
            }
        }
    }

    pub async fn run(self) -> Result<(), Error> {
        let queues = self.data;

        for (key, queue) in queues {
            println!("Queue: {:?}", key);
            let _ = queue.run();
        }

        let termination_flag = Arc::new(AtomicBool::new(false));
        let termination_flag_clone = Arc::clone(&termination_flag);
        let signal_task = tokio::spawn(async move {
            signal::ctrl_c().await.expect("Failed to listen for termination signal");
            println!("Received termination signal. Stopping the loop.");
            termination_flag_clone.store(true, Ordering::SeqCst);
        });

        while !termination_flag.load(Ordering::SeqCst) {}

        signal_task.await.expect("Signal task failed");

        Ok(())
    }
}