makepad_platform/
thread.rs1use {
2 std::sync::{
3 atomic::{AtomicBool, Ordering},
4 mpsc::{
5 channel,
6 Sender,
7 Receiver,
8 RecvError,
9 TryRecvError,
10 SendError,
11 },
12 Arc,
13 Mutex
14 },
15 crate::{
16 cx::Cx,
17 cx_api::*,
18 }
19};
20
/// Cloneable flag backed by a shared `AtomicBool`; every clone observes the same state.
///
/// Raising an instance with [`SignalToUI::set`] additionally raises the process-wide
/// `UI_SIGNAL`. The associated functions without a `self` parameter operate on the two
/// global flags only.
#[derive(Clone, Debug, Default)]
pub struct SignalToUI(Arc<AtomicBool>);

/// Process-wide flag raised by [`SignalToUI::set_ui_signal`].
pub(crate) static UI_SIGNAL: AtomicBool = AtomicBool::new(false);
/// Process-wide flag raised by [`SignalToUI::set_action_signal`].
pub(crate) static ACTION_SIGNAL: AtomicBool = AtomicBool::new(false);

impl SignalToUI {
    /// Creates a fresh, lowered flag that is not shared with any other instance.
    pub fn new() -> Self {
        Self::default()
    }

    /// Raises this flag and then the global `UI_SIGNAL`.
    pub fn set(&self) {
        self.0.store(true, Ordering::SeqCst);
        Self::set_ui_signal();
    }

    /// Lowers this flag and reports whether it was raised.
    pub fn check_and_clear(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }

    /// Raises the global `UI_SIGNAL`.
    pub fn set_ui_signal() {
        UI_SIGNAL.store(true, Ordering::SeqCst);
    }

    /// Raises the global `ACTION_SIGNAL`.
    pub fn set_action_signal() {
        ACTION_SIGNAL.store(true, Ordering::SeqCst);
    }

    /// Lowers the global `UI_SIGNAL` and reports whether it was raised.
    pub(crate) fn check_and_clear_ui_signal() -> bool {
        UI_SIGNAL.swap(false, Ordering::SeqCst)
    }

    /// Lowers the global `ACTION_SIGNAL` and reports whether it was raised.
    pub(crate) fn check_and_clear_action_signal() -> bool {
        ACTION_SIGNAL.swap(false, Ordering::SeqCst)
    }
}
57
58
/// Cloneable flag backed by a shared `AtomicBool`; every clone observes the same state.
///
/// Unlike `SignalToUI`, raising this flag touches no global state.
#[derive(Clone, Debug, Default)]
pub struct SignalFromUI(Arc<AtomicBool>);

impl SignalFromUI {
    /// Creates a fresh, lowered flag that is not shared with any other instance.
    pub fn new() -> Self {
        Self::default()
    }

    /// Raises the flag.
    pub fn set(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Lowers the flag and reports whether it was raised.
    pub fn check_and_clear(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }
}
75
/// Receiving endpoint of an `mpsc` channel carrying values of type `T`.
///
/// The endpoint keeps its own `Sender` so that [`ToUIReceiver::sender`] can hand out
/// any number of [`ToUISender`] handles for the same channel.
pub struct ToUIReceiver<T> {
    sender: Sender<T>,
    pub receiver: Receiver<T>,
}

/// Cloneable sending handle for the channel owned by a [`ToUIReceiver`].
pub struct ToUISender<T> {
    sender: Sender<T>,
}

impl<T> Clone for ToUISender<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}

// SAFETY: the only field is a `std::sync::mpsc::Sender<T>`, which the standard library
// itself declares `Send` for every `T: Send`, so moving the wrapper across threads adds
// no capability the field does not already have.
unsafe impl<T: Send> Send for ToUISender<T> {}

impl<T> Default for ToUIReceiver<T> {
    /// Opens a new channel and keeps both of its ends.
    fn default() -> Self {
        let (sender, receiver) = channel();
        Self { sender, receiver }
    }
}

impl<T> ToUIReceiver<T> {
    /// Returns a new sending handle connected to this receiver.
    pub fn sender(&self) -> ToUISender<T> {
        ToUISender {
            sender: self.sender.clone(),
        }
    }

    /// Takes the oldest queued value without blocking.
    ///
    /// # Errors
    /// Forwards the `TryRecvError` reported by the underlying receiver.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }

    /// Drains the queue without blocking and returns only the newest value.
    ///
    /// Every older queued value is dropped.
    ///
    /// # Errors
    /// Returns the underlying `TryRecvError` only when nothing at all could be taken
    /// from the queue. A value that was already taken is never discarded in favour of
    /// an error: if the channel turns out to be disconnected after values were drained,
    /// the newest value is returned and the disconnect is reported by the next call.
    pub fn try_recv_flush(&self) -> Result<T, TryRecvError> {
        let mut newest = None;
        loop {
            match self.receiver.try_recv() {
                Ok(value) => newest = Some(value),
                // Empty or Disconnected: hand back what was drained so far, if anything.
                Err(reason) => return newest.ok_or(reason),
            }
        }
    }
}
136
137impl<T> ToUISender<T> {
138 pub fn send(&self, t: T) -> Result<(), SendError<T >> {
139 let res = self.sender.send(t);
140 SignalToUI::set_ui_signal();
141 res
142 }
143}
144
/// Receiving endpoint handed out by [`FromUISender::receiver`].
///
/// Dereferences to the wrapped `std::sync::mpsc::Receiver<T>`.
pub struct FromUIReceiver<T> {
    receiver: Receiver<T>,
}

/// Sending endpoint that also stores the channel's receiver until it is claimed.
///
/// `receiver` is `Some` only on a handle created by `Default`/`new_channel` whose
/// receiver has not been taken yet; handles created by [`FromUISender::sender`]
/// always hold `None`.
pub struct FromUISender<T> {
    receiver: Option<Receiver<T>>,
    sender: Sender<T>,
}

// SAFETY: the only field is a `std::sync::mpsc::Receiver<T>`, which the standard library
// itself declares `Send` for every `T: Send`, so moving the wrapper across threads adds
// no capability the field does not already have.
unsafe impl<T: Send> Send for FromUIReceiver<T> {}

impl<T> Default for FromUISender<T> {
    /// Opens a new channel and stores both of its ends.
    fn default() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver: Some(receiver),
        }
    }
}

impl<T> FromUISender<T> {
    /// Replaces the current channel with a brand-new one.
    ///
    /// The previous sender (and the previous receiver, if it was never taken) is dropped.
    pub fn new_channel(&mut self) {
        let (sender, receiver) = channel();
        self.sender = sender;
        self.receiver = Some(receiver);
    }

    /// Queues `t` on the channel.
    ///
    /// # Errors
    /// Returns the `SendError` (carrying `t` back) reported by the underlying sender.
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        self.sender.send(t)
    }

    /// Returns an additional sending handle for the same channel.
    ///
    /// The returned handle carries no receiver, so calling
    /// [`FromUISender::receiver`] on it panics.
    pub fn sender(&self) -> FromUISender<T> {
        FromUISender {
            sender: self.sender.clone(),
            receiver: None,
        }
    }

    /// Takes the receiving end out of this handle.
    ///
    /// # Panics
    /// Panics when no receiver is stored: it was already taken since the last
    /// `new_channel`/`default`, or this handle was created by `sender()`.
    pub fn receiver(&mut self) -> FromUIReceiver<T> {
        let receiver = self.receiver.take().expect(
            "FromUISender::receiver: receiving end is unavailable \
             (already taken, or this handle was created by `sender()`)",
        );
        FromUIReceiver { receiver }
    }
}

impl<T> FromUIReceiver<T> {
    /// Blocks until a value arrives.
    ///
    /// # Errors
    /// Forwards the `RecvError` reported by the underlying receiver.
    pub fn recv(&self) -> Result<T, RecvError> {
        self.receiver.recv()
    }

    /// Takes the oldest queued value without blocking.
    ///
    /// # Errors
    /// Forwards the `TryRecvError` reported by the underlying receiver.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }
}

impl<T> std::ops::Deref for FromUIReceiver<T> {
    type Target = Receiver<T>;

    fn deref(&self) -> &Receiver<T> {
        &self.receiver
    }
}
208
209pub struct RevThreadPool {
210 tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >>,
211}
212
213impl RevThreadPool {
214 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
215 let tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >> = Default::default();
216
217 for _ in 0..num_threads {
218 let tasks = tasks.clone();
219 cx.spawn_thread(move || loop {
220 let task = if let Ok(mut tasks) = tasks.lock() {
221 tasks.pop()
222 }
223 else {
224 panic!();
225 };
226 if let Some(task) = task {
227 task();
228 }
229 })
230 }
231 Self {
232 tasks
233 }
234 }
235
236 pub fn execute<F>(&self, task: F) where F: FnOnce() + Send + 'static {
237 self.tasks.lock().unwrap().insert(0, Box::new(task));
238 }
239
240 pub fn execute_rev<F>(&self, task: F) where F: FnOnce() + Send + 'static {
241 self.tasks.lock().unwrap().push(Box::new(task));
242 }
243}
244
245pub struct TagThreadPool<T: Clone + Send + 'static + PartialEq> {
246 tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> >,
247}
248
249impl<T> TagThreadPool<T>where T: Clone + Send + 'static + PartialEq {
250 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
251 let tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> > = Default::default();
252
253 for _ in 0..num_threads {
254 let tasks = tasks.clone();
255 cx.spawn_thread(move || loop {
256 let task = if let Ok(mut tasks) = tasks.lock() {
257 tasks.pop()
258 }
259 else {
260 panic!()
261 };
262 if let Some((tag, task)) = task {
263 task(tag);
264 }
265 else{
266 std::thread::sleep(std::time::Duration::from_millis(50));
267 }
268 })
269 }
270 Self {
271 tasks
272 }
273 }
274
275 pub fn execute<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
276 if let Ok(mut tasks) = self.tasks.lock() {
277 tasks.retain( | v | v.0 != tag);
278 tasks.insert(0, (tag, Box::new(task)));
279 }
280 }
281
282 pub fn execute_rev<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
283 if let Ok(mut tasks) = self.tasks.lock() {
284 tasks.retain( | v | v.0 != tag);
285 tasks.push((tag, Box::new(task)));
286 }
287 }
288}
289
290
291
292pub struct MessageThreadPool<T: Clone + Send + 'static> {
293 sender: Sender<Box<dyn FnOnce(Option<T>) + Send + 'static >>,
294 msg_senders: Vec<Sender<T >>
295}
296
297impl<T> MessageThreadPool<T> where T: Clone + Send + 'static {
298 pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
299 let (sender, receiver) = channel::<Box<dyn FnOnce(Option<T>) + Send + 'static >> ();
300 let receiver = Arc::new(Mutex::new(receiver));
301 let mut msg_senders = Vec::new();
302 for _ in 0..num_threads {
303 let receiver = receiver.clone();
304 let (msg_send, msg_recv) = channel::<T>();
305 msg_senders.push(msg_send);
306 cx.spawn_thread(move || loop {
307 let task = if let Ok(receiver) = receiver.lock() {
308 match receiver.recv() {
309 Ok(task) => task,
310 Err(_) => return
311 }
312 }
313 else {
314 panic!();
315 };
316 let mut msg_out = None;
317 while let Ok(msg) = msg_recv.try_recv() {
318 msg_out = Some(msg);
319 }
320 task(msg_out);
321 })
322 }
323 Self {
324 sender,
325 msg_senders
326 }
327 }
328
329 pub fn send_msg(&self, msg: T) {
330 for sender in &self.msg_senders {
331 sender.send(msg.clone()).unwrap();
332 }
333 }
334
335 pub fn execute<F>(&self, task: F) where F: FnOnce(Option<T>) + Send + 'static {
336 self.sender.send(Box::new(task)).unwrap();
337 }
338}