chimes_utils/utils/
queue.rs1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::mem::MaybeUninit;
5use std::pin::Pin;
6use std::sync::{Arc, Once};
7use std::time::Duration;
8use tokio::runtime::Runtime;
9use tokio::sync::{Mutex, Notify, RwLock};
10use tokio::task::LocalSet;
11
12use crate::{current_timestamp_secs, generate_rand_numberstring, global_app_data_resizing};
13
14type HandlerFunc = fn(&ProcessTask) -> Pin<Box<dyn Future<Output = ()>>>;
15
16lazy_static! {
17 pub static ref HANDLER_FUNC_REGISTRY: Mutex<HashMap<String, HandlerFunc>> =
18 Mutex::new(HashMap::new());
19}
20
/// A single unit of work stored in a [`ProcessQueue`].
#[derive(Clone)]
pub struct ProcessTask {
    /// Identifier assigned at construction; used to find this task in the queue.
    task_id: String,
    /// Key under which the task's handler is looked up in `HANDLER_FUNC_REGISTRY`.
    handler_type: String,
    /// When `false`, the queue loop removes the task after it has run once.
    cyclicity: bool,
    /// Value of `current_timestamp_secs()` at construction or at the last run.
    /// NOTE(review): `RefCell` is not thread-safe; see the manual `Sync` impl.
    last_time: RefCell<u64>,
    /// The task becomes due once more than this much time (same unit as
    /// `last_time`) has passed since `last_time`.
    period: u64,
    /// Caller-supplied payload string; both constructors store `Some(..)`.
    pub cookie: Option<String>,
}
31
32impl ProcessTask {
33 pub fn new(h: bool, p: u64, ck: &str, hfn: &str) -> Self {
34 Self {
35 task_id: generate_rand_numberstring(16),
36 cyclicity: h,
37 period: p,
38 last_time: RefCell::new(current_timestamp_secs()),
39 handler_type: hfn.to_owned(),
40 cookie: Some(ck.to_owned()),
41 }
42 }
43
44 pub fn new_with_cookie_fn(ck: &str, cycle: bool, p: u64, hfn: &str) -> Self {
45 Self {
46 task_id: generate_rand_numberstring(16),
47 cyclicity: cycle,
48 period: p,
49 last_time: RefCell::new(current_timestamp_secs()),
50 handler_type: hfn.to_owned(),
51 cookie: Some(ck.to_owned()),
52 }
53 }
54}
55
56unsafe impl Send for ProcessTask {}
57
58unsafe impl Sync for ProcessTask {}
59
60#[derive(Clone)]
61pub struct ProcessQueue {
62 queue: Vec<ProcessTask>,
63 started: bool,
64 max_tasks: usize,
65 waiting_timeout: u64,
66 notify: Arc<Notify>,
67 lock: Arc<RwLock<u32>>,
68}
69
70unsafe impl Send for ProcessQueue {}
71
72unsafe impl Sync for ProcessQueue {}
73
74pub async fn queue_registry_handler(handler_type: &str, fun: HandlerFunc) {
75 let mut h = HANDLER_FUNC_REGISTRY.lock().await;
76 h.insert(handler_type.to_owned(), fun);
77}
78
79impl ProcessQueue {
80 pub fn new() -> Self {
81 Self {
82 queue: vec![],
83 started: false,
84 max_tasks: 1000usize,
85 waiting_timeout: 30,
86 notify: Arc::new(Notify::new()),
87 lock: Arc::new(RwLock::new(2)),
88 }
89 }
90
91 pub fn process_task(&self, rt: &Runtime, task: &ProcessTask) {
92 let local = LocalSet::new();
93 local.block_on(rt, async move {
94 let mlock = HANDLER_FUNC_REGISTRY.lock().await;
95 let hfunc = mlock.get(&task.handler_type);
96 if hfunc.is_some() {
97 let func = hfunc.unwrap();
98 func(task).await;
99 }
100 });
101 }
102
103 pub async fn queue_add(&mut self, emb: ProcessTask) {
104 let ck = self.lock.clone();
105 let mu = ck.write().await;
107 self.queue.push(emb);
108 drop(mu);
109 self.notify.notify_one();
110 }
111
112 pub async fn queue_remove(&mut self, emb: &ProcessTask) {
113 let ck = self.lock.clone();
114 let mu = ck.write().await;
116 let len = self.queue.len();
117 for i in 0..len {
118 if emb.task_id == self.queue[i].task_id {
119 self.queue.remove(i);
120 break;
121 }
122 }
123 drop(mu);
124 }
126 fn start(&'static mut self) {
130 if !self.started {
131 self.started = true;
132 start_queue_thread(self);
133 }
134 }
135
136 fn shutdown(&mut self) {
140 if self.started {
141 self.started = false;
142 self.notify.notify_waiters();
143 }
145 }
146
147 fn notify(&self) {
148 if self.started {
149 self.notify.notify_waiters();
150 }
151 }
152
153 fn count(&self) -> usize {
154 self.queue.len()
155 }
156
157 fn is_queue_full(&self) -> bool {
158 self.queue.len() >= self.max_tasks
159 }
160}
161
162impl Default for ProcessQueue {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168fn start_queue_thread(msq: &'static mut ProcessQueue) {
169 let crt = tokio::runtime::Builder::new_multi_thread()
171 .worker_threads(2usize)
172 .enable_all()
173 .build()
174 .unwrap();
175
176 async_std::task::spawn(async move {
177 log::info!("The queue was created and processing.");
178 let notified = msq.notify.clone();
179 let rclock = msq.lock.clone();
180 while msq.started {
183 let current_time = current_timestamp_secs();
185 let mu = rclock.read().await;
186 let mut should_remove_tasks = vec![];
187
188 for ct in msq.queue.clone() {
189 if current_time - *ct.last_time.borrow() > ct.period {
191 update_task_lasttime(msq, &ct, current_time);
192 msq.process_task(&crt, &ct);
193 if !ct.cyclicity {
194 should_remove_tasks.push(ct);
196 }
197 }
198 }
199 drop(mu);
200 for rct in should_remove_tasks {
201 msq.queue_remove(&rct).await;
202 }
203 match async_std::future::timeout(Duration::from_secs(1), notified.notified()).await {
204 Ok(_) => {
205 log::warn!("Received a new task notification. The process loop continued.");
206 }
207 Err(err) => {
208 global_app_data_resizing(); log::debug!("Time out for {}", err);
210 }
211 }
212 }
213 });
214}
215
216pub fn get_task_queue() -> &'static mut ProcessQueue {
217 static mut STATIC_ESQ: MaybeUninit<ProcessQueue> = MaybeUninit::uninit();
219 static ONCE: Once = Once::new();
221
222 ONCE.call_once(|| unsafe {
223 async_std::task::block_on(async {
225 let esq = ProcessQueue::new();
226 STATIC_ESQ.as_mut_ptr().write(esq);
227 });
228 });
229 unsafe { &mut *STATIC_ESQ.as_mut_ptr() }
230}
231
232pub fn config_task_queue_wait_timeout(tmt: u64, _period: u64) {
233 get_task_queue().waiting_timeout = tmt;
234}
235
236pub fn config_task_queue_max_tasks(max: usize) {
237 get_task_queue().max_tasks = max;
238}
239
240pub fn start_task_queue_thread() {
241 get_task_queue().start();
242}
243
244pub fn stop_task_queue_thread() {
245 get_task_queue().shutdown();
246}
247
248pub fn notify_queue_process() {
249 get_task_queue().notify();
250}
251
252pub fn count_queue_tasks() -> usize {
253 get_task_queue().count()
254}
255
256pub fn task_queue_is_full() -> bool {
257 get_task_queue().is_queue_full()
258}
259
260fn update_task_lasttime(msq: &mut ProcessQueue, ct: &ProcessTask, nsec: u64) {
261 let len = msq.queue.len();
263
264 for i in 0..len {
265 if ct.task_id == msq.queue[i].task_id {
266 *msq.queue[i].last_time.borrow_mut() = nsec;
267 }
268 }
269}