nb_executor/lib.rs
1//! Single-future, `#![no_std]` executor based on event bitmasks.
2//!
3//! See `README.md` for a brief overview. [`Executor`] describes the most important elements
4//! to get started.
5
6#![cfg_attr(not(test), no_std)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8#![forbid(unsafe_code)]
9#![warn(missing_docs)]
10
11use core::{
12 cell::Cell,
13 convert::Infallible,
14 future::Future,
15 marker::PhantomData,
16 pin::Pin,
17 sync::atomic::{AtomicU32, Ordering::Relaxed},
18 task::{Context, Poll, Waker},
19};
20
21pub use futures::{self, pin_mut};
22pub use nb;
23
24#[cfg(test)]
25mod tests;
26
27#[cfg(feature = "heapless")]
28mod collections;
29
30#[cfg(feature = "heapless")]
31pub use collections::Mpmc;
32
33/// Definition of event bits.
34///
35/// Implementors should be simple wrappers around `u32` that allow application code to clearly
36/// differentiate the relevant event sources. Events and signals correspond to the bits of the
37/// value returned by `as_bits()`.
38///
39/// # Examples
40///
41/// ```
42/// # use nb_executor::EventMask;
43/// use bitflags::bitflags;
44///
45/// bitflags! {
46/// struct Ev: u32 {
47/// const USB_RX = 1 << 0;
48/// const PRIO = 1 << 1;
49/// const TICK = 1 << 2;
50/// }
51/// }
52///
53/// impl EventMask for Ev {
54/// fn as_bits(self) -> u32 {
55/// self.bits()
56/// }
57/// }
58/// ```
pub trait EventMask: Copy + 'static {
    /// Get the `u32` bitmask.
    ///
    /// Each set bit of the returned value stands for one event (or signal). The mask is taken
    /// by value, which is always possible because implementors are `Copy`.
    fn as_bits(self) -> u32;
}
63
64/// Shared event mask.
65///
66/// The event mask is an atomic bitmask shared between the executor and the event sources. The type
67/// parameter `S` is an [`EventMask`].
68///
69/// # Events or signals?
70/// They are the same. The name "event" refers to their presence in the shared event mask,
71/// while the name "signal" refers to poll-local signal masks. Events become signals when they
72/// are handled or waited for.
pub struct Events<S> {
    /// Bitmask of the currently raised events; `raise()` ORs new bits in atomically.
    events: AtomicU32,
    /// Ties the mask to its `EventMask` type `S` without storing a value of that type.
    _phantom: PhantomData<S>,
}
77
78/// Signal state manager and event listener.
79///
80/// `Signals` maintains the signal state of an [`Executor`]. This consists of the raised signal set
81/// and the wakeup signal set. Upon polling the future, the raised signal set is frozen to the
82/// then-current value of the event mask, all bits in the wakeup signal set are removed from the
83/// event mask (atomically), and the wakeup signal set is cleared. See also [`Step::poll()`]
84/// for behavior when the wakeup signal set is zero. Current poll functions which are driven
85/// by any of the raised signals will be attempted. If a poll function is not attempted or does
86/// not yet resolve to an output then its signal mask will be OR-ed into the wakeup signal set.
87/// The future won't be polled again until a raised signal matches the wakeup signal set.
88///
89/// The type parameter `S` is an [`EventMask`].
90///
91/// # Delayed signals
92/// The executor will examine the event mask after polling the future to check for any
93/// immediate updates. Since the raised signal set remains frozen during polling, any signal
94/// raised by the future itself through [`Events`] won't become visible until this happens.
95/// This also means that external events are not tested for until the future yields. As an
96/// optimization for the first case, [`Signals::raise()`] updates the raised signal set
97/// immediately. Correct programs are not able to observe this behavior.
pub struct Signals<'a, S> {
    /// The shared event mask that is being watched.
    pending: &'a Events<S>,
    /// Poll-local signal state: the raised signal set and the wakeup signal set.
    run: Cell<Run>,
}
102
103/// A single-threaded, single-future async executor.
104///
105/// # Typical setup
106/// - First, an event mask is created with [`Events::default()`] (or [`Events::new()`] if
107/// that needs to be `const`). Event masks are `Send + Sync` and shared references to them
108/// are enough for all operations, so they work well as `static` items or other types of
109/// shared state. Distribute event mask references to the external event sources and keep
110/// one for the executor.
111///
112/// - The event mask is then watched with [`Events::watch()`]. The resulting [`Signals`] is
113/// `Send + !Sync`. This means that operations become limited to one thread of execution
114/// from this point on, so this is usually done in some type of initialization or main
115/// function instead of in a shared or global context.
116///
/// - A new executor is bound with [`Signals::bind()`]. Executors are `!Send + !Sync`:
///   neither the executor nor the associated `Signals` may escape the current thread. This
///   makes executors appropriate for construction at the use site.
120///
121/// - A future is created. It needs a reference to the `Signals` object in order to drive
122/// poll functions, making it `!Sync` too.
123///
124/// - Finally, [`Executor::block_on()`] blocks and resolves the future while external event
125/// sources direct it through the event mask, possibly with help from the park function.
126///
127/// # Examples
128///
129/// This is a complete usage example. It uses `std::sync` primitives and a park function based on
130/// `std::thread::park()` to multiply the integers from 1 to 10 read from a blocking queue.
131///
132/// ```
133/// # use nb_executor::*;
134/// # use bitflags::bitflags;
135/// use std::{thread, sync::{mpsc::*, Arc}};
136///
137/// bitflags! {
138/// struct Ev: u32 {
139/// const QUEUE = 1 << 0;
140/// }
141/// }
142///
143/// impl EventMask for Ev {
144/// fn as_bits(self) -> u32 {
145/// self.bits()
146/// }
147/// }
148///
149/// async fn recv(signals: &Signals<'_, Ev>, rx: &Receiver<u32>) -> Option<u32> {
150/// signals.drive_infallible(Ev::QUEUE, || match rx.try_recv() {
151/// Ok(n) => Ok(Some(n)),
152/// Err(TryRecvError::Disconnected) => Ok(None),
153/// Err(TryRecvError::Empty) => Err(nb::Error::WouldBlock),
154/// }).await
155/// }
156///
157/// let events = Arc::new(Events::default());
158/// let signals = events.watch();
159///
160/// let (tx, rx) = sync_channel(1);
161/// let future = async {
162/// let mut product = 1;
163/// while let Some(n) = recv(&signals, &rx).await {
164/// product *= n;
165/// }
166///
167/// product
168/// };
169///
170/// let events_prod = Arc::clone(&events);
171/// let runner = thread::current();
172///
173/// thread::spawn(move || {
174/// for n in 1..=10 {
175/// tx.send(n).unwrap();
176/// events_prod.raise(Ev::QUEUE);
177/// runner.unpark();
178/// }
179///
180/// // Notify shutdown
181/// drop(tx);
182/// events_prod.raise(Ev::QUEUE);
183/// runner.unpark();
184/// });
185///
186/// let result = signals.bind().block_on(future, |park| {
187/// // thread::park() is event-safe, no lock is required
188/// let parked = park.race_free();
189/// if parked.is_idle() {
190/// thread::park();
191/// }
192///
193/// parked
194/// });
195///
196/// assert_eq!(result, (1..=10).product()); // 3628800
197/// ```
pub struct Executor<'a> {
    /// Raw atomic bits of the watched event mask.
    pending: &'a AtomicU32,
    /// Signal state shared with the `Signals` that this executor was bound from.
    run: &'a Cell<Run>,
    /// Waker handed to the future through its `Context`; a no-op waker unless replaced.
    waker: Waker,
}
203
204/// A non-blocking executor-future-park state machine.
205///
206/// All executor operations are ultimately implemented with `Step`. It allows fine-grained
207/// control over execution and control flow. `Step` can only perform one poll step at a time
208/// and requires a pinned future. A `Step` object is created by calling [`Executor::step()`].
209///
210/// See [`Executor::block_on()`] for a blocking runner.
pub struct Step<'exec, 'fut, F, P> {
    /// Executor whose event mask, signal state and waker are used for polling.
    executor: Executor<'exec>,
    /// Wakeup signal set left behind by the most recent pending poll; zero means "full scan".
    wakeup: u32,
    /// Last observed value of the event mask (all-ones until something else is observed).
    last_pending: u32,
    /// The pinned future being resolved.
    future: Pin<&'fut mut F>,
    /// Park function implementing the `Park` protocol.
    park: P,
}
218
219/// A request to park the executor.
220///
221/// Parking is the mechanism by which the executor *tries* to wait for external event sources when
222/// no signal in the wakeup set is currently raised (see [`Signals`]). The executor might
223/// nonetheless resume immediately if a signal is raised before the atomic part of the *park
224/// protocol* takes place. Park functions implement the park protocol and must follow it strictly,
225/// **you risk deadlocks otherwise**.
226///
227/// # The park protocol
228///
229/// - First, the executor determines that further progress is unlikely at this moment. The
230/// specifics of this process are implementation details that should not be relied upon.
231///
232/// - The park function is called with a `Park` parameter.
233///
234/// - The park function enters a context wherein no external events may influence a correct
235/// decision to sleep or not. For example, a park function that does not sleep at all does
236/// not need to do anything here, since no external event can incorrectly change that
237/// behavior. On the other hand, a park function that halts until a hardware interrupt occurs
238/// would need to enter an interrupt-free context to avoid deadlocks.
239///
240/// - The park function calls [`Park::race_free()`] while still in the event-safe context.
241/// This produces a [`Parked`] value that serves as proof of the call to `race_free()`.
242///
243/// - If the park function intends to block or sleep, then it must first call
244/// [`Parked::is_idle()`]. It may be allowed to sleep only if that function returns `true`.
245///
246/// - If the park function is willing to sleep and is allowed to do so, it must
247/// atomically exit the event-safe context whilst entering the sleep state. A deadlock is
248/// again possible if both operations are not done atomically with respect to each other
249/// (**only for blocking runners, see below**).
250///
251/// - If the park function sleeps, this state should be automatically exited when an external
252/// event occurs.
253///
254/// - The park function returns its [`Parked`] token.
255///
256/// - The executor resumes.
257///
258/// # Delegating out of the park function
259///
260/// Blocking runners, such as [`Executor::block_on()`], require that the park function's
261/// event-safe context be exited in an atomic manner with respect to the start of whatever
262/// blocking operation. However, this requirement does not hold for [`Step`] **as long as
263/// the park function never exits the event-safe context and it itself never blocks**. Since
264/// [`Step::poll()`] will return after parking, the caller can perform potentially-blocking
265/// operations from the event-safe context. It must still release it atomically if it will
266/// sleep, though. With this technique it is even possible for the `poll()` caller to be an
267/// external event source.
pub struct Park<'a> {
    /// Raw atomic bits of the event mask; sampled by `race_free()`.
    pending: &'a AtomicU32,
    /// Wakeup signal set that the pending future is waiting for.
    wakeup: u32,
}
272
273/// Proof of parking.
274///
275/// Park functions return `Parked` objects as a proof of having called [`Park::race_free()`]. This
276/// is necessary because [`Park::race_free()`] updates executor state and must always be run.
277/// `Parked` can be used by park functions to determine whether blocking or sleeping is
278/// permissible. See [`Park`] documentation for the correct parking protocol.
pub struct Parked<'a> {
    /// Snapshot of the event mask taken by `Park::race_free()`.
    last_pending: u32,
    /// Wakeup signal set copied from the originating `Park`.
    wakeup: u32,
    /// Carries the lifetime of the originating `Park` without borrowing any data.
    _phantom: PhantomData<&'a ()>,
}
284
285impl<S> Default for Events<S> {
286 fn default() -> Self {
287 Events {
288 events: Default::default(),
289 _phantom: PhantomData,
290 }
291 }
292}
293
294impl<S> Events<S> {
295 /// Construct a new event mask.
296 ///
297 /// This is the same as [`Events::default()`] but is also `const`.
298 pub const fn new() -> Self {
299 Events {
300 events: AtomicU32::new(0),
301 _phantom: PhantomData,
302 }
303 }
304
305 /// Associate a new `Signals` to this event mask.
306 ///
307 /// This indirectly links the executor to the event mask, since creating an [`Executor`]
308 /// requires a `Signals`.
309 #[must_use]
310 pub fn watch(&self) -> Signals<S> {
311 Signals {
312 pending: self,
313 run: Default::default(),
314 }
315 }
316}
317
318impl<S: EventMask> Events<S> {
319 /// Raise all events in a mask.
320 ///
321 /// This operation is atomic: multiple events can be safely raised at the same time.
322 /// Already raised signals are left as is.
323 pub fn raise(&self, signals: S) {
324 self.events.fetch_or(signals.as_bits(), Relaxed);
325 }
326}
327
328impl<'a, S: EventMask> Signals<'a, S> {
329 /// Bind a new executor to this signal source.
330 ///
331 /// The executor will be constructed with a waker that does nothing. It can be replaced
332 /// by calling [`Executor::with_waker()`].
333 #[must_use]
334 pub fn bind(&self) -> Executor {
335 Executor {
336 pending: &self.pending.events,
337 run: &self.run,
338 waker: futures::task::noop_waker(),
339 }
340 }
341
342 /// Raise a signal set without delays.
343 ///
344 /// This will perform the equivalent of [`pending().raise(signals)`], but it will
345 /// also update the raised signal set. This prevents the "delayed signals" effect
346 /// that is shown above. Prefer `raise()` if another branch of this future (such as
347 /// in [`futures::join!()`] might benefit from this, otherwise use `pending().raise()`.
348 /// It is impossible to visibly influence a well-behaved poll function by switching
349 /// between `raise()` or `pending().raise()` at any call site; this is a best-effort
350 /// optimization only.
351 pub fn raise(&self, signals: S) {
352 self.pending.raise(signals);
353
354 let run = self.run.get();
355 let raised = run.raised | signals.as_bits();
356 self.run.set(Run { raised, ..run });
357 }
358
359 /// Retrieve the underlying event mask.
360 pub fn pending(&self) -> &Events<S> {
361 self.pending
362 }
363
364 /// Asynchronously drive a fallible poll function to completion
365 ///
366 /// The future produced by this method checks on each poll whether any of the signals
367 /// in `signals` is present in the raised signal set. If that is so, it invokes the poll
368 /// function. On first poll, this check is omitted, thus guaranteeing at least one call
369 /// to the poll function. If the poll function succeeds or fails with
370 /// [`nb::Error::Other`], the future completes immediately with that value. If the poll
371 /// function returns [`nb::Error::WouldBlock`], or if none of the signals in `signals`
372 /// is present in the raised signal sent, then `signals` is added to the wakeup signal
373 /// set and the future pends.
374 ///
375 /// `poll()` must handle spurious calls gracefully. There is no guarantee that any of
376 /// the intended effects of any signal in `signals` has actually taken place. `poll()`
377 /// may not block.
378 pub async fn drive<T, E, F>(&self, signals: S, mut poll: F) -> Result<T, E>
379 where
380 F: FnMut() -> nb::Result<T, E>,
381 {
382 let mask = signals.as_bits();
383 let mut first_poll = true;
384
385 futures::future::poll_fn(move |_| {
386 let run = self.run.get();
387
388 if first_poll || run.raised & mask != 0 {
389 first_poll = false;
390
391 match poll() {
392 Ok(ok) => return Poll::Ready(Ok(ok)),
393 Err(nb::Error::Other(err)) => return Poll::Ready(Err(err)),
394 Err(nb::Error::WouldBlock) => (),
395 }
396 }
397
398 let wakeup = run.wakeup | mask;
399 self.run.set(Run { wakeup, ..run });
400
401 Poll::Pending
402 })
403 .await
404 }
405
406 /// Asynchronously drive an infallible poll function to completion.
407 ///
408 /// This is a variant of [`Signals::drive()`] intended for cases where there is no
409 /// proper error type. Although `drive(sig, poll).await.unwrap()` works the same, it
410 /// often requires explicit type annotations if `poll` is a closure. This method should
411 /// be preferred in such cases, such as when `poll` is just a wrapper around
412 /// `some_option.ok_or(WouldBlock)`.
413 pub async fn drive_infallible<T, F>(&self, signals: S, poll: F) -> T
414 where
415 F: FnMut() -> nb::Result<T, Infallible>,
416 {
417 self.drive(signals, poll).await.unwrap()
418 }
419}
420
421impl<'exec> Executor<'exec> {
422 /// Replace the executor's waker with a custom one.
423 ///
424 /// No restrictions are imposed on the waker: `nb-executor` does not use wakers at all.
425 /// Application code may define some communication between it and the park function.
426 #[must_use]
427 pub fn with_waker(mut self, waker: Waker) -> Self {
428 self.waker = waker;
429 self
430 }
431
432 /// Begin stepped execution.
433 ///
434 /// This allows the caller to remain in control of program flow in between polls, unlike
435 /// [`Executor::block_on()`]. The future is run as specified in [`Step`] and [`Signals`].
436 /// `park` is a park function and must follow the [`Park`] protocol. The caller and the
437 /// park function may cooperate to block or sleep outside of the park function within the
438 /// protocol's requirements.
439 pub fn step<'fut, F, P>(self, future: Pin<&'fut mut F>, park: P) -> Step<F, P>
440 where
441 F: Future,
442 P: FnMut(Park) -> Parked,
443 'exec: 'fut,
444 {
445 Step {
446 executor: self,
447 wakeup: 0,
448 last_pending: u32::MAX,
449 future,
450 park,
451 }
452 }
453
454 /// Execute a future on this executor, parking when no progress is possible.
455 ///
456 /// This method will block until the future resolves. There are two possible states of
457 /// operation while the future is executed:
458 ///
459 /// - Polling: The future's `poll()` method is called in order to attempt to resolve
460 /// it. The signal state is prepared as documented in [`Signals`] when switching
461 /// to the polling state. The next state after polling is unspecified, but will
462 /// eventually lead to parking if the future pends consistently.
463 ///
464 /// - Parking: This state is entered when useful work is unlikely at the current
465 /// time. For details, see the parking protocol in [`Park`]. `park` must adhere to
466 /// this protocol.
467 ///
468 /// See also [`Step`] and [`Executor::step()`].
469 pub fn block_on<F, P>(self, future: F, park: P) -> F::Output
470 where
471 F: Future,
472 P: FnMut(Park) -> Parked,
473 {
474 pin_mut!(future);
475 let mut step = self.step(future, park);
476
477 loop {
478 match step.poll() {
479 Poll::Pending => continue,
480 Poll::Ready(ready) => break ready,
481 }
482 }
483 }
484
485 /// Execute a future in a busy-waiting loop.
486 ///
487 /// This is equivalent to calling [`Executor::block_on()`] with a park function that
488 /// never sleeps. This is most likely the wrong way to do whatever you intend, prefer to
489 /// define a proper wake function.
490 pub fn block_busy_on<F: Future>(self, future: F) -> F::Output {
491 self.block_on(future, |park| park.race_free())
492 }
493}
494
495impl<F, P> Step<'_, '_, F, P>
496where
497 F: Future,
498 P: FnMut(Park) -> Parked,
499{
500 /// Perform at most one non-blocking polling attempt.
501 ///
502 /// This method never blocks unless the park function does (or the future, but that's
503 /// totally wrong anyway). The future will be polled given any of these:
504 ///
505 /// - The wakeup signal set is zero. This is intended to provide a reset or "full scan"
506 /// mechanism. In this case, the raised signal set will be hard-coded to all-ones,
507 /// regardless of the current value of the event mask. **Note:** A side effect of this
508 /// special case is that [`core::future::pending()`] and similar constructs might
509 /// busy-loop depending on the particular park function.
510 ///
511 /// - Any bit in the wakeup signal set matches the event mask. The event mask is
512 /// atomically frozen at the same time that this condition is check, creating the
513 /// raised signal set.
514 ///
515 /// The wakeup signal set is cleared just before polling the future (edge-triggering).
516 /// Regardless of the poll result or if the future is polled at all, a park
517 /// operation may be triggered under implementation-defined conditions. If parking
518 /// occurs, it will be the last observable operation before `poll()` returns. The
519 /// return value will be [`Poll::Pending`] if the future is not polled. Do not call
520 /// call `poll()` again after it returns [`Poll::Ready`].
521 ///
522 /// On first call, the wakeup signal set is zero. This ensures that all driven poll
523 /// functions are attempted at least once.
524 pub fn poll(&mut self) -> Poll<F::Output> {
525 let woken = if self.wakeup == 0 {
526 self.last_pending = u32::MAX;
527 true
528 } else {
529 loop {
530 let cleared = self.last_pending & !self.wakeup;
531 let result = self.executor.pending.compare_exchange_weak(
532 self.last_pending,
533 cleared,
534 Relaxed,
535 Relaxed,
536 );
537
538 match result {
539 Ok(current) => break cleared != current,
540 Err(current) => self.last_pending = current,
541 }
542 }
543 };
544
545 if !woken {
546 return Poll::Pending;
547 }
548
549 self.executor.run.set(Run {
550 raised: self.last_pending,
551 wakeup: 0,
552 });
553
554 let mut cx = Context::from_waker(&self.executor.waker);
555 let poll = self.future.as_mut().poll(&mut cx);
556
557 if poll.is_pending() {
558 self.wakeup = self.executor.run.get().wakeup;
559 let park = Park {
560 pending: self.executor.pending,
561 wakeup: self.wakeup,
562 };
563
564 self.last_pending = (self.park)(park).last_pending;
565 }
566
567 poll
568 }
569}
570
571impl<'a> Park<'a> {
572 /// Promise that new events won't race with sleeps, then get a proof of parking.
573 ///
574 /// Park functions must call this method to obtain the [`Parked`] object that they
575 /// return, which also allows them to determine sleep permissibility. The caller
576 /// promises that external events which may occur from the start of this call until
577 /// optionally starting to sleep won't result in race conditions.
578 pub fn race_free(self) -> Parked<'a> {
579 Parked {
580 last_pending: self.pending.load(Relaxed),
581 wakeup: self.wakeup,
582 _phantom: PhantomData,
583 }
584 }
585}
586
587impl Parked<'_> {
588 /// Check whether useful work is certainly not possible until an event is raised.
589 ///
590 /// Unlike the calling of the park function, this is not an optimistic operation.
591 /// Its result will be exact as long as the [`Park`] protocol is correctly followed.
592 /// A return value of `false` prohibits the park function from sleeping at all:
593 /// it should yield control to the executor immediately. A return value of `true`
594 /// is a strong hint to sleep, block, or otherwise take over control flow until
595 /// some unspecified condition, ideally until an event is raised.
596 ///
597 /// See also the [`Park`] protocol.
598 pub fn is_idle(&self) -> bool {
599 self.last_pending & self.wakeup == 0
600 }
601}
602
/// Poll-local signal state, kept in a `Cell` owned by `Signals` and shared with the executor.
#[derive(Copy, Clone, Default)]
struct Run {
    /// Raised signal set: the signals that driven poll functions are matched against.
    raised: u32,
    /// Wakeup signal set: the signals that pending poll functions are waiting for.
    wakeup: u32,
}