1#![cfg_attr(not(any(test, feature = "std")), no_std)]
2#![warn(missing_debug_implementations, missing_docs, unused_import_braces)]
3
4extern crate alloc;
7
8mod async_task;
9mod atomic_state;
10mod multi_complete_future;
11mod polling_future;
12mod sleep_future;
13
14pub use async_task::*;
15pub use atomic_state::*;
16pub use multi_complete_future::*;
17pub use polling_future::*;
18pub use sleep_future::*;
19
20use alloc::boxed::Box;
21use alloc::sync::{Arc, Weak};
22use concurrency_traits::queue::{TimeoutQueue, TryQueue};
23use concurrency_traits::{ConcurrentSystem, ThreadSpawner, TryThreadSpawner};
24use core::fmt;
25use core::fmt::Debug;
26use core::future::Future;
27use core::marker::PhantomData;
28use core::ops::Deref;
29use core::sync::atomic::{AtomicBool, Ordering};
30use core::task::{RawWaker, RawWakerVTable, Waker};
31use core::time::Duration;
32use simple_futures::value_future::ValueFuture;
33
/// Compile-time assertion helper: `impl EnsureSend for T {}` is only accepted
/// by the compiler when `T: Send`, because `Send` is a supertrait.
trait EnsureSend: Send {}
/// Compile-time assertion helper: `impl EnsureSync for T {}` is only accepted
/// by the compiler when `T: Sync`, because `Sync` is a supertrait.
// NOTE(review): no implementor is visible in this part of the file.
trait EnsureSync: Sync {}
36
37pub fn try_spawn_blocking<F, T, CS>(
41 function: F,
42) -> Result<(impl Future<Output = T> + 'static + Send, CS::ThreadHandle), CS::SpawnError>
43where
44 F: FnOnce() -> T + Send + 'static,
45 T: 'static + Send,
46 CS: TryThreadSpawner<()>,
47{
48 let future = ValueFuture::new();
49 let handle = future.get_handle();
50 let task_return = CS::try_spawn(move || {
51 if let Some(val) = handle.assign(function()) {
52 val.unwrap_or_else(|_| panic!("Could not assign from blocking!"))
53 }
54 })?;
55 Ok((future, task_return))
56}
57
58pub fn spawn_blocking<F, T, CS>(
62 function: F,
63) -> (impl Future<Output = T> + 'static + Send, CS::ThreadHandle)
64where
65 F: FnOnce() -> T + Send + 'static,
66 T: 'static + Send,
67 CS: ThreadSpawner<()> + 'static,
68{
69 try_spawn_blocking::<_, _, CS>(function).unwrap()
70}
71
/// An [`AsyncExecutor`] whose concurrent system is
/// `concurrency_traits::StdThreadFunctions`. Only compiled when the `std`
/// feature is enabled.
#[cfg(feature = "std")]
pub type AsyncExecutorStd<Q> = AsyncExecutor<Q, concurrency_traits::StdThreadFunctions>;
75
76#[derive(Debug)]
143pub struct AsyncExecutor<Q, CS> {
144 task_queue: Arc<Q>,
145 phantom_cs: PhantomData<fn() -> CS>,
146 phantom_send_sync: PhantomData<*const ()>,
148}
149impl<Q, CS> AsyncExecutor<Q, CS>
150where
151 Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
152 CS: ConcurrentSystem<()>,
153{
154 pub fn new(task_queue: Q) -> Self {
156 Self {
157 task_queue: Arc::new(task_queue),
158 phantom_cs: Default::default(),
159 phantom_send_sync: Default::default(),
160 }
161 }
162
163 pub fn queue_from<T>(from: T) -> Self
166 where
167 Q: From<T>,
168 {
169 Self::new(Q::from(from))
170 }
171
172 pub fn handle(&self) -> ExecutorHandle<Q> {
174 ExecutorHandle {
175 queue: Arc::downgrade(&self.task_queue),
176 }
177 }
178
179 pub fn local_handle(&self) -> LocalExecutorHandle<Q> {
181 LocalExecutorHandle {
182 queue: Arc::downgrade(&self.task_queue),
183 phantom_send_sync: Default::default(),
184 }
185 }
186
187 pub fn submit(&self, future: impl Future<Output = ()> + 'static) {
192 self.task_queue
193 .try_push(AsyncTask::new(future))
194 .expect("Queue is full when spawning!");
195 }
196
197 pub fn submit_loop<SQ, Func, Fut>(
201 &self,
202 mut future_func: Func,
203 delay: Duration,
204 sleep_runner: impl Deref<Target = SleepFutureRunner<SQ, CS>> + 'static,
205 ) where
206 SQ: 'static + TimeoutQueue<Item = SleepMessage<CS>> + Send + Sync,
207 Func: FnMut() -> Fut + 'static,
208 Fut: Future<Output = ()>,
209 {
210 let future = async move {
211 loop {
212 let last = CS::current_time();
213 future_func().await;
214 sleep_runner.sleep_until(last + delay).await;
215 }
216 };
217 self.submit(future)
218 }
219
220 pub fn run(&self, stop: impl Deref<Target = AtomicBool>) {
222 let mut _run_iters: usize = 0;
223 while !stop.load(Ordering::Acquire) {
224 let task = self.task_queue.pop_timeout(Duration::from_millis(10));
225 if let Some(task) = task {
226 let waker_data = WakerData {
227 task_queue: self.task_queue.clone(),
228 task: task.clone(),
229 };
230 let waker = Waker::from(waker_data);
231 unsafe {
232 task.poll(&waker);
233 }
234 }
235 _run_iters += 1;
236 }
237 }
238}
239
240#[derive(Clone)]
241struct WakerData {
242 task_queue: Arc<dyn TryQueue<Item = AsyncTask> + Send + Sync>,
244 task: AsyncTask,
245}
246impl EnsureSend for WakerData {}
247impl From<WakerData> for Waker {
248 fn from(from: WakerData) -> Self {
249 unsafe { Waker::from_raw(RawWaker::from(from)) }
250 }
251}
252impl From<WakerData> for RawWaker {
253 fn from(from: WakerData) -> Self {
254 RawWaker::new(Box::into_raw(Box::new(from)) as *const (), &WAKER_VTABLE)
255 }
256}
257static WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
258 |ptr| {
259 let queue: &WakerData = unsafe { &*(ptr as *const WakerData) };
260 RawWaker::from(queue.clone())
261 },
262 |ptr| {
263 let data = unsafe { Box::from_raw(ptr as *const WakerData as *mut WakerData) };
264 data.task_queue.try_push(data.task).expect("Queue is full!");
265 },
266 |ptr| {
267 let data: &WakerData = unsafe { &*(ptr as *const WakerData) };
268 data.task_queue
269 .try_push(data.task.clone())
270 .expect("Queue is full!");
271 },
272 |ptr| {
273 let data = unsafe { Box::from_raw(ptr as *const WakerData as *mut WakerData) };
274 drop(data);
275 },
276);
277
278#[derive(Debug)]
280pub struct ExecutorHandle<Q> {
281 queue: Weak<Q>,
282}
283impl<Q> ExecutorHandle<Q>
284where
285 Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
286{
287 pub fn submit<F>(&self, future: F) -> Result<(), F>
289 where
290 F: Future<Output = ()> + 'static + Send,
291 {
292 match self.queue.upgrade() {
293 None => Err(future),
294 Some(queue) => {
295 queue
296 .try_push(AsyncTask::new(future))
297 .expect("Queue is full!");
298 Ok(())
299 }
300 }
301 }
302}
303impl<Q> Clone for ExecutorHandle<Q> {
304 fn clone(&self) -> Self {
305 Self {
306 queue: self.queue.clone(),
307 }
308 }
309}
310
311#[derive(Debug)]
313pub struct LocalExecutorHandle<Q> {
314 queue: Weak<Q>,
315 phantom_send_sync: PhantomData<*const ()>,
317}
318impl<Q> LocalExecutorHandle<Q>
319where
320 Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
321{
322 pub fn submit<F>(&self, future: F) -> Result<(), F>
324 where
325 F: Future<Output = ()> + 'static,
326 {
327 match self.queue.upgrade() {
328 None => Err(future),
329 Some(queue) => {
330 queue
331 .try_push(AsyncTask::new(future))
332 .expect("Queue is full!");
333 Ok(())
334 }
335 }
336 }
337}
338impl<Q> Clone for LocalExecutorHandle<Q> {
339 fn clone(&self) -> Self {
340 Self {
341 queue: self.queue.clone(),
342 phantom_send_sync: Default::default(),
343 }
344 }
345}
346
#[cfg(feature = "std")]
#[cfg(test)]
mod test {
    use crate::{AsyncExecutor, SleepFutureRunner};
    use concurrency_traits::queue::ParkQueue;
    use concurrency_traits::StdThreadFunctions;
    use std::rc::Rc;
    use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
    use std::sync::Arc;
    use std::thread::{sleep, spawn};
    use std::time::Duration;

    /// Submits 100 looping tasks with a 100 ms period, runs the executor for
    /// about one second, and checks that every task ran roughly 10 times
    /// (strictly between 5 and 15).
    #[test]
    fn slam_test() {
        let executor = AsyncExecutor::<_, StdThreadFunctions>::new(ParkQueue::<
            _,
            StdThreadFunctions,
        >::default());
        let sleep_runner = Rc::new(SleepFutureRunner::<
            ParkQueue<_, StdThreadFunctions>,
            StdThreadFunctions,
        >::new(Default::default()));

        // Body of every loop iteration: bump the task's own counter once.
        let loop_function = |atom_count: Rc<AtomicIsize>| async move {
            atom_count.fetch_add(1, Ordering::SeqCst);
        };

        // One counter per task; the test keeps a second `Rc` to inspect it later.
        let atom_counts: Vec<Rc<AtomicIsize>> = (0..100)
            .map(|_| {
                let atom_count = Rc::new(AtomicIsize::new(0));
                let observed = atom_count.clone();
                executor.submit_loop(
                    move || loop_function(atom_count.clone()),
                    Duration::from_millis(100),
                    sleep_runner.clone(),
                );
                observed
            })
            .collect();

        // A helper thread raises the stop flag after one second.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let raise_stop = move || {
            sleep(Duration::from_secs(1));
            stop_clone.store(true, Ordering::Release);
        };
        spawn(raise_stop);

        executor.run(stop);

        for count in &atom_counts {
            assert!((count.load(Ordering::SeqCst) - 10).abs() < 5);
        }
    }
}