exocore_apps_sdk/
executor.rs1use std::{
7 future::Future,
8 sync::{
9 mpsc::{sync_channel, Receiver, SyncSender},
10 Arc, Mutex,
11 },
12 task::Context,
13};
14
15use futures::{
16 future::{BoxFuture, FutureExt},
17 task::{waker_ref, ArcWake},
18};
19
/// Capacity of the bounded channel that serves as the executor's ready queue
/// (passed to `sync_channel` in `init`).
const MAX_QUEUED_TASKS: usize = 100_000;
21
22pub fn spawn(future: impl Future<Output = ()> + 'static + Send) {
24 EXECUTOR.spawner.spawn(future);
25}
26
27pub(crate) fn poll_executor() {
28 let executor = EXECUTOR.executor.lock().unwrap();
29 executor.poll();
30}
31
32lazy_static! {
33 static ref EXECUTOR: ExecutorPair = init();
34}
35
36struct ExecutorPair {
37 executor: Mutex<Executor>,
38 spawner: Spawner,
39}
40
41fn init() -> ExecutorPair {
42 let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
43 let executor = Executor { ready_queue };
44 let spawner = Spawner { task_sender };
45
46 ExecutorPair {
47 executor: Mutex::new(executor),
48 spawner,
49 }
50}
51
52struct Executor {
54 ready_queue: Receiver<Arc<Task>>,
55}
56
57impl Executor {
58 fn poll(&self) {
59 while let Ok(task) = self.ready_queue.try_recv() {
60 let mut future_slot = task.future.lock().unwrap();
63 if let Some(mut future) = future_slot.take() {
64 let waker = waker_ref(&task);
66 let context = &mut Context::from_waker(&waker);
67 if future.as_mut().poll(context).is_pending() {
72 *future_slot = Some(future);
75 }
76 }
77 }
78 }
79}
80
81#[derive(Clone)]
83struct Spawner {
84 task_sender: SyncSender<Arc<Task>>,
85}
86
87impl Spawner {
88 fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
89 let future = future.boxed();
90 let task = Arc::new(Task {
91 future: Mutex::new(Some(future)),
92 task_sender: self.task_sender.clone(),
93 });
94 self.task_sender.send(task).expect("too many tasks queued");
95 }
96}
97
98struct Task {
100 future: Mutex<Option<BoxFuture<'static, ()>>>,
108
109 task_sender: SyncSender<Arc<Task>>,
111}
112
113impl ArcWake for Task {
114 fn wake_by_ref(arc_self: &Arc<Self>) {
115 let cloned = arc_self.clone();
118 arc_self
119 .task_sender
120 .send(cloned)
121 .expect("too many tasks queued");
122 }
123}
124
#[cfg(test)]
mod tests {
    use futures::channel::oneshot;

    use super::*;

    /// Chains two spawned tasks through oneshot channels and checks that
    /// nothing runs until the executor is polled.
    #[test]
    fn simple_two_tasks_channels() {
        // First task: publishes a value on its channel as soon as it runs.
        let (hello_tx, mut hello_rx) = oneshot::channel();
        spawn(async move {
            hello_tx.send("hello").unwrap();
        });

        // Spawning only queues the task, so nothing has been sent yet.
        assert!(hello_rx.try_recv().unwrap().is_none());

        // Second task: forwards whatever the first task produced.
        let (forward_tx, mut forward_rx) = oneshot::channel();
        spawn(async move {
            let value = hello_rx.await.unwrap();
            forward_tx.send(value).unwrap();
        });

        assert!(forward_rx.try_recv().unwrap().is_none());

        // A single poll runs both tasks: the first sends, the second forwards.
        poll_executor();

        assert_eq!(forward_rx.try_recv().unwrap(), Some("hello"));
    }
}