another_rxrust/schedulers/
async_function_queue.rs1use crate::internals::function_wrapper::FunctionWrapper;
2use std::{
3 collections::VecDeque,
4 sync::{Arc, Condvar, Mutex, RwLock},
5};
6
7struct AsyncFunctionQueueData<'a> {
8 queue: Mutex<VecDeque<FunctionWrapper<'a, (), ()>>>,
9 cond: Condvar,
10 abort: RwLock<bool>,
11}
12
13#[derive(Clone)]
14pub struct AsyncFunctionQueue<'a> {
15 data: Arc<AsyncFunctionQueueData<'a>>,
16}
17impl<'a> AsyncFunctionQueue<'a> {
18 pub fn new() -> AsyncFunctionQueue<'a> {
19 AsyncFunctionQueue {
20 data: Arc::new(AsyncFunctionQueueData {
21 queue: Mutex::new(VecDeque::new()),
22 cond: Condvar::new(),
23 abort: RwLock::new(false),
24 }),
25 }
26 }
27
28 pub fn scheduling(&self) {
29 loop {
30 let f = {
31 let que_mtx = self.data.queue.lock().unwrap();
32 let mut que_mtx = self
33 .data
34 .cond
35 .wait_while(que_mtx, |que| {
36 if *self.data.abort.read().unwrap() {
37 false
38 } else {
39 que.is_empty()
40 }
41 })
42 .unwrap();
43 if *self.data.abort.read().unwrap() {
44 None
45 } else {
46 que_mtx.pop_front()
47 }
48 };
49 if let Some(f) = f {
50 f.call(());
51 } else {
52 break;
53 }
54 }
55 }
56
57 pub fn post<F>(&self, f: F)
58 where
59 F: Fn() + Send + Sync + 'a,
60 {
61 let mut que_mtx = self.data.queue.lock().unwrap();
62 que_mtx.push_back(FunctionWrapper::new(move |_| f()));
63 self.data.cond.notify_one();
64 }
65
66 pub fn stop(&self) {
67 let mut que_mtx = self.data.queue.lock().unwrap();
68 que_mtx.clear();
69 *self.data.abort.write().unwrap() = true;
70 self.data.cond.notify_one();
71 }
72}
73
74#[cfg(test)]
75mod test {
76 use super::AsyncFunctionQueue;
77 use std::{thread, time};
78
79 #[test]
80 fn basic() {
81 let scheduler = AsyncFunctionQueue::new();
82 let scheduler_thread = scheduler.clone();
83 thread::spawn(move || {
84 scheduler_thread.scheduling();
85 });
86
87 scheduler.post(|| {
88 println!("#1 start");
89 thread::sleep(time::Duration::from_millis(500));
90 println!("#1 end");
91 });
92
93 scheduler.post(|| {
94 println!("#2 start");
95 thread::sleep(time::Duration::from_millis(500));
96 println!("#2 end");
97 });
98
99 scheduler.post(|| {
100 println!("#3 start");
101 thread::sleep(time::Duration::from_millis(500));
102 println!("#3 end");
103 });
104
105 thread::sleep(time::Duration::from_millis(700));
106 println!("stop!!");
107 scheduler.stop();
108 thread::sleep(time::Duration::from_millis(2000));
109 }
110}