makepad_platform/
thread.rs

1use {
2    std::sync::{
3        atomic::{AtomicBool, Ordering},
4        mpsc::{
5            channel,
6            Sender,
7            Receiver,
8            RecvError,
9            TryRecvError,
10            SendError,
11        },
12        Arc,
13        Mutex
14    },
15    crate::{
16        cx::Cx,
17        cx_api::*,
18    }
19};
20
21#[derive(Clone, Debug, Default)]
22pub struct Signal(Arc<AtomicBool>);
23
24pub (crate) static UI_SIGNAL: AtomicBool = AtomicBool::new(false);
25
26impl Signal {
27    pub fn set_ui_signal() {
28        UI_SIGNAL.store(true, Ordering::SeqCst)
29    }
30    
31    pub (crate) fn check_and_clear_ui_signal() -> bool {
32        UI_SIGNAL.swap(false, Ordering::SeqCst)
33    }
34    
35    pub fn new() -> Self {
36        Self (Arc::new(AtomicBool::new(false)))
37    }
38    
39    pub fn check_and_clear(&self) -> bool {
40        self.0.swap(false, Ordering::SeqCst)
41    }
42    
43    pub fn set(&self) {
44        self.0.store(true, Ordering::SeqCst);
45        Self::set_ui_signal();
46    }
47}
48
49pub struct ToUIReceiver<T> {
50    sender: Sender<T>,
51    pub receiver: Receiver<T>,
52}
53
54pub struct ToUISender<T> {
55    sender: Sender<T>,
56}
57
58impl<T> Clone for ToUISender<T> {
59    fn clone(&self) -> Self {
60        Self {sender: self.sender.clone()}
61    }
62}
63
64unsafe impl<T: Send> Send for ToUISender<T> {}
65
66impl<T> Default for ToUIReceiver<T> {
67    fn default() -> Self {
68        let (sender, receiver) = channel();
69        Self {
70            sender,
71            receiver,
72        }
73    }
74}
75
76impl<T> ToUIReceiver<T> {
77    pub fn sender(&self) -> ToUISender<T> {
78        ToUISender {
79            sender: self.sender.clone(),
80        }
81    }
82    
83    pub fn try_recv(&self) -> Result<T, TryRecvError> {
84        self.receiver.try_recv()
85    }
86
87    pub fn try_recv_flush(&self) -> Result<T, TryRecvError> {
88        let mut store_last = None;
89        loop {
90            match self.receiver.try_recv() {
91                Ok(last) => {
92                    store_last = Some(last);
93                },
94                Err(TryRecvError::Empty) => {
95                    if let Some(last) = store_last {
96                        return Ok(last)
97                    }
98                    else {
99                        return Err(TryRecvError::Empty)
100                    }
101                },
102                Err(TryRecvError::Disconnected) => {
103                    return Err(TryRecvError::Disconnected)
104                }
105            }
106        }
107    }
108}
109
110impl<T> ToUISender<T> {
111    pub fn send(&self, t: T) -> Result<(), SendError<T >> {
112        let res = self.sender.send(t);
113        Signal::set_ui_signal();
114        res
115    }
116}
117
118pub struct FromUIReceiver<T> {
119    receiver: Receiver<T>,
120}
121
122pub struct FromUISender<T> {
123    receiver: Option<Receiver<T >>,
124    sender: Sender<T>,
125}
126
127unsafe impl<T: Send> Send for FromUIReceiver<T> {}
128
129impl<T> Default for FromUISender<T> {
130    fn default() -> Self {
131        let (sender, receiver) = channel();
132        Self {
133            sender,
134            receiver: Some(receiver),
135        }
136    }
137}
138
139impl<T> FromUISender<T> {
140    pub fn new_channel(&mut self) {
141        let (sender, receiver) = channel();
142        self.sender = sender;
143        self.receiver = Some(receiver)
144    }
145    
146    pub fn send(&self, t: T) -> Result<(), SendError<T >> {
147        self.sender.send(t)
148    }
149    
150    pub fn sender(&self) -> FromUISender<T> {
151        FromUISender {
152            sender: self.sender.clone(),
153            receiver: None
154        }
155    }
156    
157    pub fn receiver(&mut self) -> FromUIReceiver<T> {
158        FromUIReceiver {
159            receiver: self.receiver.take().unwrap()
160        }
161    }
162}
163
164impl<T> FromUIReceiver<T> {
165    pub fn recv(&self) -> Result<T, RecvError> {
166        self.receiver.recv()
167    }
168    
169    pub fn try_recv(&self) -> Result<T, TryRecvError> {
170        self.receiver.try_recv()
171    }
172    
173}
174
175impl<T> std::ops::Deref for FromUIReceiver<T> {
176    type Target = Receiver<T>;
177    fn deref(&self) -> &Receiver<T>{
178        &self.receiver
179    }
180}
181
182pub struct RevThreadPool {
183    tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >>,
184}
185
186impl RevThreadPool {
187    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
188        let tasks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send + 'static >> >> = Default::default();
189        
190        for _ in 0..num_threads {
191            let tasks = tasks.clone();
192            cx.spawn_thread(move || loop {
193                let task = if let Ok(mut tasks) = tasks.lock() {
194                    tasks.pop()
195                }
196                else {
197                    panic!();
198                };
199                if let Some(task) = task {
200                    task();
201                }
202            })
203        }
204        Self {
205            tasks
206        }
207    }
208    
209    pub fn execute<F>(&self, task: F) where F: FnOnce() + Send + 'static {
210        self.tasks.lock().unwrap().insert(0, Box::new(task));
211    }
212    
213    pub fn execute_rev<F>(&self, task: F) where F: FnOnce() + Send + 'static {
214        self.tasks.lock().unwrap().push(Box::new(task));
215    }
216}
217
218pub struct TagThreadPool<T: Clone + Send + 'static + PartialEq> {
219    tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> >,
220}
221
222impl<T> TagThreadPool<T>where T: Clone + Send + 'static + PartialEq {
223    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
224        let tasks: Arc<Mutex<Vec<(T, Box<dyn FnOnce(T) + Send + 'static >) >> > = Default::default();
225        
226        for _ in 0..num_threads {
227            let tasks = tasks.clone();
228            cx.spawn_thread(move || loop {
229                let task = if let Ok(mut tasks) = tasks.lock() {
230                    tasks.pop()
231                }
232                else {
233                    panic!()
234                };
235                if let Some((tag, task)) = task {
236                    task(tag);
237                }
238                else{
239                    std::thread::sleep(std::time::Duration::from_millis(50));
240                }
241            })
242        }
243        Self {
244            tasks
245        }
246    }
247    
248    pub fn execute<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
249        if let Ok(mut tasks) = self.tasks.lock() {
250            tasks.retain( | v | v.0 != tag);
251            tasks.insert(0, (tag, Box::new(task)));
252        }
253    }
254    
255    pub fn execute_rev<F>(&self, tag: T, task: F) where F: FnOnce(T) + Send + 'static {
256        if let Ok(mut tasks) = self.tasks.lock() {
257            tasks.retain( | v | v.0 != tag);
258            tasks.push((tag, Box::new(task)));
259        }
260    }
261}
262
263
264
265pub struct MessageThreadPool<T: Clone + Send + 'static> {
266    sender: Sender<Box<dyn FnOnce(Option<T>) + Send + 'static >>,
267    msg_senders: Vec<Sender<T >>
268}
269
270impl<T> MessageThreadPool<T> where T: Clone + Send + 'static {
271    pub fn new(cx: &mut Cx, num_threads: usize) -> Self {
272        let (sender, receiver) = channel::<Box<dyn FnOnce(Option<T>) + Send + 'static >> ();
273        let receiver = Arc::new(Mutex::new(receiver));
274        let mut msg_senders = Vec::new();
275        for _ in 0..num_threads {
276            let receiver = receiver.clone();
277            let (msg_send, msg_recv) = channel::<T>();
278            msg_senders.push(msg_send);
279            cx.spawn_thread(move || loop {
280                let task = if let Ok(receiver) = receiver.lock() {
281                    match receiver.recv() {
282                        Ok(task) => task,
283                        Err(_) => return
284                    }
285                }
286                else {
287                    panic!();
288                };
289                let mut msg_out = None;
290                while let Ok(msg) = msg_recv.try_recv() {
291                    msg_out = Some(msg);
292                }
293                task(msg_out);
294            })
295        }
296        Self {
297            sender,
298            msg_senders
299        }
300    }
301    
302    pub fn send_msg(&self, msg: T) {
303        for sender in &self.msg_senders {
304            sender.send(msg.clone()).unwrap();
305        }
306    }
307    
308    pub fn execute<F>(&self, task: F) where F: FnOnce(Option<T>) + Send + 'static {
309        self.sender.send(Box::new(task)).unwrap();
310    }
311}