nb_executor/
lib.rs

1//! Single-future, `#![no_std]` executor based on event bitmasks.
2//!
3//! See `README.md` for a brief overview. [`Executor`] describes the most important elements
4//! to get started.
5
6#![cfg_attr(not(test), no_std)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8#![forbid(unsafe_code)]
9#![warn(missing_docs)]
10
11use core::{
12    cell::Cell,
13    convert::Infallible,
14    future::Future,
15    marker::PhantomData,
16    pin::Pin,
17    sync::atomic::{AtomicU32, Ordering::Relaxed},
18    task::{Context, Poll, Waker},
19};
20
21pub use futures::{self, pin_mut};
22pub use nb;
23
24#[cfg(test)]
25mod tests;
26
27#[cfg(feature = "heapless")]
28mod collections;
29
30#[cfg(feature = "heapless")]
31pub use collections::Mpmc;
32
/// Definition of event bits.
///
/// Implementors should be simple wrappers around `u32` that allow application code to clearly
/// differentiate the relevant event sources. Events and signals correspond to the bits of the
/// value returned by [`as_bits()`](EventMask::as_bits): each set bit names one event.
///
/// # Examples
///
/// ```
/// # use nb_executor::EventMask;
/// use bitflags::bitflags;
///
/// bitflags! {
///     struct Ev: u32 {
///         const USB_RX = 1 << 0;
///         const PRIO = 1 << 1;
///         const TICK = 1 << 2;
///     }
/// }
///
/// impl EventMask for Ev {
///     fn as_bits(self) -> u32 {
///         self.bits()
///     }
/// }
/// ```
pub trait EventMask: Copy + 'static {
    /// Get the `u32` bitmask.
    ///
    /// The returned bits are OR-ed into the shared event mask by [`Events::raise()`] and are
    /// tested against the raised signal set by [`Signals::drive()`].
    fn as_bits(self) -> u32;
}
63
/// Shared event mask.
///
/// The event mask is an atomic bitmask shared between the executor and the event sources. The type
/// parameter `S` is an [`EventMask`].
///
/// # Events or signals?
/// They are the same. The name "event" refers to their presence in the shared event mask,
/// while the name "signal" refers to poll-local signal masks. Events become signals when they
/// are handled or waited for.
pub struct Events<S> {
    // Bits of the currently raised events. `Events::raise()` ORs bits in; `Step::poll()`
    // removes the bits of the wakeup signal set when it decides to poll the future.
    events: AtomicU32,
    // Ties the mask to its `EventMask` type without storing a value of that type.
    _phantom: PhantomData<S>,
}
77
/// Signal state manager and event listener.
///
/// `Signals` maintains the signal state of an [`Executor`]. This consists of the raised signal set
/// and the wakeup signal set. Upon polling the future, the raised signal set is frozen to the
/// then-current value of the event mask, all bits in the wakeup signal set are removed from the
/// event mask (atomically), and the wakeup signal set is cleared. See also [`Step::poll()`]
/// for behavior when the wakeup signal set is zero. Current poll functions which are driven
/// by any of the raised signals will be attempted. If a poll function is not attempted or does
/// not yet resolve to an output then its signal mask will be OR-ed into the wakeup signal set.
/// The future won't be polled again until a raised signal matches the wakeup signal set.
///
/// The type parameter `S` is an [`EventMask`].
///
/// # Delayed signals
/// The executor will examine the event mask after polling the future to check for any
/// immediate updates. Since the raised signal set remains frozen during polling, any signal
/// raised by the future itself through [`Events`] won't become visible until this happens.
/// This also means that external events are not tested for until the future yields. As an
/// optimization for the first case, [`Signals::raise()`] updates the raised signal set
/// immediately. Correct programs are not able to observe this behavior.
pub struct Signals<'a, S> {
    // The shared event mask watched by this listener.
    pending: &'a Events<S>,
    // Raised and wakeup signal sets of the current poll. The executor created by
    // `Signals::bind()` holds a reference to this same cell.
    run: Cell<Run>,
}
102
/// A single-threaded, single-future async executor.
///
/// # Typical setup
/// - First, an event mask is created with [`Events::default()`] (or [`Events::new()`] if
///   that needs to be `const`). Event masks are `Send + Sync` and shared references to them
///   are enough for all operations, so they work well as `static` items or other types of
///   shared state. Distribute event mask references to the external event sources and keep
///   one for the executor.
///
/// - The event mask is then watched with [`Events::watch()`]. The resulting [`Signals`] is
///   `Send + !Sync`. This means that operations become limited to one thread of execution
///   from this point on, so this is usually done in some type of initialization or main
///   function instead of in a shared or global context.
///
/// - A new executor is bound with [`Signals::bind()`]. Executors are `!Send + !Sync`:
///   neither the executor nor the associated `Signals` may escape the current thread. This
///   makes them appropriate for construction at the use site.
///
/// - A future is created. It needs a reference to the `Signals` object in order to drive
///   poll functions, making it `!Sync` too.
///
/// - Finally, [`Executor::block_on()`] blocks and resolves the future while external event
///   sources direct it through the event mask, possibly with help from the park function.
///
/// # Examples
///
/// This is a complete usage example. It uses `std::sync` primitives and a park function based on
/// `std::thread::park()` to multiply the integers from 1 to 10 read from a blocking queue.
///
/// ```
/// # use nb_executor::*;
/// # use bitflags::bitflags;
/// use std::{thread, sync::{mpsc::*, Arc}};
///
/// bitflags! {
///     struct Ev: u32 {
///         const QUEUE = 1 << 0;
///     }
/// }
///
/// impl EventMask for Ev {
///     fn as_bits(self) -> u32 {
///         self.bits()
///     }
/// }
///
/// async fn recv(signals: &Signals<'_, Ev>, rx: &Receiver<u32>) -> Option<u32> {
///     signals.drive_infallible(Ev::QUEUE, || match rx.try_recv() {
///         Ok(n) => Ok(Some(n)),
///         Err(TryRecvError::Disconnected) => Ok(None),
///         Err(TryRecvError::Empty) => Err(nb::Error::WouldBlock),
///     }).await
/// }
///
/// let events = Arc::new(Events::default());
/// let signals = events.watch();
///
/// let (tx, rx) = sync_channel(1);
/// let future = async {
///     let mut product = 1;
///     while let Some(n) = recv(&signals, &rx).await {
///         product *= n;
///     }
///
///     product
/// };
///
/// let events_prod = Arc::clone(&events);
/// let runner = thread::current();
///
/// thread::spawn(move || {
///     for n in 1..=10 {
///         tx.send(n).unwrap();
///         events_prod.raise(Ev::QUEUE);
///         runner.unpark();
///     }
///
///     // Notify shutdown
///     drop(tx);
///     events_prod.raise(Ev::QUEUE);
///     runner.unpark();
/// });
///
/// let result = signals.bind().block_on(future, |park| {
///     // thread::park() is event-safe, no lock is required
///     let parked = park.race_free();
///     if parked.is_idle() {
///         thread::park();
///     }
///
///     parked
/// });
///
/// assert_eq!(result, (1..=10).product()); // 3628800
/// ```
pub struct Executor<'a> {
    // The atomic behind the shared event mask of the `Signals` this executor was bound to.
    pending: &'a AtomicU32,
    // Raised/wakeup signal state, shared with the `Signals` this executor was bound to.
    run: &'a Cell<Run>,
    // Waker handed to the future through its `Context` on every poll.
    waker: Waker,
}
203
/// A non-blocking executor-future-park state machine.
///
/// All executor operations are ultimately implemented with `Step`. It allows fine-grained
/// control over execution and control flow. `Step` can only perform one poll step at a time
/// and requires a pinned future. A `Step` object is created by calling [`Executor::step()`].
///
/// See [`Executor::block_on()`] for a blocking runner.
pub struct Step<'exec, 'fut, F, P> {
    // Executor state: event mask, shared signal state and waker.
    executor: Executor<'exec>,
    // Wakeup signal set collected by the most recent poll of the future; zero before the
    // first poll, which forces a full scan.
    wakeup: u32,
    // Last observed value of the event mask. Used as the expected value of the
    // compare-exchange in `Step::poll()` and as the raised signal set of the next poll.
    last_pending: u32,
    // The pinned future being driven.
    future: Pin<&'fut mut F>,
    // The park function, see `Park`.
    park: P,
}
218
/// A request to park the executor.
///
/// Parking is the mechanism by which the executor *tries* to wait for external event sources when
/// no signal in the wakeup set is currently raised (see [`Signals`]). The executor might
/// nonetheless resume immediately if a signal is raised before the atomic part of the *park
/// protocol* takes place. Park functions implement the park protocol and must follow it strictly,
/// **you risk deadlocks otherwise**.
///
/// # The park protocol
///
/// - First, the executor determines that further progress is unlikely at this moment. The
///   specifics of this process are implementation details that should not be relied upon.
///
/// - The park function is called with a `Park` parameter.
///
/// - The park function enters a context wherein no external events may influence a correct
///   decision to sleep or not. For example, a park function that does not sleep at all does
///   not need to do anything here, since no external event can incorrectly change that
///   behavior. On the other hand, a park function that halts until a hardware interrupt occurs
///   would need to enter an interrupt-free context to avoid deadlocks.
///
/// - The park function calls [`Park::race_free()`] while still in the event-safe context.
///   This produces a [`Parked`] value that serves as proof of the call to `race_free()`.
///
/// - If the park function intends to block or sleep, then it must first call
///   [`Parked::is_idle()`]. It may be allowed to sleep only if that function returns `true`.
///
/// - If the park function is willing to sleep and is allowed to do so, it must
///   atomically exit the event-safe context whilst entering the sleep state. A deadlock is
///   again possible if both operations are not done atomically with respect to each other
///   (**only for blocking runners, see below**).
///
/// - If the park function sleeps, this state should be automatically exited when an external
///   event occurs.
///
/// - The park function returns its [`Parked`] token.
///
/// - The executor resumes.
///
/// # Delegating out of the park function
///
/// Blocking runners, such as [`Executor::block_on()`], require that the park function's
/// event-safe context be exited in an atomic manner with respect to the start of whatever
/// blocking operation. However, this requirement does not hold for [`Step`] **as long as
/// the park function never exits the event-safe context and it itself never blocks**. Since
/// [`Step::poll()`] will return after parking, the caller can perform potentially-blocking
/// operations from the event-safe context. It must still release it atomically if it will
/// sleep, though. With this technique it is even possible for the `poll()` caller to be an
/// external event source.
pub struct Park<'a> {
    // The shared event mask; sampled by `Park::race_free()`.
    pending: &'a AtomicU32,
    // Wakeup signal set of the executor at the time the park was requested.
    wakeup: u32,
}
272
/// Proof of parking.
///
/// Park functions return `Parked` objects as a proof of having called [`Park::race_free()`]. This
/// is necessary because [`Park::race_free()`] updates executor state and must always be run.
/// `Parked` can be used by park functions to determine whether blocking or sleeping is
/// permissible. See [`Park`] documentation for the correct parking protocol.
pub struct Parked<'a> {
    // Value of the event mask as read by `Park::race_free()`.
    last_pending: u32,
    // Wakeup signal set copied from the originating `Park`.
    wakeup: u32,
    // Keeps the token bound to the lifetime of the `Park` it was derived from.
    _phantom: PhantomData<&'a ()>,
}
284
285impl<S> Default for Events<S> {
286    fn default() -> Self {
287        Events {
288            events: Default::default(),
289            _phantom: PhantomData,
290        }
291    }
292}
293
294impl<S> Events<S> {
295    /// Construct a new event mask.
296    ///
297    /// This is the same as [`Events::default()`] but is also `const`.
298    pub const fn new() -> Self {
299        Events {
300            events: AtomicU32::new(0),
301            _phantom: PhantomData,
302        }
303    }
304
305    /// Associate a new `Signals` to this event mask.
306    ///
307    /// This indirectly links the executor to the event mask, since creating an [`Executor`]
308    /// requires a `Signals`.
309    #[must_use]
310    pub fn watch(&self) -> Signals<S> {
311        Signals {
312            pending: self,
313            run: Default::default(),
314        }
315    }
316}
317
318impl<S: EventMask> Events<S> {
319    /// Raise all events in a mask.
320    ///
321    /// This operation is atomic: multiple events can be safely raised at the same time.
322    /// Already raised signals are left as is.
323    pub fn raise(&self, signals: S) {
324        self.events.fetch_or(signals.as_bits(), Relaxed);
325    }
326}
327
328impl<'a, S: EventMask> Signals<'a, S> {
329    /// Bind a new executor to this signal source.
330    ///
331    /// The executor will be constructed with a waker that does nothing. It can be replaced
332    /// by calling [`Executor::with_waker()`].
333    #[must_use]
334    pub fn bind(&self) -> Executor {
335        Executor {
336            pending: &self.pending.events,
337            run: &self.run,
338            waker: futures::task::noop_waker(),
339        }
340    }
341
342    /// Raise a signal set without delays.
343    ///
344    /// This will perform the equivalent of [`pending().raise(signals)`], but it will
345    /// also update the raised signal set. This prevents the "delayed signals" effect
346    /// that is shown above. Prefer `raise()` if another branch of this future (such as
347    /// in [`futures::join!()`] might benefit from this, otherwise use `pending().raise()`.
348    /// It is impossible to visibly influence a well-behaved poll function by switching
349    /// between `raise()` or `pending().raise()` at any call site; this is a best-effort
350    /// optimization only.
351    pub fn raise(&self, signals: S) {
352        self.pending.raise(signals);
353
354        let run = self.run.get();
355        let raised = run.raised | signals.as_bits();
356        self.run.set(Run { raised, ..run });
357    }
358
359    /// Retrieve the underlying event mask.
360    pub fn pending(&self) -> &Events<S> {
361        self.pending
362    }
363
364    /// Asynchronously drive a fallible poll function to completion
365    ///
366    /// The future produced by this method checks on each poll whether any of the signals
367    /// in `signals` is present in the raised signal set. If that is so, it invokes the poll
368    /// function. On first poll, this check is omitted, thus guaranteeing at least one call
369    /// to the poll function. If the poll function succeeds or fails with
370    /// [`nb::Error::Other`], the future completes immediately with that value. If the poll
371    /// function returns [`nb::Error::WouldBlock`], or if none of the signals in `signals`
372    /// is present in the raised signal sent, then `signals` is added to the wakeup signal
373    /// set and the future pends.
374    ///
375    /// `poll()` must handle spurious calls gracefully. There is no guarantee that any of
376    /// the intended effects of any signal in `signals` has actually taken place. `poll()`
377    /// may not block.
378    pub async fn drive<T, E, F>(&self, signals: S, mut poll: F) -> Result<T, E>
379    where
380        F: FnMut() -> nb::Result<T, E>,
381    {
382        let mask = signals.as_bits();
383        let mut first_poll = true;
384
385        futures::future::poll_fn(move |_| {
386            let run = self.run.get();
387
388            if first_poll || run.raised & mask != 0 {
389                first_poll = false;
390
391                match poll() {
392                    Ok(ok) => return Poll::Ready(Ok(ok)),
393                    Err(nb::Error::Other(err)) => return Poll::Ready(Err(err)),
394                    Err(nb::Error::WouldBlock) => (),
395                }
396            }
397
398            let wakeup = run.wakeup | mask;
399            self.run.set(Run { wakeup, ..run });
400
401            Poll::Pending
402        })
403        .await
404    }
405
406    /// Asynchronously drive an infallible poll function to completion.
407    ///
408    /// This is a variant of [`Signals::drive()`] intended for cases where there is no
409    /// proper error type. Although `drive(sig, poll).await.unwrap()` works the same, it
410    /// often requires explicit type annotations if `poll` is a closure. This method should
411    /// be preferred in such cases, such as when `poll` is just a wrapper around
412    /// `some_option.ok_or(WouldBlock)`.
413    pub async fn drive_infallible<T, F>(&self, signals: S, poll: F) -> T
414    where
415        F: FnMut() -> nb::Result<T, Infallible>,
416    {
417        self.drive(signals, poll).await.unwrap()
418    }
419}
420
421impl<'exec> Executor<'exec> {
422    /// Replace the executor's waker with a custom one.
423    ///
424    /// No restrictions are imposed on the waker: `nb-executor` does not use wakers at all.
425    /// Application code may define some communication between it and the park function.
426    #[must_use]
427    pub fn with_waker(mut self, waker: Waker) -> Self {
428        self.waker = waker;
429        self
430    }
431
432    /// Begin stepped execution.
433    ///
434    /// This allows the caller to remain in control of program flow in between polls, unlike
435    /// [`Executor::block_on()`]. The future is run as specified in [`Step`] and [`Signals`].
436    /// `park` is a park function and must follow the [`Park`] protocol. The caller and the
437    /// park function may cooperate to block or sleep outside of the park function within the
438    /// protocol's requirements.
439    pub fn step<'fut, F, P>(self, future: Pin<&'fut mut F>, park: P) -> Step<F, P>
440    where
441        F: Future,
442        P: FnMut(Park) -> Parked,
443        'exec: 'fut,
444    {
445        Step {
446            executor: self,
447            wakeup: 0,
448            last_pending: u32::MAX,
449            future,
450            park,
451        }
452    }
453
454    /// Execute a future on this executor, parking when no progress is possible.
455    ///
456    /// This method will block until the future resolves. There are two possible states of
457    /// operation while the future is executed:
458    ///
459    /// - Polling: The future's `poll()` method is called in order to attempt to resolve
460    ///   it. The signal state is prepared as documented in [`Signals`] when switching
461    ///   to the polling state. The next state after polling is unspecified, but will
462    ///   eventually lead to parking if the future pends consistently.
463    ///
464    /// - Parking: This state is entered when useful work is unlikely at the current
465    ///   time. For details, see the parking protocol in [`Park`]. `park` must adhere to
466    ///   this protocol.
467    ///
468    /// See also [`Step`] and [`Executor::step()`].
469    pub fn block_on<F, P>(self, future: F, park: P) -> F::Output
470    where
471        F: Future,
472        P: FnMut(Park) -> Parked,
473    {
474        pin_mut!(future);
475        let mut step = self.step(future, park);
476
477        loop {
478            match step.poll() {
479                Poll::Pending => continue,
480                Poll::Ready(ready) => break ready,
481            }
482        }
483    }
484
485    /// Execute a future in a busy-waiting loop.
486    ///
487    /// This is equivalent to calling [`Executor::block_on()`] with a park function that
488    /// never sleeps. This is most likely the wrong way to do whatever you intend, prefer to
489    /// define a proper wake function.
490    pub fn block_busy_on<F: Future>(self, future: F) -> F::Output {
491        self.block_on(future, |park| park.race_free())
492    }
493}
494
495impl<F, P> Step<'_, '_, F, P>
496where
497    F: Future,
498    P: FnMut(Park) -> Parked,
499{
500    /// Perform at most one non-blocking polling attempt.
501    ///
502    /// This method never blocks unless the park function does (or the future, but that's
503    /// totally wrong anyway). The future will be polled given any of these:
504    ///
505    /// - The wakeup signal set is zero. This is intended to provide a reset or "full scan"
506    ///   mechanism. In this case, the raised signal set will be hard-coded to all-ones,
507    ///   regardless of the current value of the event mask. **Note:** A side effect of this
508    ///   special case is that [`core::future::pending()`] and similar constructs might
509    ///   busy-loop depending on the particular park function.
510    ///
511    /// - Any bit in the wakeup signal set matches the event mask. The event mask is
512    ///   atomically frozen at the same time that this condition is check, creating the
513    ///   raised signal set.
514    ///
515    /// The wakeup signal set is cleared just before polling the future (edge-triggering).
516    /// Regardless of the poll result or if the future is polled at all, a park
517    /// operation may be triggered under implementation-defined conditions. If parking
518    /// occurs, it will be the last observable operation before `poll()` returns. The
519    /// return value will be [`Poll::Pending`] if the future is not polled. Do not call
520    /// call `poll()` again after it returns [`Poll::Ready`].
521    ///
522    /// On first call, the wakeup signal set is zero. This ensures that all driven poll
523    /// functions are attempted at least once.
524    pub fn poll(&mut self) -> Poll<F::Output> {
525        let woken = if self.wakeup == 0 {
526            self.last_pending = u32::MAX;
527            true
528        } else {
529            loop {
530                let cleared = self.last_pending & !self.wakeup;
531                let result = self.executor.pending.compare_exchange_weak(
532                    self.last_pending,
533                    cleared,
534                    Relaxed,
535                    Relaxed,
536                );
537
538                match result {
539                    Ok(current) => break cleared != current,
540                    Err(current) => self.last_pending = current,
541                }
542            }
543        };
544
545        if !woken {
546            return Poll::Pending;
547        }
548
549        self.executor.run.set(Run {
550            raised: self.last_pending,
551            wakeup: 0,
552        });
553
554        let mut cx = Context::from_waker(&self.executor.waker);
555        let poll = self.future.as_mut().poll(&mut cx);
556
557        if poll.is_pending() {
558            self.wakeup = self.executor.run.get().wakeup;
559            let park = Park {
560                pending: self.executor.pending,
561                wakeup: self.wakeup,
562            };
563
564            self.last_pending = (self.park)(park).last_pending;
565        }
566
567        poll
568    }
569}
570
571impl<'a> Park<'a> {
572    /// Promise that new events won't race with sleeps, then get a proof of parking.
573    ///
574    /// Park functions must call this method to obtain the [`Parked`] object that they
575    /// return, which also allows them to determine sleep permissibility. The caller
576    /// promises that external events which may occur from the start of this call until
577    /// optionally starting to sleep won't result in race conditions.
578    pub fn race_free(self) -> Parked<'a> {
579        Parked {
580            last_pending: self.pending.load(Relaxed),
581            wakeup: self.wakeup,
582            _phantom: PhantomData,
583        }
584    }
585}
586
587impl Parked<'_> {
588    /// Check whether useful work is certainly not possible until an event is raised.
589    ///
590    /// Unlike the calling of the park function, this is not an optimistic operation.
591    /// Its result will be exact as long as the [`Park`] protocol is correctly followed.
592    /// A return value of `false` prohibits the park function from sleeping at all:
593    /// it should yield control to the executor immediately. A return value of `true`
594    /// is a strong hint to sleep, block, or otherwise take over control flow until
595    /// some unspecified condition, ideally until an event is raised.
596    ///
597    /// See also the [`Park`] protocol.
598    pub fn is_idle(&self) -> bool {
599        self.last_pending & self.wakeup == 0
600    }
601}
602
/// Poll-local signal state, shared between a [`Signals`] and the [`Executor`] bound to it.
#[derive(Copy, Clone, Default)]
struct Run {
    /// Raised signal set: set by `Step::poll()` before polling the future and extended by
    /// `Signals::raise()`.
    raised: u32,
    /// Wakeup signal set: reset to zero by `Step::poll()` before polling the future and
    /// OR-ed with the mask of every driven poll function that pends.
    wakeup: u32,
}