1use crate::traits::{Receiver, Work};
2use std::collections::VecDeque;
3use std::sync::Arc;
4
5use crate::priv_sync::{Condvar, Mutex};
6
/// FIFO container for pending work: each entry is an owned, sendable,
/// run-once closure taking no arguments.
type QueueType = VecDeque<Box<dyn FnOnce() + Send + 'static>>;
8
9struct Inner {
10 queue: Mutex<QueueType>,
11 cond_var: Condvar,
12}
13
14impl Inner {
15 pub fn new() -> Self {
16 Self {
17 queue: Mutex::new(QueueType::with_capacity(256)),
18 cond_var: Condvar::new(),
19 }
20 }
21
22 pub fn add<F: 'static + FnOnce() + Send>(&self, work: F) {
23 {
24 let mut queue = self.queue.lock();
25 queue.push_back(Box::new(work));
26 }
27 self.cond_var.notify_one();
28 }
29
30 pub fn run_one(&self) -> bool {
31 let to_run = {
32 let guard = self.queue.lock();
33 let mut guard = self.cond_var.wait_while(guard, |x| x.is_empty());
34 guard.pop_front()
35 };
36
37 if let Some(to_run) = to_run {
38 to_run();
39 true
40 } else {
41 false
42 }
43 }
44}
45
46pub struct ManualExecutor {
47 inner: Arc<Inner>,
48}
49
50impl Default for ManualExecutor {
51 fn default() -> Self {
52 Self::new()
53 }
54}
55
56impl ManualExecutor {
57 pub fn new() -> Self {
58 Self {
59 inner: Arc::new(Inner::new()),
60 }
61 }
62
63 pub fn scheduler(&self) -> Scheduler {
64 Scheduler {
65 inner: self.inner.clone(),
66 }
67 }
68
69 pub fn runner(&self) -> Runner {
70 Runner {
71 inner: self.inner.clone(),
72 }
73 }
74}
75
76#[derive(Clone)]
77pub struct Runner {
78 inner: Arc<Inner>,
79}
80
81impl Runner {
82 pub fn run_one(&self) -> bool {
83 self.inner.run_one()
84 }
85}
86
87pub struct ScheduledSender {
88 inner: Arc<Inner>,
89}
90
91impl crate::traits::Sender for ScheduledSender {
92 type Output = ();
93 type Scheduler = Scheduler;
94
95 fn start<R>(self, receiver: R)
96 where
97 R: 'static + Send + Receiver<Input = Self::Output>,
98 {
99 self.inner.add(move || {
100 receiver.set_value(());
101 });
102 }
103
104 fn get_scheduler(&self) -> Self::Scheduler {
105 Self::Scheduler {
106 inner: self.inner.clone(),
107 }
108 }
109}
110
111#[derive(Clone)]
112pub struct Scheduler {
113 inner: Arc<Inner>,
114}
115
116impl crate::traits::Scheduler for Scheduler {
117 type Sender = ScheduledSender;
118
119 fn schedule(&mut self) -> Self::Sender {
120 ScheduledSender {
121 inner: self.inner.clone(),
122 }
123 }
124
125 fn execute<W>(&mut self, work: W)
126 where
127 W: 'static + Send + Work,
128 {
129 self.inner.add(move || {
130 work.execute();
131 });
132 }
133}