makepad_platform/
thread.rs1use {
2 std::sync::{
3 atomic::{AtomicBool, Ordering},
4 mpsc::{
5 channel,
6 Sender,
7 Receiver,
8 RecvError,
9 TryRecvError,
10 SendError,
11 },
12 Arc,
13 Mutex
14 },
15 crate::{
16 cx::Cx,
17 cx_api::*,
18 }
19};
20
21#[derive(Clone, Debug, Default)]
22pub struct Signal(Arc<AtomicBool>);
23
24pub (crate) static UI_SIGNAL: AtomicBool = AtomicBool::new(false);
25
26impl Signal {
27 pub fn set_ui_signal() {
28 UI_SIGNAL.store(true, Ordering::SeqCst)
29 }
30
31 pub (crate) fn check_and_clear_ui_signal() -> bool {
32 UI_SIGNAL.swap(false, Ordering::SeqCst)
33 }
34
35 pub fn new() -> Self {
36 Self (Arc::new(AtomicBool::new(false)))
37 }
38
39 pub fn check_and_clear(&self) -> bool {
40 self.0.swap(false, Ordering::SeqCst)
41 }
42
43 pub fn set(&self) {
44 self.0.store(true, Ordering::SeqCst);
45 Self::set_ui_signal();
46 }
47}
48
49pub struct ToUIReceiver<T> {
50 sender: Sender<T>,
51 pub receiver: Receiver<T>,
52}
53
54pub struct ToUISender<T> {
55 sender: Sender<T>,
56}
57
58impl<T> Clone for ToUISender<T> {
59 fn clone(&self) -> Self {
60 Self {sender: self.sender.clone()}
61 }
62}
63
64unsafe impl<T: Send> Send for ToUISender<T> {}
65
66impl<T> Default for ToUIReceiver<T> {
67 fn default() -> Self {
68 let (sender, receiver) = channel();
69 Self {
70 sender,
71 receiver,
72 }
73 }
74}
75
76impl<T> ToUIReceiver<T> {
77 pub fn sender(&self) -> ToUISender<T> {
78 ToUISender {
79 sender: self.sender.clone(),
80 }
81 }
82
83 pub fn try_recv(&self) -> Result<T, TryRecvError> {
84 self.receiver.try_recv()
85 }
86
87 pub fn try_recv_flush(&self) -> Result<T, TryRecvError> {
88 let mut store_last = None;
89 loop {
90 match self.receiver.try_recv() {
91 Ok(last) => {
92 store_last = Some(last);
93 },
94 Err(TryRecvError::Empty) => {
95 if let Some(last) = store_last {
96 return Ok(last)
97 }
98 else {
99 return Err(TryRecvError::Empty)
100 }
101 },
102 Err(TryRecvError::Disconnected) => {
103 return Err(TryRecvError::Disconnected)
104 }
105 }
106 }
107 }
108}
109
110impl<T> ToUISender<T> {
111 pub fn send(&self, t: T) -> Result<(), SendError<T >> {
112 let res = self.sender.send(t);
113 Signal::set_ui_signal();
114 res
115 }
116}
117
118pub struct FromUIReceiver<T> {
119 receiver: Receiver<T>,
120}
121
122pub struct FromUISender<T> {
123 receiver: Option<Receiver<T >>,
124 sender: Sender<T>,
125}
126
127unsafe impl<T: Send> Send for FromUIReceiver<T> {}
128
129impl<T> Default for FromUISender<T> {
130 fn default() -> Self {
131 let (sender, receiver) = channel();
132 Self {
133 sender,
134 receiver: Some(receiver),
135 }
136 }
137}
138
139impl<T> FromUISender<T> {
140 pub fn new_channel(&mut self) {
141 let (sender, receiver) = channel();
142 self.sender = sender;
143 self.receiver = Some(receiver)
144 }
145
146 pub fn send(&self, t: T) -> Result<(), SendError<T >> {
147 self.sender.send(t)
148 }
149
150 pub fn sender(&self) -> FromUISender<T> {
151 FromUISender {
152 sender: self.sender.clone(),
153 receiver: None
154 }
155 }
156
157 pub fn receiver(&mut self) -> FromUIReceiver<T> {
158 FromUIReceiver {
159 receiver: self.receiver.take().unwrap()
160 }
161 }
162}
163
164impl<T> FromUIReceiver<T> {
165 pub fn recv(&self) -> Result<T, RecvError> {
166 self.receiver.recv()
167 }
168
169 pub fn try_recv(&self) -> Result<T, TryRecvError> {
170 self.receiver.try_recv()
171 }
172
173}
174
175impl<T> std::ops::Deref for FromUIReceiver<T> {
176 type Target = Receiver<T>;
177 fn deref(&self) -> &Receiver<T>{
178 &self.receiver
179 }
180}
181
182pub struct RevThreadPool {
183 tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >>,
184}
185
186impl RevThreadPool {
187 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
188 let tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >> = Default::default();
189
190 for _ in 0..num_threads {
191 let tasks = tasks.clone();
192 cx.spawn_thread(move || loop {
193 let task = if let Ok(mut tasks) = tasks.lock() {
194 tasks.pop()
195 }
196 else {
197 panic!();
198 };
199 if let Some(task) = task {
200 task();
201 }
202 })
203 }
204 Self {
205 tasks
206 }
207 }
208
209 pub fn execute<F>(&self, task: F) where F: FnOnce() + Send + 'static {
210 self.tasks.lock().unwrap().insert(0, Box::new(task));
211 }
212
213 pub fn execute_rev<F>(&self, task: F) where F: FnOnce() + Send + 'static {
214 self.tasks.lock().unwrap().push(Box::new(task));
215 }
216}
217
218pub struct TagThreadPool<T: Clone + Send + 'static + PartialEq> {
219 tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> >,
220}
221
222impl<T> TagThreadPool<T>where T: Clone + Send + 'static + PartialEq {
223 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
224 let tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> > = Default::default();
225
226 for _ in 0..num_threads {
227 let tasks = tasks.clone();
228 cx.spawn_thread(move || loop {
229 let task = if let Ok(mut tasks) = tasks.lock() {
230 tasks.pop()
231 }
232 else {
233 panic!()
234 };
235 if let Some((tag, task)) = task {
236 task(tag);
237 }
238 else{
239 std::thread::sleep(std::time::Duration::from_millis(50));
240 }
241 })
242 }
243 Self {
244 tasks
245 }
246 }
247
248 pub fn execute<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
249 if let Ok(mut tasks) = self.tasks.lock() {
250 tasks.retain( | v | v.0 != tag);
251 tasks.insert(0, (tag, Box::new(task)));
252 }
253 }
254
255 pub fn execute_rev<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
256 if let Ok(mut tasks) = self.tasks.lock() {
257 tasks.retain( | v | v.0 != tag);
258 tasks.push((tag, Box::new(task)));
259 }
260 }
261}
262
263
264
265pub struct MessageThreadPool<T: Clone + Send + 'static> {
266 sender: Sender<Box<dyn FnOnce(Option<T>) + Send + 'static >>,
267 msg_senders: Vec<Sender<T >>
268}
269
270impl<T> MessageThreadPool<T> where T: Clone + Send + 'static {
271 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
272 let (sender, receiver) = channel::<Box<dyn FnOnce(Option<T>) + Send + 'static >> ();
273 let receiver = Arc::new(Mutex::new(receiver));
274 let mut msg_senders = Vec::new();
275 for _ in 0..num_threads {
276 let receiver = receiver.clone();
277 let (msg_send, msg_recv) = channel::<T>();
278 msg_senders.push(msg_send);
279 cx.spawn_thread(move || loop {
280 let task = if let Ok(receiver) = receiver.lock() {
281 match receiver.recv() {
282 Ok(task) => task,
283 Err(_) => return
284 }
285 }
286 else {
287 panic!();
288 };
289 let mut msg_out = None;
290 while let Ok(msg) = msg_recv.try_recv() {
291 msg_out = Some(msg);
292 }
293 task(msg_out);
294 })
295 }
296 Self {
297 sender,
298 msg_senders
299 }
300 }
301
302 pub fn send_msg(&self, msg: T) {
303 for sender in &self.msg_senders {
304 sender.send(msg.clone()).unwrap();
305 }
306 }
307
308 pub fn execute<F>(&self, task: F) where F: FnOnce(Option<T>) + Send + 'static {
309 self.sender.send(Box::new(task)).unwrap();
310 }
311}