// stakker/sync/waker.rs
1//! # Inter-thread waking
2//!
3//! This converts the single `PollWaker` that we get from the I/O
4//! poller into many thousands, arranged in a hierarchical bitmap to
5//! minimise lookups. A bitmap is used so that some code wishing to
6//! wake up its handler in the **Stakker** thread doesn't need to
7//! remember whether it already has a wake-up request outstanding.
8//! The bitmap allows the operation to be ignored quickly if it is
9//! already outstanding.
10//!
11//! ## Scaling
12//!
13//! Each bitmap has two layers, giving 4096 bits (64*64) on 64-bit
14//! platforms. Above this is an additional summary bitmap of 64 bits.
15//! Each bit in the summary maps to a `Vec` of bitmaps. For the first
16//! 262144 wakers (64*4096), there are just 0 or 1 bitmaps in each
17//! `Vec`. If the caller goes beyond that number then 2 or more
18//! bitmaps may appear in each `Vec`. So this scales quite gradually.
19//! Realistically since each waker will be associated with a channel
20//! or mutex connecting to some code in another thread, it seems
21//! unlikely we'd even reach 4096, but who knows.
22//!
23//! So costs of a single "wake" are minimum 1 atomic operation, and
24//! maximum 3 atomic operations and the poll-wake. Costs of
25//! collecting that "wake" are 3 atomic operations up to 262144
26//! wakers, then 1 additional atomic operation for each 262144 beyond
27//! that, although most of the atomic operations will be shared with
28//! collecting any other wakes that also happened recently.
29//!
30//! Also, the more heavily loaded the main thread becomes, the less
31//! often it will collect the wakes, accumulating more each time,
32//! which means that as the load goes up, the waking mechanism becomes
33//! more efficient.
34//!
35//! (On 32-bit, the figures are 1024 per bitmap, and 32768 to fill the
36//! first element of each `Vec`.)
37//!
38//! ## Drop handling
39//!
40//! The drop of a `Waker` is handled by pushing a notification onto a
41//! mutex-protected list. There is one waker slot in each bitmap
42//! reserved for waking up the drop handler in the main thread. This
43//! overhead is assumed to be acceptable because drops should be very
44//! much less frequent than wakes. For example a channel/waker pair
45//! would be created, handle many messages and then finally be
46//! dropped.
47
48// If inter-thread is disabled, we just switch in a dummy
49// implementation. This results in a lot of dead code for the
50// compiler to optimise out, so ignore those warnings.
51#![cfg_attr(not(feature = "inter-thread"), allow(dead_code))]
52#![cfg_attr(not(feature = "inter-thread"), allow(unused_variables))]
53
54#[cfg(feature = "inter-thread")]
55use {slab::Slab, std::convert::TryFrom, std::mem};
56
57use crate::Stakker;
58use std::ops::{Index, IndexMut};
59use std::sync::atomic::{AtomicUsize, Ordering};
60use std::sync::{Arc, Mutex};
61
62type BoxFnMutCB = Box<dyn FnMut(&mut Stakker, bool) + 'static>;
63
#[cfg(feature = "inter-thread")]
pub(crate) struct WakeHandlers {
    // Shared state: top-level summary bitmap, the poller-provided
    // wake callback, and the drop-list
    pollwaker: Arc<PollWaker>,
    // Wake handlers indexed by waker bit number.  A slot holds `None`
    // whilst its handler is borrowed out for a call.
    slab: Slab<Option<BoxFnMutCB>>,
    // For each of the 32/64 top-level summary slots, the list of
    // second-level bitmaps sharing that slot (usually 0 or 1 entries)
    bitmaps: Array<Vec<Arc<BitMap>>>,
}
70
#[cfg(feature = "inter-thread")]
impl WakeHandlers {
    /// Create a new set of wake handlers on top of the single wake
    /// callback provided by the I/O poller
    pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
        Self {
            pollwaker: Arc::new(PollWaker::new(waker)),
            slab: Slab::new(),
            bitmaps: Default::default(),
        }
    }

    /// Get a list of all the wake handlers that need to run
    pub fn wake_list(&mut self) -> Vec<u32> {
        let mut rv = Vec::new();
        // Descend the hierarchy top-down: clear each set summary bit,
        // then drain every bitmap sharing that slot
        self.pollwaker.summary.drain(|slot| {
            // If there is nothing in the slab for `bit`, ignore it.
            // Maybe a `wake` and a `del` occurred around the same
            // time. This also means that maybe we `del` and
            // reallocate the slot and then get a `wake` for it. But
            // a spurious wake is acceptable.
            for bm in &self.bitmaps[slot] {
                bm.drain(|bit| rv.push(bit));
            }
        });
        rv
    }

    /// Take the accumulated list of dropped `Waker` bits, leaving the
    /// shared list empty.  Panics if the drop-list mutex is poisoned.
    pub fn drop_list(&mut self) -> Vec<u32> {
        mem::take(&mut *self.pollwaker.drop_list.lock().unwrap())
    }

    /// Borrows a wake handler from its slot in the slab, leaving a
    /// `None` there. Panics if a waker has been borrowed twice.
    /// Returns `None` if the slot is now unoccupied, i.e. the handler
    /// was deleted.
    pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        match self.slab.get_mut(bit as usize) {
            None => None,
            Some(slot) => {
                if let Some(cb) = slot.take() {
                    Some(cb)
                } else {
                    panic!("Wake handler has been borrowed from its slot twice");
                }
            }
        }
    }

    /// Restores a wake handler back into its slot in the slab. If the
    /// handler slot has been deleted, or is currently occupied (not
    /// None), then panics as it means that something has gone badly
    /// wrong. It should not be possible for a wake handler to delete
    /// itself. A wake handler is only deleted when the [`Waker`] is
    /// dropped. A `drop_list` wake handler won't delete itself.
    ///
    /// [`Waker`]: ../sync/struct.Waker.html
    pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {
        if self
            .slab
            .get_mut(bit as usize)
            .expect("WakeHandlers slot unexpectedly deleted during handler call")
            .replace(cb)
            .is_some()
        {
            panic!(
                "WakeHandlers slot unexpected occupied by another handler during wake handler call"
            );
        }
    }

    /// Add a wake handler and return a Waker to pass to the
    /// thread to use to trigger it. A wake handler must be able to
    /// handle spurious wakes, since there is a small chance of those
    /// happening from time to time.
    pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
        let mut bit = u32::try_from(self.slab.insert(Some(Box::new(cb))))
            .expect("Exceeded 2^32 Waker instances");
        // `base` is the global index of bit 0 of the bitmap that
        // would contain `bit`
        let mut base = bit & !(BitMap::SIZE - 1);
        while base == bit {
            // Bit zero in any bitmap is used to signal a drop, so
            // swap it and add another slab entry
            let somecb = self
                .slab
                .get_mut(bit as usize)
                .unwrap()
                .replace(Box::new(|s, _| s.process_waker_drops()));
            let bit2 =
                u32::try_from(self.slab.insert(somecb)).expect("Exceeded 2^32 Waker instances");
            bit = bit2;
            base = bit & !(BitMap::SIZE - 1);
        }
        // Layout of `bit`, low to high: SIZE_BITS bits select the bit
        // within a bitmap, then USIZE_INDEX_BITS bits select the
        // top-level summary slot, and the remaining high bits select
        // the position within that slot's Vec of bitmaps
        let vec_index = bit >> (USIZE_INDEX_BITS + BitMap::SIZE_BITS);
        let waker_slot = (bit >> BitMap::SIZE_BITS) & (USIZE_BITS - 1);
        let vec = &mut self.bitmaps[waker_slot];
        // Lazily create bitmaps up to and including the one needed
        while vec.len() <= vec_index as usize {
            vec.push(Arc::new(BitMap::new(
                base,
                waker_slot,
                self.pollwaker.clone(),
            )));
        }
        Waker {
            bit,
            bitmap: vec[vec_index as usize].clone(),
        }
    }

    /// Delete a handler, and return it if it was found. The returned
    /// handler should be called with a deleted argument of 'true'.
    pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
        // Bit 0 of each bitmap is the reserved drop-handler slot,
        // which must never be removed; hence the mask check
        if 0 != (bit & (BitMap::SIZE - 1)) && self.slab.contains(bit as usize) {
            return self.slab.remove(bit as usize);
        }
        None
    }

    /// Check the number of stored handlers (for testing)
    #[cfg(test)]
    pub(crate) fn handler_count(&self) -> usize {
        self.slab.len()
    }
}
193
194// Dummy implementation used if feature "inter-thread" is disabled.
195// Allows `WakeHandlers` instance to be created, but panics if it is
196// actually used to create a Waker.
197#[cfg(not(feature = "inter-thread"))]
198pub(crate) struct WakeHandlers;
199
200#[cfg(not(feature = "inter-thread"))]
201impl WakeHandlers {
202 pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
203 Self
204 }
205 pub fn wake_list(&mut self) -> Vec<u32> {
206 Vec::new()
207 }
208 pub fn drop_list(&mut self) -> Vec<u32> {
209 Vec::new()
210 }
211 pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
212 None
213 }
214 pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {}
215 pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
216 panic!("Enable feature 'inter-thread' to create Waker instances");
217 }
218 pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
219 None
220 }
221}
222
/// Used to schedule a wake handler to be called in the main thread
///
/// Obtain an instance using [`Core::waker`], and pass it to the
/// thread that needs to wake the main thread. This primitive would
/// normally be used in conjunction with a channel or some other
/// shared list or shared state, to alert a wake handler in the main
/// thread that there is a new message that needs attention, or that
/// some other change has occurred.
///
/// When this is dropped, either due to being manually dropped or due
/// to a panic, a final call to the wake handler in the main thread is
/// scheduled with the `deleted` argument set to true, and then the
/// wake handler is removed.
///
/// Note that there is no mechanism for back-pressure or cancellation
/// here, i.e. no way to inform a [`Waker`] that whatever the wake
/// handler notifies has gone away or needs to pause. This should all
/// be handled via whatever channel or shared state is used to
/// communicate data from the other thread to the main thread.
/// Usually cancellation would need to be flagged in a drop handler in
/// the main thread, e.g. in an actor's drop handler. That way the
/// other thread can recognise the situation and terminate, ensuring
/// that things clean up nicely in case of failure of the actor.
///
/// [`Core::waker`]: ../struct.Core.html#method.waker
/// [`Waker`]: ../sync/struct.Waker.html
pub struct Waker {
    // Global bit number identifying this waker's handler slot
    bit: u32,
    // The second-level bitmap that contains this waker's bit
    bitmap: Arc<BitMap>,
}
253
impl Waker {
    /// Schedule a call to the corresponding wake handler in the main
    /// thread, if it is not already scheduled to be called. If it is
    /// already scheduled (or a nearby handler is already scheduled),
    /// this requires just one `SeqCst` atomic operation. In the
    /// worst case it requires 3 atomic operations, and a wake-up call
    /// to the I/O poller.
    ///
    /// This is handled using a hierarchical tree of `usize` bitmaps
    /// containing wake bits, one leaf wake bit for each [`Waker`].
    /// At the top of the tree is the poll-waker. The wake-up only
    /// needs to ascend until it reaches a level which has already
    /// been woken. In the main thread, in response to the poll-wake
    /// the hierarchy is descended only on those branches where there
    /// are wake bits set. The wake bits are cleared and the
    /// corresponding wake handlers are called. The longer the
    /// poll-wake process takes, the more wakes will be accumulated in
    /// the bitmap, making the next wake more efficient, so this
    /// scales well.
    ///
    /// Note that this operation has to be as cheap as possible
    /// because with a channel for example, you'll need to call it
    /// every time you add an item. Trying to detect when you add the
    /// first item to an empty queue is unreliable due to races (for
    /// example calling `channel.is_empty()` first and then
    /// `channel.send()` would race with the main thread removing
    /// items). So unless the channel has specific support for
    /// detecting writing to an empty queue (some kind of
    /// `channel.send_and_was_empty()` call), it's necessary to wake
    /// on every send.
    ///
    /// [`Waker`]: ../sync/struct.Waker.html
    pub fn wake(&self) {
        // Sets our leaf bit and propagates upwards only as far as
        // necessary, invoking the poll-waker if the top was clear
        self.bitmap.set(self.bit);
    }
}
290
291impl Drop for Waker {
292 fn drop(&mut self) {
293 // Ignore poisoning here, to not panic in panic handler
294 if let Ok(mut guard) = self.bitmap.pollwaker.drop_list.lock() {
295 guard.push(self.bit);
296 self.bitmap.set(self.bitmap.base_index);
297 }
298 }
299}
300
// Log2 lookup for exact powers of two: LOG2_TABLE[1 << n] == n.
// Only indices 4 and 8 (the possible usize widths in bytes) are used.
const LOG2_TABLE: [u32; 9] = [0, 0, 1, 0, 2, 0, 0, 0, 3];
const USIZE_BYTES: u32 = std::mem::size_of::<usize>() as u32; // 4 or 8
const USIZE_BITS: u32 = 8 * USIZE_BYTES; // 32 or 64
const USIZE_INDEX_BITS: u32 = 3 + LOG2_TABLE[USIZE_BYTES as usize]; // 5 or 6

// Regarding Ordering::SeqCst, there are two BitMap operations: `set`
// sets the bit at the bottom of the hierarchy and works up, and
// `drain` clears the bits at the top of the hierarchy and works down.
// If these operations occur at the same time, they must cross over in
// an ordered way, hence SeqCst. If they do occur at the same time we
// get a spurious wake, but that is harmless. If they crossed over in
// an unordered way we could get a situation where a bit is set lower
// down but not in the higher level summaries, which means that that
// wakeup is stuck indefinitely.
const ORDERING: Ordering = Ordering::SeqCst;
316
317// An array of 32/64 `I` items. Unfortunately until const generics
318// appear in stable, it's necessary to use this workaround to get a
319// Default implementation for a 64-entry array. It should hopefully
320// all compile down to the natural implementation.
321//
322// TODO: When const generics land, get rid of this workaround
323#[derive(Default)]
324struct Array<I>([[I; 8]; USIZE_BYTES as usize]);
325impl<I> Index<u32> for Array<I> {
326 type Output = I;
327 fn index(&self, ii: u32) -> &Self::Output {
328 &self.0[(ii >> 3) as usize][(ii & 7) as usize]
329 }
330}
331impl<I> IndexMut<u32> for Array<I> {
332 fn index_mut(&mut self, ii: u32) -> &mut Self::Output {
333 &mut self.0[(ii >> 3) as usize][(ii & 7) as usize]
334 }
335}
336
// A leaf in the hierarchy, providing 32/64 bits. Uses `usize` as
// that is probably the memory word size. Also `AtomicUsize` is
// reported to have best platform support.
#[derive(Default)]
struct Leaf {
    // One wake bit per slot; all bits are initially clear
    bitmap: AtomicUsize,
}
344
345impl Leaf {
346 #[inline]
347 fn set(&self, bit: u32) -> bool {
348 0 == self.bitmap.fetch_or(1 << bit, ORDERING)
349 }
350 #[inline]
351 fn drain(&self, mut cb: impl FnMut(u32)) {
352 let mut bits = self.bitmap.swap(0, ORDERING);
353 while bits != 0 {
354 let bit = bits.trailing_zeros();
355 bits &= !(1 << bit);
356 cb(bit);
357 }
358 }
359}
360
#[derive(Default)]
struct Layer<S: Default> {
    summary: Leaf, // A bit is set here if the corresponding child is non-zero
    child: Array<S>, // 32/64 children, one per summary bit
}
366
// Code to produce a hierarchy of three levels is a trivial adaptation
// of this code (i.e. Layer<Layer<Leaf>>), but I don't think it's
// needed.

// This bitmap is a hierarchy of two levels. This holds 2^10 or 2^12
// bits (1024 or 4096), taking up 132 or 520 bytes of memory (plus
// overheads).
struct BitMap {
    // Two-level tree of wake bits
    tree: Layer<Leaf>,
    // Global bit number of this bitmap's bit 0.  Bit 0 itself is
    // reserved for waking the drop handler.
    base_index: u32,
    // Slot in the PollWaker top-level summary that this bitmap shares
    wake_index: u32,
    // Top of the hierarchy: summary bitmap plus poll-wake callback
    pollwaker: Arc<PollWaker>,
}
380
impl BitMap {
    // Number of index bits covered by one bitmap: 10 or 12
    const SIZE_BITS: u32 = USIZE_INDEX_BITS * 2;
    // Total number of bits held by one bitmap: 1024 or 4096
    const SIZE: u32 = 1 << Self::SIZE_BITS;

    pub fn new(base_index: u32, wake_index: u32, pollwaker: Arc<PollWaker>) -> Self {
        // Sanity-check that LOG2_TABLE gave a consistent answer
        assert_eq!(1 << USIZE_INDEX_BITS, USIZE_BITS);
        Self {
            tree: Default::default(),
            base_index,
            wake_index,
            pollwaker,
        }
    }

    /// Set the given global bit, ascending the hierarchy only whilst
    /// each level was previously clear, finally invoking the
    /// poll-waker if even the top-level summary was clear.  `bit`
    /// must belong to this bitmap, i.e. lie in
    /// `base_index..base_index + SIZE` (the subtraction underflows
    /// otherwise).
    #[inline]
    fn set(&self, bit: u32) {
        let bit = bit - self.base_index;
        let a = bit >> USIZE_INDEX_BITS; // Which leaf
        let b = bit & (USIZE_BITS - 1); // Bit within that leaf
        // Each `set` returns true only on a clear -> non-clear
        // transition, so propagation stops at the first level that
        // was already marked
        if self.tree.child[a].set(b)
            && self.tree.summary.set(a)
            && self.pollwaker.summary.set(self.wake_index)
        {
            (self.pollwaker.waker)();
        }
    }

    /// Read out all the set bits, clearing them in the process.
    #[inline]
    fn drain(&self, mut cb: impl FnMut(u32)) {
        // Clear top-down: the summary first, then each non-empty
        // leaf.  See the comment on ORDERING for why this is safe
        // against a concurrent `set`.
        self.tree.summary.drain(|a| {
            self.tree.child[a].drain(|b| {
                cb((a << USIZE_INDEX_BITS) + b + self.base_index);
            });
        });
    }
}
418
/// Interface to the I/O poller-provided waker of the main thread.
/// Provides 32/64 slots for waking. These slots might get used by
/// multiple BitMap instances if we have very many of them.
struct PollWaker {
    // Bitmap showing which slots need processing
    summary: Leaf,

    // Waker provided by I/O poller; called to wake the main thread
    waker: Box<dyn Fn() + Send + Sync + 'static>,

    // List of handlers that need to be dropped, pushed by `Waker`
    // drops in other threads and drained by the main thread
    drop_list: Mutex<Vec<u32>>,
}
432
433impl PollWaker {
434 pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
435 Self {
436 summary: Default::default(),
437 waker,
438 drop_list: Mutex::new(Vec::new()),
439 }
440 }
441}