makepad_futures/
executor.rs

1use std::{
2    error, fmt,
3    future::Future,
4    pin::Pin,
5    sync::{
6        mpsc::{Receiver, Sender},
7        Arc, Mutex,
8    },
9    task::Wake,
10};
11
12#[derive(Debug)]
13pub struct Executor {
14    task_receiver: Receiver<Arc<Task>>,
15}
16
17impl Executor {
18    pub fn run(&self) {
19        while let Ok(task) = self.task_receiver.recv() {
20            task.run();
21        }
22    }
23
24    pub fn run_until_stalled(&self) {
25        while let Ok(task) = self.task_receiver.try_recv() {
26            task.run();
27        }
28    }
29}
30
31#[derive(Clone, Debug)]
32pub struct Spawner {
33    task_sender: Sender<Arc<Task>>,
34}
35
36impl Spawner {
37    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> Result<(), SpawnError> {
38        if self.task_sender.send(Arc::new(Task {
39            inner: Mutex::new(TaskInner {
40                future: Some(Box::pin(future)),
41                task_sender: self.task_sender.clone(),
42            }),
43        })).is_err() {
44            return Err(SpawnError::shutdown());
45        }
46        Ok(())
47    }
48}
49
/// Error returned by `Spawner::spawn` when a task could not be queued.
///
/// The private unit field keeps the struct from being built with a struct
/// literal outside this module; use `SpawnError::shutdown` instead.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct SpawnError {
    /// Zero-sized private marker; carries no data.
    _priv: (),
}
54
55impl SpawnError {
56    pub fn shutdown() -> Self {
57        Self { _priv: () }
58    }
59}
60
61impl error::Error for SpawnError {}
62
63impl fmt::Display for SpawnError {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        write!(f, "executor is shutdown")
66    }
67}
68
69struct Task {
70    inner: Mutex<TaskInner>,
71}
72
73impl Task {
74    fn run(self: Arc<Task>) {
75        use {std::task::Context, crate::task};
76
77        let future = self.inner.lock().unwrap().future.take();
78        if let Some(mut future) = future {
79            let waker = task::waker(self.clone());
80            let mut cx = Context::from_waker(&waker);
81            if future.as_mut().poll(&mut cx).is_pending() {
82                self.inner.lock().unwrap().future = Some(future);
83            }
84        }
85    }
86}
87
88impl Wake for Task {
89    fn wake(self: Arc<Task>) {
90        self.inner
91            .lock()
92            .unwrap()
93            .task_sender
94            .send(self.clone())
95            .unwrap();
96    }
97}
98
99struct TaskInner {
100    future: Option<Pin<Box<dyn Future<Output = ()> + 'static>>>,
101    task_sender: Sender<Arc<Task>>,
102}
103
104pub fn new_executor_and_spawner() -> (Executor, Spawner) {
105    use std::sync::mpsc;
106
107    let (task_sender, task_receiver) = mpsc::channel();
108    (Executor { task_receiver }, Spawner { task_sender })
109}