embedded_executor/alloc_executor/
inner.rs

1use core::{
2    future::Future,
3    marker::PhantomData,
4    mem,
5    pin::Pin,
6    task::{
7        Context,
8        Poll,
9        Waker,
10    },
11};
12
13use alloc::{
14    collections::VecDeque,
15    sync::Arc,
16};
17
18use futures::{
19    future::{
20        FutureObj,
21        LocalFutureObj,
22        UnsafeFutureObj,
23    },
24    task::{
25        LocalSpawn,
26        Spawn,
27        SpawnError,
28    },
29};
30
31use lock_api::{
32    Mutex,
33    RawMutex,
34};
35
36use generational_arena::{
37    Arena,
38    Index,
39};
40
41use crate::{
42    future_box,
43    sleep::*,
44    wake::{
45        Wake,
46        WakeExt,
47    },
48};
49
/// Default initial capacity of the task registry.
const REG_CAP: usize = 16;

/// Default initial capacity of the queue: half of the registry capacity.
const QUEUE_CAP: usize = REG_CAP / 2;
55
56// TODO: Investigate lock-free queues rather than using Mutexes. Might not
57// really get us much for embedded devices where disabling interrupts is just an
58// instruction away, but could be beneficial if threads get involved.
59
60/// Alloc-only `Future` executor
61///
62/// Assuming the `RawMutex` implementation provided is sound, this *should* be
63/// safe to use in both embedded and non-embedded scenarios. On embedded devices,
64/// it will probably be a type that disables/re-enables interrupts. On real OS's,
65/// it can be an actual mutex implementation.
66///
67/// The `Sleep` implementation can be used to put the event loop into a low-power
68/// state using something like `cortex_m::wfi/e`.
69pub struct AllocExecutor<'a, R, S>
70where
71    R: RawMutex,
72{
73    registry: Arena<Task<'a>>,
74    queue: QueueHandle<'a, R>,
75    sleep_waker: S,
76}
77
/// Which end of the queue a freshly spawned task is placed on.
///
/// See [`AllocExecutor::spawn_local`]
enum SpawnLoc {
    /// Put the task at the front of the queue.
    Front,
    /// Put the task at the back of the queue.
    Back,
}
83
84impl<'a, R, S> Default for AllocExecutor<'a, R, S>
85where
86    R: RawMutex,
87    S: Sleep + Wake + Clone + Default,
88{
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl<'a, R, S> AllocExecutor<'a, R, S>
95where
96    R: RawMutex,
97    S: Sleep + Wake + Clone + Default,
98{
99    /// Initialize a new `AllocExecutor`
100    ///
101    /// Does nothing unless it's `run()`
102    pub fn new() -> Self {
103        Self::with_capacity(REG_CAP, QUEUE_CAP)
104    }
105
106    /// Initialize a new `AllocExecutor` with the given capacities.
107    ///
108    /// Does nothing unless it's `run()`
109    pub fn with_capacity(registry: usize, queue: usize) -> Self {
110        AllocExecutor {
111            registry: Arena::with_capacity(registry),
112            queue: new_queue(queue),
113            sleep_waker: S::default(),
114        }
115    }
116
117    /// Get a handle to a `Spawner` that can be passed to `Future` constructors
118    /// to spawn even *more* `Future`s
119    pub fn spawner(&self) -> Spawner<'a, R> {
120        Spawner::new(self.queue.clone())
121    }
122
123    /// Get a handle to a `LocalSpawner` that can be passed to local `Future` constructors
124    /// to spawn even *more* local `Future`s
125    pub fn local_spawner(&self) -> LocalSpawner<'a, R> {
126        LocalSpawner::new(Spawner::new(self.queue.clone()))
127    }
128
129    /// "Real" spawn method
130    ///
131    /// Differentiates between spawning at the back of the queue and spawning at
132    /// the front of the queue. When `spawn` is called directly on the executor,
133    /// one would expect the futures to be polled in the order they were spawned,
134    /// so they should go to the back of the queue. When tasks are spawned via
135    /// the spawn/poll queue, they've already waited in line and get an express
136    /// ticket to the front.
137    fn spawn_local(&mut self, future: LocalFutureObj<'a, ()>, loc: SpawnLoc) {
138        let id = self.registry.insert(Task::new(future));
139
140        let queue_waker = Arc::new(QueueWaker::new(
141            self.queue.clone(),
142            id,
143            self.sleep_waker.clone(),
144        ));
145
146        let waker = queue_waker.into_waker();
147        self.registry.get_mut(id).unwrap().set_waker(waker);
148
149        let item = QueueItem::Poll(id);
150        let mut lock = self.queue.lock();
151
152        match loc {
153            SpawnLoc::Front => lock.push_front(item),
154            SpawnLoc::Back => lock.push_back(item),
155        }
156    }
157
158    /// Spawn a local `UnsafeFutureObj` into the executor.
159    pub fn spawn_raw<F>(&mut self, future: F)
160    where
161        F: UnsafeFutureObj<'a, ()>,
162    {
163        self.spawn_local(LocalFutureObj::new(future), SpawnLoc::Back)
164    }
165
166    /// Spawn a `Future` into the executor.
167    ///
168    /// This will implicitly box the future in order to objectify it.
169    pub fn spawn<F>(&mut self, future: F)
170    where
171        F: Future<Output = ()> + 'a,
172    {
173        self.spawn_raw(future_box::make_local(future));
174    }
175
176    /// Polls a task with the given id
177    ///
178    /// If no such task exists, it's a no-op.
179    /// If the task returns `Poll::Ready`, it will be removed from the registry.
180    fn poll_task(&mut self, id: Index) {
181        // It's possible that the waker is still hanging out somewhere and
182        // getting called even though its task is gone. If so, we can just
183        // skip it.
184        if let Some(Task { future, waker }) = self.registry.get_mut(id) {
185            let future = Pin::new(future);
186
187            let waker = waker
188                .as_ref()
189                .expect("waker not set, task spawned incorrectly");
190
191            match future.poll(&mut Context::from_waker(waker)) {
192                Poll::Ready(_) => {
193                    self.registry.remove(id);
194                }
195                Poll::Pending => {}
196            }
197        }
198    }
199
200    /// Get one task id or new future to be spawned from the queue.
201    fn dequeue(&self) -> Option<QueueItem<'a>> {
202        self.queue.lock().pop_front()
203    }
204
205    /// Run the executor
206    ///
207    /// Each loop will poll at most one task from the queue and then check for
208    /// newly spawned tasks. If there are no new tasks spawned and nothing left
209    /// in the queue, the executor will attempt to sleep.
210    ///
211    /// Once there's nothing to spawn and nothing left in the registry, the
212    /// executor will return.
213    pub fn run(&mut self) {
214        'outer: loop {
215            while let Some(item) = self.dequeue() {
216                match item {
217                    QueueItem::Poll(id) => {
218                        self.poll_task(id);
219                    }
220                    QueueItem::Spawn(task) => {
221                        self.spawn_local(task.into(), SpawnLoc::Front);
222                    }
223                }
224                if self.registry.is_empty() {
225                    break 'outer;
226                }
227                self.sleep_waker.sleep();
228            }
229        }
230    }
231}
232
233struct Task<'a> {
234    future: LocalFutureObj<'a, ()>,
235    // Invariant: waker should always be Some after the task has been spawned.
236    waker: Option<Waker>,
237}
238
239impl<'a> Task<'a> {
240    fn new(future: LocalFutureObj<'a, ()>) -> Task<'a> {
241        Task {
242            future,
243            waker: None,
244        }
245    }
246    fn set_waker(&mut self, waker: Waker) {
247        self.waker = Some(waker);
248    }
249}
250
251type Queue<'a> = VecDeque<QueueItem<'a>>;
252
253type QueueHandle<'a, R> = Arc<Mutex<R, Queue<'a>>>;
254
255fn new_queue<'a, R>(capacity: usize) -> QueueHandle<'a, R>
256where
257    R: RawMutex,
258{
259    Arc::new(Mutex::new(Queue::with_capacity(capacity)))
260}
261
262enum QueueItem<'a> {
263    Poll(Index),
264    Spawn(FutureObj<'a, ()>),
265}
266
267// Super simple Wake implementation
268// Sticks the Index into the queue and calls W::wake
269struct QueueWaker<R, W>
270where
271    R: RawMutex,
272{
273    queue: QueueHandle<'static, R>,
274    id: Index,
275    waker: W,
276}
277
278impl<R, W> QueueWaker<R, W>
279where
280    R: RawMutex,
281    W: Wake,
282{
283    fn new(queue: QueueHandle<'_, R>, id: Index, waker: W) -> Self {
284        QueueWaker {
285            // Safety: The QueueWaker only deals in 'static lifetimed things, i.e.
286            // task `Index`es, only writes to the queue, and will never give anyone
287            // else this transmuted version.
288            queue: unsafe { mem::transmute(queue) },
289            id,
290            waker,
291        }
292    }
293}
294
295impl<R, W> Wake for QueueWaker<R, W>
296where
297    R: RawMutex,
298    W: Wake,
299{
300    fn wake(&self) {
301        self.queue.lock().push_back(QueueItem::Poll(self.id));
302        self.waker.wake();
303    }
304}
305
306/// Local spawner for an `AllocExecutor`
307///
308/// This can be used to spawn futures from the same thread as the executor.
309///
310/// Use a `Spawner` to spawn futures from *other* threads.
311#[derive(Clone)]
312pub struct LocalSpawner<'a, R>(Spawner<'a, R>, PhantomData<LocalFutureObj<'a, ()>>)
313where
314    R: RawMutex;
315
316impl<'a, R> LocalSpawner<'a, R>
317where
318    R: RawMutex,
319{
320    fn new(spawner: Spawner<'a, R>) -> Self {
321        LocalSpawner(spawner, PhantomData)
322    }
323}
324
325impl<'a, R> LocalSpawner<'a, R>
326where
327    R: RawMutex,
328{
329    fn spawn_local(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
330        // Safety: LocalSpawner is !Send and !Sync, so the future spawned will
331        // always remain local to the executor.
332        Ok(self
333            .0
334            .spawn_obj(unsafe { mem::transmute(future.into_future_obj()) }))
335    }
336
337    /// Spawn a `FutureObj` into the corresponding `AllocExecutor`
338    pub fn spawn_raw<F>(&mut self, future: F) -> Result<(), SpawnError>
339    where
340        F: UnsafeFutureObj<'a, ()>,
341    {
342        self.spawn_local(LocalFutureObj::new(future))
343    }
344
345    /// Spawn a `Future` into the corresponding `AllocExecutor`
346    ///
347    /// While the lifetime on the Future is `'a`, unless you're calling this on a
348    /// non-`'static` `Future` before the executor has started, you're most
349    /// likely going to be stuck with the `Spawn` trait's `'static` bound.
350    pub fn spawn<F>(&mut self, future: F) -> Result<(), SpawnError>
351    where
352        F: Future<Output = ()> + 'a,
353    {
354        self.spawn_raw(future_box::make_local(future))
355    }
356}
357
358impl<'a, R> LocalSpawn for LocalSpawner<'a, R>
359where
360    R: RawMutex,
361{
362    fn spawn_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
363        self.spawn_local(future)
364    }
365}
366
367/// Spawner for an `AllocExecutor`
368///
369/// This can be cloned and passed to an async function to allow it to spawn more
370/// tasks.
371pub struct Spawner<'a, R>(QueueHandle<'a, R>)
372where
373    R: RawMutex;
374
375impl<'a, R> Spawner<'a, R>
376where
377    R: RawMutex,
378{
379    fn new(handle: QueueHandle<'a, R>) -> Self {
380        Spawner(handle)
381    }
382
383    fn spawn_obj(&self, future: FutureObj<'a, ()>) {
384        self.0.lock().push_back(QueueItem::Spawn(future));
385    }
386
387    /// Spawn a `FutureObj` into the corresponding `AllocExecutor`
388    pub fn spawn_raw<F>(&self, future: F)
389    where
390        F: UnsafeFutureObj<'a, ()> + Send + 'a,
391    {
392        Spawner::spawn_obj(self, FutureObj::new(future));
393    }
394
395    /// Spawn a `Future` into the corresponding `AllocExecutor`
396    ///
397    /// While the lifetime on the Future is `'a`, unless you're calling this on a
398    /// non-`'static` `Future` before the executor has started, you're most
399    /// likely going to be stuck with the `Spawn` trait's `'static` bound.
400    pub fn spawn<F>(&mut self, future: F)
401    where
402        F: Future<Output = ()> + Send + 'a,
403    {
404        self.spawn_raw(future_box::make_obj(future));
405    }
406}
407
408impl<'a, R> Clone for Spawner<'a, R>
409where
410    R: RawMutex,
411{
412    fn clone(&self) -> Self {
413        Spawner(self.0.clone())
414    }
415}
416
417impl<'a, R> Spawn for Spawner<'a, R>
418where
419    R: RawMutex,
420{
421    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
422        Ok(Spawner::spawn_obj(self, future))
423    }
424}
425
426impl<'a, R> From<LocalSpawner<'a, R>> for Spawner<'a, R>
427where
428    R: RawMutex,
429{
430    fn from(other: LocalSpawner<'a, R>) -> Self {
431        other.0
432    }
433}
434
#[cfg(test)]
mod test {
    use super::*;
    use crate::sleep::Sleep;
    use core::sync::atomic::{
        AtomicBool,
        Ordering,
    };
    use futures::{
        future::{
            self,
            FutureObj,
        },
        task::Spawn,
    };
    use lock_api::GuardSend;

    // shamelessly borrowed from the lock_api docs
    // 1. Define our raw lock type
    pub struct RawSpinlock(AtomicBool);

    // 2. Implement RawMutex for this type
    unsafe impl RawMutex for RawSpinlock {
        const INIT: RawSpinlock = RawSpinlock(AtomicBool::new(false));

        // A spinlock guard can be sent to another thread and unlocked there
        type GuardMarker = GuardSend;

        fn lock(&self) {
            // Note: This isn't the best way of implementing a spinlock, but it
            // suffices for the sake of this example.
            while !self.try_lock() {}
        }

        fn try_lock(&self) -> bool {
            // Succeeds only when the flag flips from `false` (unlocked) to
            // `true` (locked). Returning the previous value of a plain `swap`
            // would report success exactly when the lock was already held.
            self.0
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        }

        unsafe fn unlock(&self) {
            self.0.store(false, Ordering::Release);
        }
    }

    // Sleep/wake handle that does nothing, so the executor never blocks.
    #[derive(Copy, Clone, Default)]
    struct NopSleep;

    impl Sleep for NopSleep {
        fn sleep(&self) {}
    }

    impl Wake for NopSleep {
        fn wake(&self) {}
    }

    async fn foo() -> i32 {
        5
    }

    async fn bar() -> i32 {
        let a = foo().await;
        println!("{}", a);
        let b = a + 1;
        b
    }

    // Awaits `bar()` and then spawns one printing task per value in `c..25`
    // through the given spawner.
    async fn baz<S: Spawn>(spawner: S) {
        let c = bar().await;
        for i in c..25 {
            let spam = async move {
                println!("{}", i);
            };
            println!("spawning!");
            spawner
                .spawn_obj(FutureObj::new(future_box::make_obj(spam)))
                .unwrap();
        }
    }

    // Smoke test: tasks that spawn more tasks all run, and `run()` returns.
    #[test]
    fn executor() {
        let mut executor = AllocExecutor::<RawSpinlock, NopSleep>::new();
        let spawner = executor.spawner();
        let entry = future::lazy(move |_| {
            for i in 0..10 {
                spawner.spawn_raw(future_box::make_obj(future::lazy(move |_| {
                    println!("{}", i);
                })));
            }
        });
        executor.spawn(entry);
        executor.spawn(baz(executor.spawner()));
        executor.run();
    }
}
526}