1use uuid::Uuid;
5use serde::{ Serialize, Deserialize };
6use std::str;
7use serde_json;
8use anyhow::Error;
9use redis::aio::ConnectionManager;
10use std::collections::HashMap;
11mod queue;
12use queue::Queue;
13use std::future::Future;
14use std::pin::Pin;
15
16mod job;
17use job::{ JobParams, Job };
18
19use std::sync::atomic::{ AtomicBool, Ordering };
20use std::sync::Arc;
21use tokio::signal;
22
/// Per-queue options.
///
/// Currently has no fields; it is carried inside `QueueParams` and serialized
/// together with it.
23#[derive(Deserialize, Serialize, Clone, Copy)]
24pub struct QueueOption {}
25
/// Parameters describing a queue passed to `App::define`.
///
/// `App::define` serializes the whole value to JSON and stores it in Redis
/// under the queue's key.
26#[derive(Deserialize, Serialize, Clone, Copy)]
27pub struct QueueParams {
    // Queue name; used to build the Redis key names and as the key of the
    // queue in `App`'s in-memory map.
28 pub name: &'static str,
    // Options forwarded to `Queue::new`.
29 pub option: QueueOption,
30}
31
/// Naming scheme for the Redis keys and lists that belong to a queue.
///
/// Every method has a default body, so an implementor can opt in with an
/// empty `impl` block.
trait QueueType {
    /// Lists the suffixes of the per-queue lists: `initializing`, `active`,
    /// `failed` and `completed`, in that order.
    fn queue_types(&self) -> Vec<String> {
        const KINDS: [&str; 4] = ["initializing", "active", "failed", "completed"];
        KINDS.iter().copied().map(String::from).collect()
    }

    /// Builds the name of one list of a queue:
    /// `jqueuers.<queue_name>.<queue_type>`.
    fn parse_q_list(&self, queue_name: &str, queue_type: &str) -> String {
        ["jqueuers", queue_name, queue_type].join(".")
    }

    /// Builds the key of the queue itself: `jqueuers.<queue_name>`.
    fn parse_q_name(&self, queue_name: &str) -> String {
        ["jqueuers", queue_name].join(".")
    }
}
// `App` takes every `QueueType` method from the trait's default bodies.
50impl QueueType for App {}
/// Boxed, pinned, `Send` future that resolves to `Result<(), Error>`.
///
/// This is the value a queue handler passed to `App::define` must return.
51pub type AsyncTask = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
52
/// Internal state of an `App`: the Redis connection it works with.
53#[derive(Clone)]
54pub struct AppOption {
    // Connection manager created in `App::init`; the methods of `App` clone
    // it before issuing commands.
55 client: ConnectionManager,
56}
57
/// Settings accepted by `App::init`.
///
/// NOTE(review): this type is named `Option`, so inside this module the bare
/// name `Option` refers to it rather than to `std::option::Option`; the
/// standard type has to be written with its full path here.
58#[derive(Clone)]
59pub struct Option {
    // URL handed to `redis::Client::open`.
60 pub url: &'static str,
61}
62
/// Entry point of the job-queue library: holds the Redis connection and the
/// queues registered through `App::define`.
63pub struct App {
    // Redis connection state, set up by `App::init`.
64 option: AppOption,
    // Registered queues, keyed by queue name.
65 data: HashMap<&'static str, Queue>,
66}
67
68impl App {
69 pub async fn add(self, params: JobParams) {
70 let job = Job::new(
71 params.name.clone(),
72 params.queue.clone(),
73 self.option.client.clone(),
74 params.option
75 );
76 let serialized_job = serde_json::to_string(¶ms).unwrap();
77
78 let queue: String = redis
79 ::cmd("SET")
80 .arg(self.parse_q_name(&job.queue))
81 .query_async(&mut self.option.client.clone()).await
82 .unwrap();
83
84 if queue.is_empty() {
85 panic!("Queue Not Found");
86 }
87
88 let _: String = redis
89 ::cmd("SET")
90 .arg(&job.id.to_string())
91 .arg(&serialized_job)
92 .query_async(&mut self.option.client.clone()).await
93 .unwrap();
94
95 let queue_list_name = self.parse_q_list(&job.queue, &job.qtype);
96 let _: String = redis
97 ::cmd("RPUSH")
98 .arg(&queue_list_name)
99 .arg(&job.name)
100 .query_async(&mut self.option.client.clone()).await
101 .unwrap();
102 }
103
104 pub async fn define(
105 &mut self,
106 params: QueueParams,
107 process: Box<dyn (Fn() -> AsyncTask) + Send>
108 ) {
109 let queue = Queue::new(params.name, self.option.client.clone(), params.option, process);
110
111 let queue_name: String = self.parse_q_name(&queue.name);
112 let serialized_queue = serde_json::to_string(¶ms).unwrap();
113
114 let _: String = redis
115 ::cmd("SET")
116 .arg(&queue_name)
117 .arg(&serialized_queue)
118 .query_async(&mut self.option.client.clone()).await
119 .unwrap();
120
121 self.data.insert(queue.name, queue);
122 }
123
124 pub async fn init(app_params: Option) -> Result<Self, Error> {
125 let client = redis::Client::open(app_params.url)?;
126 let con = client.get_tokio_connection_manager().await?;
127
128 let option: AppOption = AppOption {
129 client: con,
130 };
131
132 Ok(App { option, data: HashMap::new() })
133 }
134
135 pub async fn process(self, queue: Queue) -> () {
136 let init_queue = self.parse_q_list(queue.name, "initilizing");
137 let failed_queue = self.parse_q_list(queue.name, "failed");
138
139 loop {
140 let query: Result<String, redis::RedisError> = redis
141 ::cmd("BRPOP")
142 .arg(&init_queue)
143 .arg(&failed_queue)
144 .query_async(&mut self.option.client.clone()).await;
145
146 match query {
147 Ok(string_job) => {
148 let mut job: JobParams = serde_json::from_str(&string_job).unwrap();
149 let processed_queue = (queue.process)().await;
150 match processed_queue {
151 Ok(_) => {
152 let _ = queue.move_queue_list(job, "completed").await;
153 }
154 Err(err) => {
155 job.error = Some(err.to_string());
156 let _ = queue.move_queue_list(job, "failed").await;
157 }
158 }
159 }
160 Err(_) => {}
161 }
162 }
163 }
164
165 pub async fn run(self) -> Result<(), Error> {
166 let queues = self.data;
167
168 for (key, queue) in queues {
169 println!("Queue: {:?}", key);
170 let _ = queue.run();
171 }
172
173 let termination_flag = Arc::new(AtomicBool::new(false));
174 let termination_flag_clone = Arc::clone(&termination_flag);
175 let signal_task = tokio::spawn(async move {
176 signal::ctrl_c().await.expect("Failed to listen for termination signal");
177 println!("Received termination signal. Stopping the loop.");
178 termination_flag_clone.store(true, Ordering::SeqCst);
179 });
180
181 while !termination_flag.load(Ordering::SeqCst) {}
182
183 signal_task.await.expect("Signal task failed");
184
185 Ok(())
186 }
187}