txrx/
manual_executor.rs

1use crate::traits::{Receiver, Work};
2use std::collections::VecDeque;
3use std::sync::Arc;
4
5use crate::priv_sync::{Condvar, Mutex};
6
/// FIFO of pending jobs: each entry is an owned, sendable, run-once closure.
type QueueType = VecDeque<Box<dyn FnOnce() + Send + 'static>>;
8
9struct Inner {
10    queue: Mutex<QueueType>,
11    cond_var: Condvar,
12}
13
14impl Inner {
15    pub fn new() -> Self {
16        Self {
17            queue: Mutex::new(QueueType::with_capacity(256)),
18            cond_var: Condvar::new(),
19        }
20    }
21
22    pub fn add<F: 'static + FnOnce() + Send>(&self, work: F) {
23        {
24            let mut queue = self.queue.lock();
25            queue.push_back(Box::new(work));
26        }
27        self.cond_var.notify_one();
28    }
29
30    pub fn run_one(&self) -> bool {
31        let to_run = {
32            let guard = self.queue.lock();
33            let mut guard = self.cond_var.wait_while(guard, |x| x.is_empty());
34            guard.pop_front()
35        };
36
37        if let Some(to_run) = to_run {
38            to_run();
39            true
40        } else {
41            false
42        }
43    }
44}
45
46pub struct ManualExecutor {
47    inner: Arc<Inner>,
48}
49
50impl Default for ManualExecutor {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
56impl ManualExecutor {
57    pub fn new() -> Self {
58        Self {
59            inner: Arc::new(Inner::new()),
60        }
61    }
62
63    pub fn scheduler(&self) -> Scheduler {
64        Scheduler {
65            inner: self.inner.clone(),
66        }
67    }
68
69    pub fn runner(&self) -> Runner {
70        Runner {
71            inner: self.inner.clone(),
72        }
73    }
74}
75
76#[derive(Clone)]
77pub struct Runner {
78    inner: Arc<Inner>,
79}
80
81impl Runner {
82    pub fn run_one(&self) -> bool {
83        self.inner.run_one()
84    }
85}
86
87pub struct ScheduledSender {
88    inner: Arc<Inner>,
89}
90
91impl crate::traits::Sender for ScheduledSender {
92    type Output = ();
93    type Scheduler = Scheduler;
94
95    fn start<R>(self, receiver: R)
96    where
97        R: 'static + Send + Receiver<Input = Self::Output>,
98    {
99        self.inner.add(move || {
100            receiver.set_value(());
101        });
102    }
103
104    fn get_scheduler(&self) -> Self::Scheduler {
105        Self::Scheduler {
106            inner: self.inner.clone(),
107        }
108    }
109}
110
111#[derive(Clone)]
112pub struct Scheduler {
113    inner: Arc<Inner>,
114}
115
116impl crate::traits::Scheduler for Scheduler {
117    type Sender = ScheduledSender;
118
119    fn schedule(&mut self) -> Self::Sender {
120        ScheduledSender {
121            inner: self.inner.clone(),
122        }
123    }
124
125    fn execute<W>(&mut self, work: W)
126    where
127        W: 'static + Send + Work,
128    {
129        self.inner.add(move || {
130            work.execute();
131        });
132    }
133}