jqueuers/
lib.rs

//! # jqueuers
//!
//! A library for managing job queues.
4use uuid::Uuid;
5use serde::{ Serialize, Deserialize };
6use std::str;
7use serde_json;
8use anyhow::Error;
9use redis::aio::ConnectionManager;
10use std::collections::HashMap;
11mod queue;
12use queue::Queue;
13use std::future::Future;
14use std::pin::Pin;
15
16mod job;
17use job::{ JobParams, Job };
18
19use std::sync::atomic::{ AtomicBool, Ordering };
20use std::sync::Arc;
21use tokio::signal;
22
23#[derive(Deserialize, Serialize, Clone, Copy)]
24pub struct QueueOption {}
25
26#[derive(Deserialize, Serialize, Clone, Copy)]
27pub struct QueueParams {
28    pub name: &'static str,
29    pub option: QueueOption,
30}
31
/// Naming helpers for the keys and lists this crate keeps in Redis.
///
/// Every method has a default body, so an implementor only needs an empty
/// `impl` to pick them up.
trait QueueType {
    /// Returns the names of the job-list states, in the fixed order
    /// `initializing`, `active`, `failed`, `completed`.
    fn queue_types(&self) -> Vec<String> {
        ["initializing", "active", "failed", "completed"]
            .iter()
            .map(|state| state.to_string())
            .collect()
    }

    /// Builds the list key `jqueuers.<queue_name>.<queue_type>`.
    fn parse_q_list(&self, queue_name: &str, queue_type: &str) -> String {
        ["jqueuers", queue_name, queue_type].join(".")
    }

    /// Builds the queue-definition key `jqueuers.<queue_name>`.
    fn parse_q_name(&self, queue_name: &str) -> String {
        ["jqueuers", queue_name].join(".")
    }
}
50impl QueueType for App {}
51pub type AsyncTask = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
52
53#[derive(Clone)]
54pub struct AppOption {
55    client: ConnectionManager,
56}
57
/// Settings passed to `App::init`.
///
/// NOTE: because this type is named `Option`, the bare name `Option` inside
/// this module refers to this struct and not to the standard library enum.
#[derive(Clone)]
pub struct Option {
    /// Redis connection URL handed to `redis::Client::open`.
    pub url: &'static str,
}
62
63pub struct App {
64    option: AppOption,
65    data: HashMap<&'static str, Queue>,
66}
67
68impl App {
69    pub async fn add(self, params: JobParams) {
70        let job = Job::new(
71            params.name.clone(),
72            params.queue.clone(),
73            self.option.client.clone(),
74            params.option
75        );
76        let serialized_job = serde_json::to_string(&params).unwrap();
77
78        let queue: String = redis
79            ::cmd("SET")
80            .arg(self.parse_q_name(&job.queue))
81            .query_async(&mut self.option.client.clone()).await
82            .unwrap();
83
84        if queue.is_empty() {
85            panic!("Queue Not Found");
86        }
87
88        let _: String = redis
89            ::cmd("SET")
90            .arg(&job.id.to_string())
91            .arg(&serialized_job)
92            .query_async(&mut self.option.client.clone()).await
93            .unwrap();
94
95        let queue_list_name = self.parse_q_list(&job.queue, &job.qtype);
96        let _: String = redis
97            ::cmd("RPUSH")
98            .arg(&queue_list_name)
99            .arg(&job.name)
100            .query_async(&mut self.option.client.clone()).await
101            .unwrap();
102    }
103
104    pub async fn define(
105        &mut self,
106        params: QueueParams,
107        process: Box<dyn (Fn() -> AsyncTask) + Send>
108    ) {
109        let queue = Queue::new(params.name, self.option.client.clone(), params.option, process);
110
111        let queue_name: String = self.parse_q_name(&queue.name);
112        let serialized_queue = serde_json::to_string(&params).unwrap();
113
114        let _: String = redis
115            ::cmd("SET")
116            .arg(&queue_name)
117            .arg(&serialized_queue)
118            .query_async(&mut self.option.client.clone()).await
119            .unwrap();
120
121        self.data.insert(queue.name, queue);
122    }
123
124    pub async fn init(app_params: Option) -> Result<Self, Error> {
125        let client = redis::Client::open(app_params.url)?;
126        let con = client.get_tokio_connection_manager().await?;
127
128        let option: AppOption = AppOption {
129            client: con,
130        };
131
132        Ok(App { option, data: HashMap::new() })
133    }
134
135    pub async fn process(self, queue: Queue) -> () {
136        let init_queue = self.parse_q_list(queue.name, "initilizing");
137        let failed_queue = self.parse_q_list(queue.name, "failed");
138
139        loop {
140            let query: Result<String, redis::RedisError> = redis
141                ::cmd("BRPOP")
142                .arg(&init_queue)
143                .arg(&failed_queue)
144                .query_async(&mut self.option.client.clone()).await;
145
146            match query {
147                Ok(string_job) => {
148                    let mut job: JobParams = serde_json::from_str(&string_job).unwrap();
149                    let processed_queue = (queue.process)().await;
150                    match processed_queue {
151                        Ok(_) => {
152                            let _ = queue.move_queue_list(job, "completed").await;
153                        }
154                        Err(err) => {
155                            job.error = Some(err.to_string());
156                            let _ = queue.move_queue_list(job, "failed").await;
157                        }
158                    }
159                }
160                Err(_) => {}
161            }
162        }
163    }
164
165    pub async fn run(self) -> Result<(), Error> {
166        let queues = self.data;
167
168        for (key, queue) in queues {
169            println!("Queue: {:?}", key);
170            let _ = queue.run();
171        }
172
173        let termination_flag = Arc::new(AtomicBool::new(false));
174        let termination_flag_clone = Arc::clone(&termination_flag);
175        let signal_task = tokio::spawn(async move {
176            signal::ctrl_c().await.expect("Failed to listen for termination signal");
177            println!("Received termination signal. Stopping the loop.");
178            termination_flag_clone.store(true, Ordering::SeqCst);
179        });
180
181        while !termination_flag.load(Ordering::SeqCst) {}
182
183        signal_task.await.expect("Signal task failed");
184
185        Ok(())
186    }
187}