1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use crate::internals::function_wrapper::FunctionWrapper;
use std::{
  collections::VecDeque,
  sync::{Arc, Condvar, Mutex, RwLock},
};

struct AsyncFunctionQueueData<'a> {
  queue: Mutex<VecDeque<FunctionWrapper<'a, (), ()>>>,
  cond: Condvar,
  abort: RwLock<bool>,
}

#[derive(Clone)]
pub struct AsyncFunctionQueue<'a> {
  data: Arc<AsyncFunctionQueueData<'a>>,
}
impl<'a> AsyncFunctionQueue<'a> {
  pub fn new() -> AsyncFunctionQueue<'a> {
    AsyncFunctionQueue {
      data: Arc::new(AsyncFunctionQueueData {
        queue: Mutex::new(VecDeque::new()),
        cond: Condvar::new(),
        abort: RwLock::new(false),
      }),
    }
  }

  pub fn scheduling(&self) {
    loop {
      let f = {
        let que_mtx = self.data.queue.lock().unwrap();
        let mut que_mtx = self
          .data
          .cond
          .wait_while(que_mtx, |que| {
            if *self.data.abort.read().unwrap() {
              false
            } else {
              que.is_empty()
            }
          })
          .unwrap();
        if *self.data.abort.read().unwrap() {
          None
        } else {
          que_mtx.pop_front()
        }
      };
      if let Some(f) = f {
        f.call(());
      } else {
        break;
      }
    }
  }

  pub fn post<F>(&self, f: F)
  where
    F: Fn() + Send + Sync + 'a,
  {
    let mut que_mtx = self.data.queue.lock().unwrap();
    que_mtx.push_back(FunctionWrapper::new(move |_| f()));
    self.data.cond.notify_one();
  }

  pub fn stop(&self) {
    let mut que_mtx = self.data.queue.lock().unwrap();
    que_mtx.clear();
    *self.data.abort.write().unwrap() = true;
    self.data.cond.notify_one();
  }
}

#[cfg(test)]
mod test {
  use super::AsyncFunctionQueue;
  use std::{thread, time};

  #[test]
  fn basic() {
    let scheduler = AsyncFunctionQueue::new();
    let scheduler_thread = scheduler.clone();
    thread::spawn(move || {
      scheduler_thread.scheduling();
    });

    scheduler.post(|| {
      println!("#1 start");
      thread::sleep(time::Duration::from_millis(500));
      println!("#1 end");
    });

    scheduler.post(|| {
      println!("#2 start");
      thread::sleep(time::Duration::from_millis(500));
      println!("#2 end");
    });

    scheduler.post(|| {
      println!("#3 start");
      thread::sleep(time::Duration::from_millis(500));
      println!("#3 end");
    });

    thread::sleep(time::Duration::from_millis(700));
    println!("stop!!");
    scheduler.stop();
    thread::sleep(time::Duration::from_millis(2000));
  }
}