stakker/sync/waker.rs

//! # Inter-thread waking
//!
//! This converts the single `PollWaker` that we get from the I/O
//! poller into many thousands, arranged in a hierarchical bitmap to
//! minimise lookups.  A bitmap is used so that code wishing to wake
//! up its handler in the **Stakker** thread doesn't need to remember
//! whether it already has a wake-up request outstanding.  The bitmap
//! allows the operation to be skipped quickly if a request is
//! already outstanding.
//!
//! ## Scaling
//!
//! Each bitmap has two layers, giving 4096 bits (64*64) on 64-bit
//! platforms.  Above this is an additional summary bitmap of 64 bits.
//! Each bit in the summary maps to a `Vec` of bitmaps.  For the first
//! 262144 wakers (64*4096), there are just 0 or 1 bitmaps in each
//! `Vec`.  If the caller goes beyond that number, then 2 or more
//! bitmaps may appear in each `Vec`.  So this scales quite gradually.
//! Realistically, since each waker will be associated with a channel
//! or mutex connecting to some code in another thread, it seems
//! unlikely we'd even reach 4096, but who knows.
//!
//! So the cost of a single "wake" is at minimum 1 atomic operation,
//! and at maximum 3 atomic operations plus the poll-wake.  The cost
//! of collecting that "wake" is 3 atomic operations for up to 262144
//! wakers, plus 1 additional atomic operation for each 262144 beyond
//! that, although most of the atomic operations will be shared with
//! collecting any other wakes that also happened recently.
//!
//! Also, the more heavily loaded the main thread becomes, the less
//! often it will collect the wakes, accumulating more each time,
//! which means that as the load goes up, the waking mechanism becomes
//! more efficient.
//!
//! (On 32-bit, the figures are 1024 bits per bitmap, and 32768 wakers
//! to fill the first element of each `Vec`.)
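//!
//! As an illustration, on 64-bit a waker's `bit` index decomposes as
//! follows; this just mirrors the shifts and masks used in
//! `WakeHandlers::add` and `BitMap::set` below (the variable names
//! here are illustrative only):
//!
//! ```ignore
//! let within_bitmap = bit & 0xFFF;        // low 12 bits: one of 4096 bits
//! let leaf_word     = within_bitmap >> 6; // which of the 64 leaf words
//! let leaf_bit      = within_bitmap & 63; // which bit within that word
//! let summary_slot  = (bit >> 12) & 63;   // which of the 64 `Vec`s of bitmaps
//! let vec_index     = bit >> 18;          // which bitmap within that `Vec`
//! ```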
//!
//! ## Drop handling
//!
//! The drop of a `Waker` is handled by pushing a notification onto a
//! mutex-protected list.  There is one waker slot in each bitmap
//! reserved for waking up the drop handler in the main thread.  This
//! overhead is assumed to be acceptable because drops should be very
//! much less frequent than wakes.  For example a channel/waker pair
//! would be created, handle many messages and then finally be
//! dropped.

// If inter-thread is disabled, we just switch in a dummy
// implementation.  This results in a lot of dead code for the
// compiler to optimise out, so ignore those warnings.
#![cfg_attr(not(feature = "inter-thread"), allow(dead_code))]
#![cfg_attr(not(feature = "inter-thread"), allow(unused_variables))]

#[cfg(feature = "inter-thread")]
use {slab::Slab, std::convert::TryFrom, std::mem};

use crate::Stakker;
use std::ops::{Index, IndexMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

type BoxFnMutCB = Box<dyn FnMut(&mut Stakker, bool) + 'static>;

#[cfg(feature = "inter-thread")]
pub(crate) struct WakeHandlers {
    pollwaker: Arc<PollWaker>,
    slab: Slab<Option<BoxFnMutCB>>,
    bitmaps: Array<Vec<Arc<BitMap>>>,
}

#[cfg(feature = "inter-thread")]
impl WakeHandlers {
    pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
        Self {
            pollwaker: Arc::new(PollWaker::new(waker)),
            slab: Slab::new(),
            bitmaps: Default::default(),
        }
    }

    /// Get a list of all the wake handlers that need to run
    pub fn wake_list(&mut self) -> Vec<u32> {
        let mut rv = Vec::new();
        self.pollwaker.summary.drain(|slot| {
            // If there is nothing in the slab for `bit`, ignore it.
            // Maybe a `wake` and a `del` occurred around the same
            // time.  This also means that maybe we `del` and
            // reallocate the slot and then get a `wake` for it.  But
            // a spurious wake is acceptable.
            for bm in &self.bitmaps[slot] {
                bm.drain(|bit| rv.push(bit));
            }
        });
        rv
    }

    /// Take the list of `Waker` bits whose instances have been
    /// dropped, leaving the list empty
    pub fn drop_list(&mut self) -> Vec<u32> {
        mem::take(&mut *self.pollwaker.drop_list.lock().unwrap())
    }

    /// Borrows a wake handler from its slot in the slab, leaving a
    /// `None` there.  Panics if the handler has already been borrowed
    /// from its slot.  Returns `None` if the slot is now unoccupied,
    /// i.e. the handler was deleted.
    pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        match self.slab.get_mut(bit as usize) {
            None => None,
            Some(slot) => {
                if let Some(cb) = slot.take() {
                    Some(cb)
                } else {
                    panic!("Wake handler has been borrowed from its slot twice");
                }
            }
        }
    }

    /// Restores a wake handler back into its slot in the slab.  If the
    /// handler slot has been deleted, or is currently occupied (not
    /// `None`), then panics as it means that something has gone badly
    /// wrong.  It should not be possible for a wake handler to delete
    /// itself.  A wake handler is only deleted when the [`Waker`] is
    /// dropped.  A `drop_list` wake handler won't delete itself.
    ///
    /// [`Waker`]: ../sync/struct.Waker.html
    pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {
        if self
            .slab
            .get_mut(bit as usize)
            .expect("WakeHandlers slot unexpectedly deleted during handler call")
            .replace(cb)
            .is_some()
        {
            panic!(
                "WakeHandlers slot unexpectedly occupied by another handler during wake handler call"
            );
        }
    }

    /// Add a wake handler and return a `Waker` which can be passed to
    /// another thread and used to trigger it.  A wake handler must be
    /// able to handle spurious wakes, since there is a small chance of
    /// those happening from time to time.
    pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
        let mut bit = u32::try_from(self.slab.insert(Some(Box::new(cb))))
            .expect("Exceeded 2^32 Waker instances");
        let mut base = bit & !(BitMap::SIZE - 1);
        while base == bit {
            // Bit zero in any bitmap is used to signal a drop, so
            // swap it and add another slab entry
            let somecb = self
                .slab
                .get_mut(bit as usize)
                .unwrap()
                .replace(Box::new(|s, _| s.process_waker_drops()));
            let bit2 =
                u32::try_from(self.slab.insert(somecb)).expect("Exceeded 2^32 Waker instances");
            bit = bit2;
            base = bit & !(BitMap::SIZE - 1);
        }
        let vec_index = bit >> (USIZE_INDEX_BITS + BitMap::SIZE_BITS);
        let waker_slot = (bit >> BitMap::SIZE_BITS) & (USIZE_BITS - 1);
        let vec = &mut self.bitmaps[waker_slot];
        while vec.len() <= vec_index as usize {
            vec.push(Arc::new(BitMap::new(
                base,
                waker_slot,
                self.pollwaker.clone(),
            )));
        }
        Waker {
            bit,
            bitmap: vec[vec_index as usize].clone(),
        }
    }

    /// Delete a handler, and return it if it was found.  The returned
    /// handler should be called with a `deleted` argument of `true`.
    pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        if 0 != (bit & (BitMap::SIZE - 1)) && self.slab.contains(bit as usize) {
            return self.slab.remove(bit as usize);
        }
        None
    }

    /// Check the number of stored handlers (for testing)
    #[cfg(test)]
    pub(crate) fn handler_count(&self) -> usize {
        self.slab.len()
    }
}

// Dummy implementation used if feature "inter-thread" is disabled.
// Allows a `WakeHandlers` instance to be created, but panics if it is
// actually used to create a Waker.
#[cfg(not(feature = "inter-thread"))]
pub(crate) struct WakeHandlers;

#[cfg(not(feature = "inter-thread"))]
impl WakeHandlers {
    pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
        Self
    }
    pub fn wake_list(&mut self) -> Vec<u32> {
        Vec::new()
    }
    pub fn drop_list(&mut self) -> Vec<u32> {
        Vec::new()
    }
    pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        None
    }
    pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {}
    pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
        panic!("Enable feature 'inter-thread' to create Waker instances");
    }
    pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        None
    }
}

/// Used to schedule a wake handler to be called in the main thread
///
/// Obtain an instance using [`Core::waker`], and pass it to the
/// thread that needs to wake the main thread.  This primitive would
/// normally be used in conjunction with a channel or some other
/// shared list or shared state, to alert a wake handler in the main
/// thread that there is a new message that needs attention, or that
/// some other change has occurred.
///
/// When this is dropped, either due to being manually dropped or due
/// to a panic, a final call to the wake handler in the main thread is
/// scheduled with the `deleted` argument set to `true`, and then the
/// wake handler is removed.
///
/// Note that there is no mechanism for back-pressure or cancellation
/// here, i.e. no way to inform a [`Waker`] that whatever the wake
/// handler notifies has gone away or needs to pause.  This should all
/// be handled via whatever channel or shared state is used to
/// communicate data from the other thread to the main thread.
/// Usually cancellation would need to be flagged in a drop handler in
/// the main thread, e.g. in an actor's drop handler.  That way the
/// other thread can recognise the situation and terminate, ensuring
/// that things clean up nicely in case of failure of the actor.
///
/// [`Core::waker`]: ../struct.Core.html#method.waker
/// [`Waker`]: ../sync/struct.Waker.html
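///
/// As a rough sketch of the intended pattern (illustrative only; it
/// assumes that [`Core::waker`] registers a handler closure of the
/// same form as `WakeHandlers::add` above, and that `core` is
/// available):
///
/// ```ignore
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (tx, rx) = channel::<String>();
///
/// // Register the wake handler; `deleted` is true on the final call,
/// // made after the Waker has been dropped in the other thread.
/// let waker = core.waker(move |_stakker, deleted| {
///     if deleted {
///         return;
///     }
///     while let Ok(msg) = rx.try_recv() {
///         // ... handle `msg` in the main thread ...
///     }
/// });
///
/// thread::spawn(move || {
///     tx.send("hello".to_string()).unwrap();
///     waker.wake(); // schedule the handler to run in the main thread
/// });
/// ```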
pub struct Waker {
    bit: u32,
    bitmap: Arc<BitMap>,
}

impl Waker {
    /// Schedule a call to the corresponding wake handler in the main
    /// thread, if it is not already scheduled to be called.  If it is
    /// already scheduled (or a nearby handler is already scheduled),
    /// this requires just one `SeqCst` atomic operation.  In the
    /// worst case it requires 3 atomic operations, and a wake-up call
    /// to the I/O poller.
    ///
    /// This is handled using a hierarchical tree of `usize` bitmaps
    /// containing wake bits, one leaf wake bit for each [`Waker`].
    /// At the top of the tree is the poll-waker.  The wake-up only
    /// needs to ascend until it reaches a level which has already
    /// been woken.  In the main thread, in response to the poll-wake,
    /// the hierarchy is descended only on those branches where there
    /// are wake bits set.  The wake bits are cleared and the
    /// corresponding wake handlers are called.  The longer the
    /// poll-wake process takes, the more wakes will be accumulated in
    /// the bitmap, making the next wake more efficient, so this
    /// scales well.
    ///
    /// Note that this operation has to be as cheap as possible
    /// because with a channel, for example, you'll need to call it
    /// every time you add an item.  Trying to detect when you add the
    /// first item to an empty queue is unreliable due to races (for
    /// example calling `channel.is_empty()` first and then
    /// `channel.send()` would race with the main thread removing
    /// items).  So unless the channel has specific support for
    /// detecting writing to an empty queue (some kind of
    /// `channel.send_and_was_empty()` call), it's necessary to wake
    /// on every send.
    ///
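    /// With a hypothetical channel sender `tx` shared with another
    /// thread, the resulting pattern is simply (sketch only):
    ///
    /// ```ignore
    /// tx.send(item).unwrap();
    /// waker.wake(); // cheap if a wake is already outstanding
    /// ```
    ///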
    /// [`Waker`]: ../sync/struct.Waker.html
    pub fn wake(&self) {
        self.bitmap.set(self.bit);
    }
}

impl Drop for Waker {
    fn drop(&mut self) {
        // Ignore poisoning here, so as not to panic inside a panic handler
        if let Ok(mut guard) = self.bitmap.pollwaker.drop_list.lock() {
            guard.push(self.bit);
            self.bitmap.set(self.bitmap.base_index);
        }
    }
}

const LOG2_TABLE: [u32; 9] = [0, 0, 1, 0, 2, 0, 0, 0, 3];
const USIZE_BYTES: u32 = std::mem::size_of::<usize>() as u32; // 4 or 8
const USIZE_BITS: u32 = 8 * USIZE_BYTES; // 32 or 64
const USIZE_INDEX_BITS: u32 = 3 + LOG2_TABLE[USIZE_BYTES as usize]; // 5 or 6

// Regarding Ordering::SeqCst, there are two BitMap operations: `set`
// sets the bit at the bottom of the hierarchy and works up, and
// `drain` clears the bits at the top of the hierarchy and works down.
// If these operations occur at the same time, they must cross over in
// an ordered way, hence SeqCst.  If they do occur at the same time,
// we get a spurious wake, but that is harmless.  If they crossed over
// in an unordered way, we could get a situation where a bit is set
// lower down but not in the higher-level summaries, which would leave
// that wakeup stuck indefinitely.
const ORDERING: Ordering = Ordering::SeqCst;

// An array of 32/64 `I` items.  Unfortunately, until const generics
// appear in stable, it's necessary to use this workaround to get a
// `Default` implementation for a 64-entry array.  It should hopefully
// all compile down to the natural implementation.
//
// TODO: When const generics land, get rid of this workaround
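// (A rough sketch of the likely replacement once that is possible: a
// single flat `struct Array<I>([I; USIZE_BITS as usize])` with the
// same `Index`/`IndexMut` impls and a hand-written `Default` impl,
// since `Default` currently isn't provided for arrays that long.)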
#[derive(Default)]
struct Array<I>([[I; 8]; USIZE_BYTES as usize]);
impl<I> Index<u32> for Array<I> {
    type Output = I;
    fn index(&self, ii: u32) -> &Self::Output {
        &self.0[(ii >> 3) as usize][(ii & 7) as usize]
    }
}
impl<I> IndexMut<u32> for Array<I> {
    fn index_mut(&mut self, ii: u32) -> &mut Self::Output {
        &mut self.0[(ii >> 3) as usize][(ii & 7) as usize]
    }
}

// A leaf in the hierarchy, providing 32/64 bits.  Uses `usize` as
// that is probably the memory word size.  Also `AtomicUsize` is
// reported to have the best platform support.
#[derive(Default)]
struct Leaf {
    bitmap: AtomicUsize,
}

impl Leaf {
    #[inline]
    fn set(&self, bit: u32) -> bool {
        0 == self.bitmap.fetch_or(1 << bit, ORDERING)
    }
    #[inline]
    fn drain(&self, mut cb: impl FnMut(u32)) {
        let mut bits = self.bitmap.swap(0, ORDERING);
        while bits != 0 {
            let bit = bits.trailing_zeros();
            bits &= !(1 << bit);
            cb(bit);
        }
    }
}

#[derive(Default)]
struct Layer<S: Default> {
    summary: Leaf, // A bit is set here if the corresponding child is non-zero
    child: Array<S>,
}

// Code to produce a hierarchy of three levels is a trivial adaptation
// of this code (i.e. Layer<Layer<Leaf>>), but I don't think it's
// needed.

// This bitmap is a hierarchy of two levels.  This holds 2^10 or 2^12
// bits (1024 or 4096), taking up 132 or 520 bytes of memory (plus
// overheads).
struct BitMap {
    tree: Layer<Leaf>,
    base_index: u32,
    wake_index: u32,
    pollwaker: Arc<PollWaker>,
}

impl BitMap {
    const SIZE_BITS: u32 = USIZE_INDEX_BITS * 2;
    const SIZE: u32 = 1 << Self::SIZE_BITS;

    pub fn new(base_index: u32, wake_index: u32, pollwaker: Arc<PollWaker>) -> Self {
        assert_eq!(1 << USIZE_INDEX_BITS, USIZE_BITS);
        Self {
            tree: Default::default(),
            base_index,
            wake_index,
            pollwaker,
        }
    }

    #[inline]
    fn set(&self, bit: u32) {
        let bit = bit - self.base_index;
        let a = bit >> USIZE_INDEX_BITS;
        let b = bit & (USIZE_BITS - 1);
        if self.tree.child[a].set(b)
            && self.tree.summary.set(a)
            && self.pollwaker.summary.set(self.wake_index)
        {
            (self.pollwaker.waker)();
        }
    }

    /// Read out all the set bits, clearing them in the process.
    #[inline]
    fn drain(&self, mut cb: impl FnMut(u32)) {
        self.tree.summary.drain(|a| {
            self.tree.child[a].drain(|b| {
                cb((a << USIZE_INDEX_BITS) + b + self.base_index);
            });
        });
    }
}

/// Interface to the I/O poller-provided waker of the main thread.
/// Provides 32/64 slots for waking.  These slots might get used by
/// multiple BitMap instances if we have very many of them.
struct PollWaker {
    // Bitmap showing which slots need processing
    summary: Leaf,

    // Waker provided by I/O poller
    waker: Box<dyn Fn() + Send + Sync + 'static>,

    // List of handlers that need to be dropped
    drop_list: Mutex<Vec<u32>>,
}

impl PollWaker {
    pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
        Self {
            summary: Default::default(),
            waker,
            drop_list: Mutex::new(Vec::new()),
        }
    }
}