chimes_utils/utils/
queue.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::mem::MaybeUninit;
5use std::pin::Pin;
6use std::sync::{Arc, Once};
7use std::time::Duration;
8use tokio::runtime::Runtime;
9use tokio::sync::{Mutex, Notify, RwLock};
10use tokio::task::LocalSet;
11
12use crate::{current_timestamp_secs, generate_rand_numberstring, global_app_data_resizing};
13
14type HandlerFunc = fn(&ProcessTask) -> Pin<Box<dyn Future<Output = ()>>>;
15
16lazy_static! {
17    pub static ref HANDLER_FUNC_REGISTRY: Mutex<HashMap<String, HandlerFunc>> =
18        Mutex::new(HashMap::new());
19}
20
21#[derive(Clone)]
22pub struct ProcessTask {
23    // handler: Rc<&'static dyn TaskHandler>,
24    task_id: String,
25    handler_type: String,
26    cyclicity: bool,
27    last_time: RefCell<u64>,
28    period: u64,
29    pub cookie: Option<String>,
30}
31
32impl ProcessTask {
33    pub fn new(h: bool, p: u64, ck: &str, hfn: &str) -> Self {
34        Self {
35            task_id: generate_rand_numberstring(16),
36            cyclicity: h,
37            period: p,
38            last_time: RefCell::new(current_timestamp_secs()),
39            handler_type: hfn.to_owned(),
40            cookie: Some(ck.to_owned()),
41        }
42    }
43
44    pub fn new_with_cookie_fn(ck: &str, cycle: bool, p: u64, hfn: &str) -> Self {
45        Self {
46            task_id: generate_rand_numberstring(16),
47            cyclicity: cycle,
48            period: p,
49            last_time: RefCell::new(current_timestamp_secs()),
50            handler_type: hfn.to_owned(),
51            cookie: Some(ck.to_owned()),
52        }
53    }
54}
55
56unsafe impl Send for ProcessTask {}
57
58unsafe impl Sync for ProcessTask {}
59
60#[derive(Clone)]
61pub struct ProcessQueue {
62    queue: Vec<ProcessTask>,
63    started: bool,
64    max_tasks: usize,
65    waiting_timeout: u64,
66    notify: Arc<Notify>,
67    lock: Arc<RwLock<u32>>,
68}
69
70unsafe impl Send for ProcessQueue {}
71
72unsafe impl Sync for ProcessQueue {}
73
74pub async fn queue_registry_handler(handler_type: &str, fun: HandlerFunc) {
75    let mut h = HANDLER_FUNC_REGISTRY.lock().await;
76    h.insert(handler_type.to_owned(), fun);
77}
78
79impl ProcessQueue {
80    pub fn new() -> Self {
81        Self {
82            queue: vec![],
83            started: false,
84            max_tasks: 1000usize,
85            waiting_timeout: 30,
86            notify: Arc::new(Notify::new()),
87            lock: Arc::new(RwLock::new(2)),
88        }
89    }
90
91    pub fn process_task(&self, rt: &Runtime, task: &ProcessTask) {
92        let local = LocalSet::new();
93        local.block_on(rt, async move {
94            let mlock = HANDLER_FUNC_REGISTRY.lock().await;
95            let hfunc = mlock.get(&task.handler_type);
96            if hfunc.is_some() {
97                let func = hfunc.unwrap();
98                func(task).await;
99            }
100        });
101    }
102
103    pub async fn queue_add(&mut self, emb: ProcessTask) {
104        let ck = self.lock.clone();
105        // let mut clemb = emb.clone();
106        let mu = ck.write().await;
107        self.queue.push(emb);
108        drop(mu);
109        self.notify.notify_one();
110    }
111
112    pub async fn queue_remove(&mut self, emb: &ProcessTask) {
113        let ck = self.lock.clone();
114        // let mut clemb = emb.clone();
115        let mu = ck.write().await;
116        let len = self.queue.len();
117        for i in 0..len {
118            if emb.task_id == self.queue[i].task_id {
119                self.queue.remove(i);
120                break;
121            }
122        }
123        drop(mu);
124        // self.notify.notify_one();
125    }
126    /**
127     * 启动邮件发送队列
128     */
129    fn start(&'static mut self) {
130        if !self.started {
131            self.started = true;
132            start_queue_thread(self);
133        }
134    }
135
136    /**
137     * 关闭邮件发送队列
138     */
139    fn shutdown(&mut self) {
140        if self.started {
141            self.started = false;
142            self.notify.notify_waiters();
143            // waiting for shutdown
144        }
145    }
146
147    fn notify(&self) {
148        if self.started {
149            self.notify.notify_waiters();
150        }
151    }
152
153    fn count(&self) -> usize {
154        self.queue.len()
155    }
156
157    fn is_queue_full(&self) -> bool {
158        self.queue.len() >= self.max_tasks
159    }
160}
161
162impl Default for ProcessQueue {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168fn start_queue_thread(msq: &'static mut ProcessQueue) {
169    // async_std::task::spawn(
170    let crt = tokio::runtime::Builder::new_multi_thread()
171        .worker_threads(2usize)
172        .enable_all()
173        .build()
174        .unwrap();
175
176    async_std::task::spawn(async move {
177        log::info!("The queue was created and processing.");
178        let notified = msq.notify.clone();
179        let rclock = msq.lock.clone();
180        // let crt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
181
182        while msq.started {
183            // Open connection to Gmail
184            let current_time = current_timestamp_secs();
185            let mu = rclock.read().await;
186            let mut should_remove_tasks = vec![];
187
188            for ct in msq.queue.clone() {
189                // log::info!("Current task: {} >= ? {}", current_time - *ct.last_time.borrow(), ct.period );
190                if current_time - *ct.last_time.borrow() > ct.period {
191                    update_task_lasttime(msq, &ct, current_time);
192                    msq.process_task(&crt, &ct);
193                    if !ct.cyclicity {
194                        // msq.queue_remove(&ct).await;
195                        should_remove_tasks.push(ct);
196                    }
197                }
198            }
199            drop(mu);
200            for rct in should_remove_tasks {
201                msq.queue_remove(&rct).await;
202            }
203            match async_std::future::timeout(Duration::from_secs(1), notified.notified()).await {
204                Ok(_) => {
205                    log::warn!("Received a new task notification. The process loop continued.");
206                }
207                Err(err) => {
208                    global_app_data_resizing(); // clear all data period
209                    log::debug!("Time out for {}", err);
210                }
211            }
212        }
213    });
214}
215
216pub fn get_task_queue() -> &'static mut ProcessQueue {
217    // 使用MaybeUninit延迟初始化
218    static mut STATIC_ESQ: MaybeUninit<ProcessQueue> = MaybeUninit::uninit();
219    // Once带锁保证只进行一次初始化
220    static ONCE: Once = Once::new();
221
222    ONCE.call_once(|| unsafe {
223        // CONF = 1u64;
224        async_std::task::block_on(async {
225            let esq = ProcessQueue::new();
226            STATIC_ESQ.as_mut_ptr().write(esq);
227        });
228    });
229    unsafe { &mut *STATIC_ESQ.as_mut_ptr() }
230}
231
232pub fn config_task_queue_wait_timeout(tmt: u64, _period: u64) {
233    get_task_queue().waiting_timeout = tmt;
234}
235
236pub fn config_task_queue_max_tasks(max: usize) {
237    get_task_queue().max_tasks = max;
238}
239
240pub fn start_task_queue_thread() {
241    get_task_queue().start();
242}
243
244pub fn stop_task_queue_thread() {
245    get_task_queue().shutdown();
246}
247
248pub fn notify_queue_process() {
249    get_task_queue().notify();
250}
251
252pub fn count_queue_tasks() -> usize {
253    get_task_queue().count()
254}
255
256pub fn task_queue_is_full() -> bool {
257    get_task_queue().is_queue_full()
258}
259
260fn update_task_lasttime(msq: &mut ProcessQueue, ct: &ProcessTask, nsec: u64) {
261    // let msq = get_task_queue();
262    let len = msq.queue.len();
263
264    for i in 0..len {
265        if ct.task_id == msq.queue[i].task_id {
266            *msq.queue[i].last_time.borrow_mut() = nsec;
267        }
268    }
269}