makepad_platform/
thread.rs

1use {
2    std::sync::{
3        atomic::{AtomicBool, Ordering},
4        mpsc::{
5            channel,
6            Sender,
7            Receiver,
8            RecvError,
9            TryRecvError,
10            SendError,
11        },
12        Arc,
13        Mutex
14    },
15    crate::{
16        cx::Cx,
17        cx_api::*,
18    }
19};
20
/// A cloneable boolean flag backed by an `Arc<AtomicBool>`.
///
/// All clones observe the same flag. Raising it through [`SignalToUI::set`]
/// additionally raises the process-wide `UI_SIGNAL`.
#[derive(Clone, Debug, Default)]
pub struct SignalToUI(Arc<AtomicBool>);

/// Process-wide flag raised by [`SignalToUI::set_ui_signal`] and consumed by
/// [`SignalToUI::check_and_clear_ui_signal`].
pub (crate) static UI_SIGNAL: AtomicBool = AtomicBool::new(false);
/// Process-wide flag raised by [`SignalToUI::set_action_signal`] and consumed
/// by [`SignalToUI::check_and_clear_action_signal`].
pub (crate) static ACTION_SIGNAL: AtomicBool = AtomicBool::new(false);

impl SignalToUI {
    /// Sets `flag` to `true`.
    fn raise(flag: &AtomicBool) {
        flag.store(true, Ordering::SeqCst);
    }

    /// Resets `flag` to `false` and reports whether it had been set.
    fn take(flag: &AtomicBool) -> bool {
        flag.swap(false, Ordering::SeqCst)
    }

    /// Raises the process-wide UI flag.
    pub fn set_ui_signal() {
        Self::raise(&UI_SIGNAL);
    }

    /// Raises the process-wide action flag.
    pub fn set_action_signal() {
        Self::raise(&ACTION_SIGNAL);
    }

    /// Returns `true` if the process-wide UI flag was set, clearing it in the
    /// same atomic step.
    pub (crate) fn check_and_clear_ui_signal() -> bool {
        Self::take(&UI_SIGNAL)
    }

    /// Returns `true` if the process-wide action flag was set, clearing it in
    /// the same atomic step.
    pub (crate) fn check_and_clear_action_signal() -> bool {
        Self::take(&ACTION_SIGNAL)
    }

    /// Creates a fresh, unset signal.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if this signal (or any clone of it) was set since the
    /// last check, clearing it in the same atomic step.
    pub fn check_and_clear(&self) -> bool {
        Self::take(&self.0)
    }

    /// Sets this signal and also raises the process-wide UI flag.
    pub fn set(&self) {
        Self::raise(&self.0);
        Self::set_ui_signal();
    }
}
57
58
/// A cloneable boolean flag backed by an `Arc<AtomicBool>`.
///
/// All clones observe the same flag. Unlike `SignalToUI`, setting it touches
/// no global state.
#[derive(Clone, Debug, Default)]
pub struct SignalFromUI(Arc<AtomicBool>);

impl SignalFromUI {
    /// Creates a fresh, unset signal.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the signal was set since the last check, clearing it
    /// in the same atomic step.
    pub fn check_and_clear(&self) -> bool {
        let Self(flag) = self;
        flag.swap(false, Ordering::SeqCst)
    }

    /// Sets the signal; every clone will observe it.
    pub fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}
75
/// Receiving side of an mpsc channel.
///
/// It also stores a `Sender` for the same channel so that [`ToUISender`]
/// handles can be created on demand via [`ToUIReceiver::sender`].
pub struct ToUIReceiver<T> {
    sender: Sender<T>,
    pub receiver: Receiver<T>,
}

/// Cloneable sending handle for the channel owned by a [`ToUIReceiver`].
pub struct ToUISender<T> {
    sender: Sender<T>,
}

impl<T> Clone for ToUISender<T> {
    fn clone(&self) -> Self {
        Self {sender: self.sender.clone()}
    }
}

// SAFETY: the only field is a `std::sync::mpsc::Sender<T>`, which std itself
// declares `Send` whenever `T: Send`; this impl grants nothing beyond that.
unsafe impl<T: Send> Send for ToUISender<T> {}

impl<T> Default for ToUIReceiver<T> {
    /// Opens a new channel and keeps both of its ends.
    fn default() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver,
        }
    }
}

impl<T> ToUIReceiver<T> {
    /// Returns a new sending handle connected to this receiver.
    pub fn sender(&self) -> ToUISender<T> {
        ToUISender {
            sender: self.sender.clone(),
        }
    }
    
    /// Non-blocking receive of the oldest pending message.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }

    /// Drains every pending message and returns only the newest one.
    ///
    /// # Errors
    ///
    /// Returns the underlying `TryRecvError` (`Empty` or `Disconnected`) only
    /// when no message at all was pending. If at least one message was
    /// drained it is returned even when the channel turns out to be
    /// disconnected, so a final value is never silently dropped; the
    /// disconnect is then reported by the next call.
    pub fn try_recv_flush(&self) -> Result<T, TryRecvError> {
        let mut latest = None;
        loop {
            match self.receiver.try_recv() {
                Ok(value) => latest = Some(value),
                // Either the queue is drained or the channel is closed: hand
                // back what we already hold before reporting the error.
                Err(err) => return latest.ok_or(err),
            }
        }
    }
}
136
137impl<T> ToUISender<T> {
138    pub fn send(&self, t: T) -> Result<(), SendError<T >> {
139        let res = self.sender.send(t);
140        SignalToUI::set_ui_signal();
141        res
142    }
143}
144
/// Receiving end of a channel created by a [`FromUISender`].
pub struct FromUIReceiver<T> {
    receiver: Receiver<T>,
}

/// Sending end of an mpsc channel.
///
/// Until [`FromUISender::receiver`] is called it also holds the channel's
/// receiving end.
pub struct FromUISender<T> {
    // `Some` until `receiver()` hands it out; always `None` on handles made
    // by `sender()`.
    receiver: Option<Receiver<T >>,
    sender: Sender<T>,
}

// SAFETY: the only field is a `std::sync::mpsc::Receiver<T>`, which std itself
// declares `Send` whenever `T: Send`; this impl grants nothing beyond that.
unsafe impl<T: Send> Send for FromUIReceiver<T> {}

impl<T> Default for FromUISender<T> {
    /// Opens a new channel and keeps both of its ends.
    fn default() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver: Some(receiver),
        }
    }
}

impl<T> FromUISender<T> {
    /// Replaces the current channel with a brand-new one.
    ///
    /// This handle's old `Sender` is dropped, and [`FromUISender::receiver`]
    /// can be called again to obtain the new channel's receiving end.
    pub fn new_channel(&mut self) {
        let (sender, receiver) = channel();
        self.sender = sender;
        self.receiver = Some(receiver)
    }
    
    /// Queues `t` on the current channel.
    ///
    /// # Errors
    ///
    /// Returns `SendError` carrying `t` back when the receiving end is gone.
    pub fn send(&self, t: T) -> Result<(), SendError<T >> {
        self.sender.send(t)
    }
    
    /// Returns an additional sending handle for the current channel.
    ///
    /// The returned handle never owns the receiving end.
    pub fn sender(&self) -> FromUISender<T> {
        FromUISender {
            sender: self.sender.clone(),
            receiver: None
        }
    }
    
    /// Hands out the receiving end of the current channel.
    ///
    /// # Panics
    ///
    /// Panics if the receiving end is no longer held by this handle: it was
    /// already taken by an earlier call, or this handle was produced by
    /// [`FromUISender::sender`]. Call [`FromUISender::new_channel`] first to
    /// obtain a fresh receiver.
    pub fn receiver(&mut self) -> FromUIReceiver<T> {
        FromUIReceiver {
            receiver: self.receiver.take().expect(
                "FromUISender::receiver: receiving end already taken or never owned by this handle; call new_channel() first"
            )
        }
    }
}

impl<T> FromUIReceiver<T> {
    /// Blocks until a message arrives or every sender has been dropped.
    pub fn recv(&self) -> Result<T, RecvError> {
        self.receiver.recv()
    }
    
    /// Non-blocking receive of the oldest pending message.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }
    
}

/// Exposes the wrapped `Receiver` so its remaining methods can be called
/// directly on a `FromUIReceiver`.
impl<T> std::ops::Deref for FromUIReceiver<T> {
    type Target = Receiver<T>;
    fn deref(&self) -> &Receiver<T>{
        &self.receiver
    }
}
208
209pub struct RevThreadPool {
210    tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >>,
211}
212
213impl RevThreadPool {
214    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
215        let tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >> = Default::default();
216        
217        for _ in 0..num_threads {
218            let tasks = tasks.clone();
219            cx.spawn_thread(move || loop {
220                let task = if let Ok(mut tasks) = tasks.lock() {
221                    tasks.pop()
222                }
223                else {
224                    panic!();
225                };
226                if let Some(task) = task {
227                    task();
228                }
229            })
230        }
231        Self {
232            tasks
233        }
234    }
235    
236    pub fn execute<F>(&self, task: F) where F: FnOnce() + Send + 'static {
237        self.tasks.lock().unwrap().insert(0, Box::new(task));
238    }
239    
240    pub fn execute_rev<F>(&self, task: F) where F: FnOnce() + Send + 'static {
241        self.tasks.lock().unwrap().push(Box::new(task));
242    }
243}
244
245pub struct TagThreadPool<T: Clone + Send + 'static + PartialEq> {
246    tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> >,
247}
248
249impl<T> TagThreadPool<T>where T: Clone + Send + 'static + PartialEq {
250    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
251        let tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> > = Default::default();
252        
253        for _ in 0..num_threads {
254            let tasks = tasks.clone();
255            cx.spawn_thread(move || loop {
256                let task = if let Ok(mut tasks) = tasks.lock() {
257                    tasks.pop()
258                }
259                else {
260                    panic!()
261                };
262                if let Some((tag, task)) = task {
263                    task(tag);
264                }
265                else{
266                    std::thread::sleep(std::time::Duration::from_millis(50));
267                }
268            })
269        }
270        Self {
271            tasks
272        }
273    }
274    
275    pub fn execute<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
276        if let Ok(mut tasks) = self.tasks.lock() {
277            tasks.retain( | v | v.0 != tag);
278            tasks.insert(0, (tag, Box::new(task)));
279        }
280    }
281    
282    pub fn execute_rev<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
283        if let Ok(mut tasks) = self.tasks.lock() {
284            tasks.retain( | v | v.0 != tag);
285            tasks.push((tag, Box::new(task)));
286        }
287    }
288}
289
290
291
292pub struct MessageThreadPool<T: Clone + Send + 'static> {
293    sender: Sender<Box<dyn FnOnce(Option<T>) + Send + 'static >>,
294    msg_senders: Vec<Sender<T >>
295}
296
297impl<T> MessageThreadPool<T> where T: Clone + Send + 'static {
298    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
299        let (sender, receiver) = channel::<Box<dyn FnOnce(Option<T>) + Send + 'static >> ();
300        let receiver = Arc::new(Mutex::new(receiver));
301        let mut msg_senders = Vec::new();
302        for _ in 0..num_threads {
303            let receiver = receiver.clone();
304            let (msg_send, msg_recv) = channel::<T>();
305            msg_senders.push(msg_send);
306            cx.spawn_thread(move || loop {
307                let task = if let Ok(receiver) = receiver.lock() {
308                    match receiver.recv() {
309                        Ok(task) => task,
310                        Err(_) => return
311                    }
312                }
313                else {
314                    panic!();
315                };
316                let mut msg_out = None;
317                while let Ok(msg) = msg_recv.try_recv() {
318                    msg_out = Some(msg);
319                }
320                task(msg_out);
321            })
322        }
323        Self {
324            sender,
325            msg_senders
326        }
327    }
328    
329    pub fn send_msg(&self, msg: T) {
330        for sender in &self.msg_senders {
331            sender.send(msg.clone()).unwrap();
332        }
333    }
334    
335    pub fn execute<F>(&self, task: F) where F: FnOnce(Option<T>) + Send + 'static {
336        self.sender.send(Box::new(task)).unwrap();
337    }
338}