another_rxrust/schedulers/
async_function_queue.rs

1use crate::internals::function_wrapper::FunctionWrapper;
2use std::{
3  collections::VecDeque,
4  sync::{Arc, Condvar, Mutex, RwLock},
5};
6
7struct AsyncFunctionQueueData<'a> {
8  queue: Mutex<VecDeque<FunctionWrapper<'a, (), ()>>>,
9  cond: Condvar,
10  abort: RwLock<bool>,
11}
12
13#[derive(Clone)]
14pub struct AsyncFunctionQueue<'a> {
15  data: Arc<AsyncFunctionQueueData<'a>>,
16}
17impl<'a> AsyncFunctionQueue<'a> {
18  pub fn new() -> AsyncFunctionQueue<'a> {
19    AsyncFunctionQueue {
20      data: Arc::new(AsyncFunctionQueueData {
21        queue: Mutex::new(VecDeque::new()),
22        cond: Condvar::new(),
23        abort: RwLock::new(false),
24      }),
25    }
26  }
27
28  pub fn scheduling(&self) {
29    loop {
30      let f = {
31        let que_mtx = self.data.queue.lock().unwrap();
32        let mut que_mtx = self
33          .data
34          .cond
35          .wait_while(que_mtx, |que| {
36            if *self.data.abort.read().unwrap() {
37              false
38            } else {
39              que.is_empty()
40            }
41          })
42          .unwrap();
43        if *self.data.abort.read().unwrap() {
44          None
45        } else {
46          que_mtx.pop_front()
47        }
48      };
49      if let Some(f) = f {
50        f.call(());
51      } else {
52        break;
53      }
54    }
55  }
56
57  pub fn post<F>(&self, f: F)
58  where
59    F: Fn() + Send + Sync + 'a,
60  {
61    let mut que_mtx = self.data.queue.lock().unwrap();
62    que_mtx.push_back(FunctionWrapper::new(move |_| f()));
63    self.data.cond.notify_one();
64  }
65
66  pub fn stop(&self) {
67    let mut que_mtx = self.data.queue.lock().unwrap();
68    que_mtx.clear();
69    *self.data.abort.write().unwrap() = true;
70    self.data.cond.notify_one();
71  }
72}
73
#[cfg(test)]
mod test {
  use super::AsyncFunctionQueue;
  use std::{thread, time};

  /// Blocks the current thread for `ms` milliseconds.
  fn pause(ms: u64) {
    thread::sleep(time::Duration::from_millis(ms));
  }

  /// Smoke test: a background thread drains the queue while three slow
  /// functions are posted, and the queue is stopped part-way through.
  /// Progress is only printed; nothing is asserted.
  #[test]
  fn basic() {
    let scheduler = AsyncFunctionQueue::new();

    // The worker thread runs posted functions until `stop` is called.
    let worker = scheduler.clone();
    thread::spawn(move || worker.scheduling());

    scheduler.post(|| {
      println!("#1 start");
      pause(500);
      println!("#1 end");
    });

    scheduler.post(|| {
      println!("#2 start");
      pause(500);
      println!("#2 end");
    });

    scheduler.post(|| {
      println!("#3 start");
      pause(500);
      println!("#3 end");
    });

    // Stop while the posted work is still in progress.
    pause(700);
    println!("stop!!");
    scheduler.stop();

    // Leave time for the function that is already running to finish.
    pause(2000);
  }
}