single_executor/
lib.rs

#![cfg_attr(not(any(test, feature = "std")), no_std)]
#![warn(missing_debug_implementations, missing_docs, unused_import_braces)]

//! A single-threaded async executor.

extern crate alloc;

mod async_task;
mod atomic_state;
mod multi_complete_future;
mod polling_future;
mod sleep_future;

pub use async_task::*;
pub use atomic_state::*;
pub use multi_complete_future::*;
pub use polling_future::*;
pub use sleep_future::*;

use alloc::boxed::Box;
use alloc::sync::{Arc, Weak};
use concurrency_traits::queue::{TimeoutQueue, TryQueue};
use concurrency_traits::{ConcurrentSystem, ThreadSpawner, TryThreadSpawner};
use core::fmt;
use core::fmt::Debug;
use core::future::Future;
use core::marker::PhantomData;
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::{RawWaker, RawWakerVTable, Waker};
use core::time::Duration;
use simple_futures::value_future::ValueFuture;

trait EnsureSend: Send {}
trait EnsureSync: Sync {}

/// Returns a future that will contain the result of `function`.
/// The function is called on another thread so it does not block the main
/// thread. Fallible version of [`spawn_blocking`].
pub fn try_spawn_blocking<F, T, CS>(
    function: F,
) -> Result<(impl Future<Output = T> + 'static + Send, CS::ThreadHandle), CS::SpawnError>
where
    F: FnOnce() -> T + Send + 'static,
    T: 'static + Send,
    CS: TryThreadSpawner<()>,
{
    let future = ValueFuture::new();
    let handle = future.get_handle();
    let task_return = CS::try_spawn(move || {
        if let Some(val) = handle.assign(function()) {
            val.unwrap_or_else(|_| panic!("Could not assign from blocking!"))
        }
    })?;
    Ok((future, task_return))
}

/// Returns a future that will contain the result of `function`.
/// The function is called on another thread so it does not block the main
/// thread. Infallible version of [`try_spawn_blocking`].
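///
/// # Example
/// A minimal sketch (assuming the `std` feature for `StdThreadFunctions`); the
/// returned future is awaited from inside an [`AsyncExecutor`], and a watchdog
/// thread stops the executor in case anything stalls.
/// ```
/// # #[cfg(feature = "std")]
/// # {
/// use concurrency_traits::StdThreadFunctions;
/// use concurrency_traits::queue::ParkQueueStd;
/// use single_executor::{spawn_blocking, AsyncExecutorStd};
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// use std::time::Duration;
///
/// let executor = AsyncExecutorStd::new(ParkQueueStd::default());
/// let stop = Arc::new(AtomicBool::new(false));
///
/// let stop_clone = stop.clone();
/// executor.submit(async move {
///     // The closure runs on its own thread; awaiting it does not block the executor.
///     let (future, _handle) = spawn_blocking::<_, _, StdThreadFunctions>(|| 2 + 2);
///     assert_eq!(future.await, 4);
///     stop_clone.store(true, Ordering::Release);
/// });
///
/// // Watchdog so the example always terminates.
/// let stop_clone = stop.clone();
/// std::thread::spawn(move || {
///     std::thread::sleep(Duration::from_secs(1));
///     stop_clone.store(true, Ordering::Release);
/// });
/// executor.run(stop);
/// # }
/// ```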
pub fn spawn_blocking<F, T, CS>(
    function: F,
) -> (impl Future<Output = T> + 'static + Send, CS::ThreadHandle)
where
    F: FnOnce() -> T + Send + 'static,
    T: 'static + Send,
    CS: ThreadSpawner<()> + 'static,
{
    try_spawn_blocking::<_, _, CS>(function).unwrap()
}

/// An async executor that uses std functions.
#[cfg(feature = "std")]
pub type AsyncExecutorStd<Q> = AsyncExecutor<Q, concurrency_traits::StdThreadFunctions>;

/// An asynchronous executor that can be used to run multiple async tasks.
/// All user code runs in a single thread because the V5 is single-threaded.
/// Blocked tasks stop running and wait to be unblocked without blocking the
/// main thread.
///
/// # Panics
/// This will panic if [`Q::try_push`](concurrency_traits::queue::TryQueue::try_push) ever fails.
///
/// # Example
/// ```
/// # #[cfg(feature = "std")]
/// # {
/// use concurrency_traits::StdThreadFunctions;
/// use concurrency_traits::queue::ParkQueueStd;
/// use single_executor::{SleepFutureRunner, spawn_blocking, AsyncExecutorStd};
/// use std::rc::Rc;
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// use std::thread;
/// use std::thread::sleep;
/// use std::time::Duration;
///
/// let executor = AsyncExecutorStd::new(ParkQueueStd::default());
/// let sleep_runner = Rc::new(SleepFutureRunner::new(ParkQueueStd::default()));
///
/// let sleep_runner_clone = sleep_runner.clone();
/// let loop_function = move || {
///     let sleep_runner_clone = sleep_runner_clone.clone();
///     async move {
///         // Dummy code, but shows how you can await.
///         sleep_runner_clone.sleep_for(Duration::from_millis(100)).await;
///         // Do stuff
///     }
/// };
/// executor.submit_loop(
///     loop_function,
///     Duration::from_millis(10),
///     sleep_runner
/// );
///
/// /// Dummy function
/// async fn get_something_from_io() {}
/// executor.submit(get_something_from_io());
///
/// /// Dummy blocking function
/// fn block_for_a_while() -> usize {
///     std::thread::sleep(Duration::from_millis(100));
///     100
/// }
/// executor.submit(async {
///     assert_eq!(spawn_blocking::<_, _, StdThreadFunctions>(block_for_a_while).0.await, 100);
/// });
///
/// // Nothing runs until `run` is called on the executor.
/// let stop = Arc::new(AtomicBool::new(false));
/// let stop_clone = stop.clone();
/// thread::spawn(move || {
///     sleep(Duration::from_secs(1));
///     stop_clone.store(true, Ordering::Relaxed);
/// });
/// executor.run(stop); // Keeps running until `stop` is set to true.
/// # }
/// ```
///
/// MAKE SURE NONE OF YOUR SUBMISSIONS BLOCK: everything shares one thread, so
/// a single blocking future stalls the whole executor!
#[derive(Debug)]
pub struct AsyncExecutor<Q, CS> {
    task_queue: Arc<Q>,
    phantom_cs: PhantomData<fn() -> CS>,
    /// Block send and sync
    phantom_send_sync: PhantomData<*const ()>,
}
impl<Q, CS> AsyncExecutor<Q, CS>
where
    Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
    CS: ConcurrentSystem<()>,
{
    /// Creates a new executor from a given queue.
    pub fn new(task_queue: Q) -> Self {
        Self {
            task_queue: Arc::new(task_queue),
            phantom_cs: Default::default(),
            phantom_send_sync: Default::default(),
        }
    }

    /// Creates a new executor from `Q`'s [`From<T>`](std::convert::From)
    /// implementation. Usually used for converting from an initial size.
    pub fn queue_from<T>(from: T) -> Self
    where
        Q: From<T>,
    {
        Self::new(Q::from(from))
    }

    /// Gets a handle to the executor through which tasks can be submitted.
    pub fn handle(&self) -> ExecutorHandle<Q> {
        ExecutorHandle {
            queue: Arc::downgrade(&self.task_queue),
        }
    }

    /// Gets a handle to the executor through which tasks can be submitted. This handle may not be
    /// sent across threads but may submit [`!Send`](Send) futures.
    pub fn local_handle(&self) -> LocalExecutorHandle<Q> {
        LocalExecutorHandle {
            queue: Arc::downgrade(&self.task_queue),
            phantom_send_sync: Default::default(),
        }
    }

    /// Adds a new future to the executor.
    /// This can be called from within a future.
    /// If this is a long-running future (like a loop) then make use of sleep or
    /// use [`submit_loop`](Self::submit_loop) instead.
    pub fn submit(&self, future: impl Future<Output = ()> + 'static) {
        self.task_queue
            .try_push(AsyncTask::new(future))
            .expect("Queue is full when spawning!");
    }

    /// Adds a new future that will be called at a set rate.
    /// Do not write your own loop inside the future; this function handles the
    /// looping for you.
    pub fn submit_loop<SQ, Func, Fut>(
        &self,
        mut future_func: Func,
        delay: Duration,
        sleep_runner: impl Deref<Target = SleepFutureRunner<SQ, CS>> + 'static,
    ) where
        SQ: 'static + TimeoutQueue<Item = SleepMessage<CS>> + Send + Sync,
        Func: FnMut() -> Fut + 'static,
        Fut: Future<Output = ()>,
    {
        let future = async move {
            loop {
                let last = CS::current_time();
                future_func().await;
                sleep_runner.sleep_until(last + delay).await;
            }
        };
        self.submit(future)
    }

    /// Runs the executor; nothing will run until this is called.
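    ///
    /// # Example
    /// A minimal sketch (assuming the `std` feature); the submitted future
    /// stops the executor on its first poll.
    /// ```
    /// # #[cfg(feature = "std")]
    /// # {
    /// use concurrency_traits::queue::ParkQueueStd;
    /// use single_executor::AsyncExecutorStd;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    ///
    /// let executor = AsyncExecutorStd::new(ParkQueueStd::default());
    /// let stop = Arc::new(AtomicBool::new(false));
    /// let stop_clone = stop.clone();
    /// executor.submit(async move {
    ///     // Runs once `run` is called, then flags the executor to stop.
    ///     stop_clone.store(true, Ordering::Release);
    /// });
    /// executor.run(stop); // Returns once the stop flag is set.
    /// # }
    /// ```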
    pub fn run(&self, stop: impl Deref<Target = AtomicBool>) {
        let mut _run_iters: usize = 0;
        while !stop.load(Ordering::Acquire) {
            let task = self.task_queue.pop_timeout(Duration::from_millis(10));
            if let Some(task) = task {
                // Build a waker that re-queues this task when it is woken.
                let waker_data = WakerData {
                    task_queue: self.task_queue.clone(),
                    task: task.clone(),
                };
                let waker = Waker::from(waker_data);
                unsafe {
                    task.poll(&waker);
                }
            }
            _run_iters += 1;
        }
    }
}

#[derive(Clone)]
struct WakerData {
    /// Could be a `Weak`, but the overhead isn't worth it just to drop the queue sooner.
    task_queue: Arc<dyn TryQueue<Item = AsyncTask> + Send + Sync>,
    task: AsyncTask,
}
impl EnsureSend for WakerData {}
impl From<WakerData> for Waker {
    fn from(from: WakerData) -> Self {
        unsafe { Waker::from_raw(RawWaker::from(from)) }
    }
}
impl From<WakerData> for RawWaker {
    fn from(from: WakerData) -> Self {
        RawWaker::new(Box::into_raw(Box::new(from)) as *const (), &WAKER_VTABLE)
    }
}
static WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    // clone: copy the boxed `WakerData` into a fresh `RawWaker`.
    |ptr| {
        let data: &WakerData = unsafe { &*(ptr as *const WakerData) };
        RawWaker::from(data.clone())
    },
    // wake: consume the data and push the task back onto the queue.
    |ptr| {
        let data = unsafe { Box::from_raw(ptr as *const WakerData as *mut WakerData) };
        data.task_queue.try_push(data.task).expect("Queue is full!");
    },
    // wake_by_ref: push a clone of the task without consuming the data.
    |ptr| {
        let data: &WakerData = unsafe { &*(ptr as *const WakerData) };
        data.task_queue
            .try_push(data.task.clone())
            .expect("Queue is full!");
    },
    // drop: reclaim the boxed `WakerData`.
    |ptr| {
        let data = unsafe { Box::from_raw(ptr as *const WakerData as *mut WakerData) };
        drop(data);
    },
);

/// A handle to an executor allowing submission of tasks.
#[derive(Debug)]
pub struct ExecutorHandle<Q> {
    queue: Weak<Q>,
}
impl<Q> ExecutorHandle<Q>
where
    Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
{
    /// Submits a task to the executor this handle came from. Will return [`Err`] if the executor
    /// was dropped.
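    ///
    /// # Example
    /// A minimal sketch (assuming the `std` feature):
    /// ```
    /// # #[cfg(feature = "std")]
    /// # {
    /// use concurrency_traits::queue::ParkQueueStd;
    /// use single_executor::AsyncExecutorStd;
    ///
    /// let executor = AsyncExecutorStd::new(ParkQueueStd::default());
    /// let handle = executor.handle();
    /// // Submission succeeds while the executor is alive.
    /// assert!(handle.submit(async {}).is_ok());
    /// // Once the executor is dropped, the future is handed back as `Err`.
    /// drop(executor);
    /// assert!(handle.submit(async {}).is_err());
    /// # }
    /// ```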
    pub fn submit<F>(&self, future: F) -> Result<(), F>
    where
        F: Future<Output = ()> + 'static + Send,
    {
        match self.queue.upgrade() {
            None => Err(future),
            Some(queue) => {
                queue
                    .try_push(AsyncTask::new(future))
                    .expect("Queue is full!");
                Ok(())
            }
        }
    }
}
impl<Q> Clone for ExecutorHandle<Q> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

/// A handle to an executor allowing submission of tasks. This handle may not be sent across
/// threads but can submit [`!Send`](Send) futures.
#[derive(Debug)]
pub struct LocalExecutorHandle<Q> {
    queue: Weak<Q>,
    /// Block send and sync
    phantom_send_sync: PhantomData<*const ()>,
}
impl<Q> LocalExecutorHandle<Q>
where
    Q: 'static + TimeoutQueue<Item = AsyncTask> + Send + Sync,
{
    /// Submits a task to the executor this handle came from. Will return [`Err`] if the executor
    /// was dropped.
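    ///
    /// # Example
    /// A minimal sketch (assuming the `std` feature); note that the submitted
    /// future holds an `Rc` and is therefore [`!Send`](Send).
    /// ```
    /// # #[cfg(feature = "std")]
    /// # {
    /// use concurrency_traits::queue::ParkQueueStd;
    /// use single_executor::AsyncExecutorStd;
    /// use std::rc::Rc;
    ///
    /// let executor = AsyncExecutorStd::new(ParkQueueStd::default());
    /// let local_handle = executor.local_handle();
    /// let not_send = Rc::new(5usize);
    /// // A local handle can submit futures that are not `Send`.
    /// assert!(local_handle.submit(async move { assert_eq!(*not_send, 5); }).is_ok());
    /// # }
    /// ```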
    pub fn submit<F>(&self, future: F) -> Result<(), F>
    where
        F: Future<Output = ()> + 'static,
    {
        match self.queue.upgrade() {
            None => Err(future),
            Some(queue) => {
                queue
                    .try_push(AsyncTask::new(future))
                    .expect("Queue is full!");
                Ok(())
            }
        }
    }
}
impl<Q> Clone for LocalExecutorHandle<Q> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
            phantom_send_sync: Default::default(),
        }
    }
}

#[cfg(feature = "std")]
#[cfg(test)]
mod test {
    use crate::{AsyncExecutor, SleepFutureRunner};
    use concurrency_traits::queue::ParkQueue;
    use concurrency_traits::StdThreadFunctions;
    use std::rc::Rc;
    use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
    use std::sync::Arc;
    use std::thread::{sleep, spawn};
    use std::time::Duration;

    #[test]
    fn slam_test() {
        let executor = AsyncExecutor::<_, StdThreadFunctions>::new(ParkQueue::<
            _,
            StdThreadFunctions,
        >::default());
        let sleep_runner = Rc::new(SleepFutureRunner::<
            ParkQueue<_, StdThreadFunctions>,
            StdThreadFunctions,
        >::new(Default::default()));
        let loop_function = |atom_count: Rc<AtomicIsize>| async move {
            atom_count.fetch_add(1, Ordering::SeqCst);
        };
        let mut atom_counts = Vec::with_capacity(100);
        for _ in 0..100 {
            let atom_count = Rc::new(AtomicIsize::new(0));
            atom_counts.push(atom_count.clone());
            executor.submit_loop(
                move || {
                    let atom_count = atom_count.clone();
                    loop_function(atom_count)
                },
                Duration::from_millis(100),
                sleep_runner.clone(),
            );
        }
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        spawn(move || {
            sleep(Duration::from_secs(1));
            stop_clone.store(true, Ordering::Release);
        });
        executor.run(stop);
        for count in &atom_counts {
            assert!((count.load(Ordering::SeqCst) - 10).abs() < 5);
        }
    }
}