// wakerpool/lib.rs

//! Shared Future implementations, like channels, often store lists of
//! [core::task::Waker]. This crate provides an efficient [WakerList]
//! that avoids memory allocation under conditions where wakers are
//! frequently stored and woken.
//!
//! Nodes are stored in a thread-local object pool and backed by a
//! global, lock-free pool.
//!
//! NOTE: For efficiency and simplicity, this crate never deallocates
//! nodes. If you expect to potentially store unbounded sets of
//! Wakers, use a [std::vec::Vec].
12
13use core::cell::Cell;
14use core::mem::MaybeUninit;
15use core::ptr;
16use core::sync::atomic::AtomicPtr;
17use core::sync::atomic::Ordering;
18use core::task::Waker;
19
/// Atomic head pointer for the global, lock-free free list of nodes.
type WakerNodePtr = AtomicPtr<WakerNode>;

/// One link in a singly-linked list of stored wakers.
///
/// The `waker` slot is only initialized while the node is part of a
/// [WakerList]; nodes sitting in a pool leave it uninitialized.
struct WakerNode {
    next: *mut WakerNode,
    waker: MaybeUninit<Waker>,
}
26
27fn allocate_node() -> *mut WakerNode {
28    Box::into_raw(Box::new(WakerNode {
29        next: ptr::null_mut(),
30        waker: MaybeUninit::uninit(),
31    }))
32}
33
34static GLOBAL_POOL: WakerNodePtr = WakerNodePtr::new(ptr::null_mut());
35
/// Per-thread free list of nodes. Single-threaded by construction
/// (accessed through a `thread_local!`), so a plain [Cell] suffices.
struct LocalPool {
    head: Cell<*mut WakerNode>,
}
39
40impl LocalPool {
41    const fn new() -> LocalPool {
42        LocalPool {
43            head: Cell::new(ptr::null_mut()),
44        }
45    }
46
47    fn acquire_node(&self) -> *mut WakerNode {
48        let node = self.head.get();
49        if !node.is_null() {
50            self.head.set(unsafe { (*node).next });
51            // We could clear the next pointer, but the caller is
52            // responsible.
53            return node;
54        }
55
56        let mut node = GLOBAL_POOL.load(Ordering::Acquire);
57        loop {
58            if node.is_null() {
59                break;
60            }
61            // No ABA on global pool because we never deallocate.
62            let new_head = unsafe { (*node).next };
63            node = match GLOBAL_POOL.compare_exchange_weak(
64                node,
65                new_head,
66                Ordering::AcqRel,
67                Ordering::Acquire,
68            ) {
69                Ok(popped) => {
70                    return popped;
71                }
72                Err(node) => node,
73            };
74        }
75
76        allocate_node()
77    }
78
79    unsafe fn release_node(&self, node: *mut WakerNode) {
80        unsafe {
81            (*node).next = self.head.get();
82            self.head.set(node);
83        }
84    }
85
86    unsafe fn release_list(&self, head: *mut WakerNode) {
87        let mut p = self.head.get();
88        if p.is_null() {
89            self.head.set(head);
90            return;
91        }
92        loop {
93            let next = unsafe { (*p).next };
94            if next.is_null() {
95                break;
96            }
97            p = next;
98        }
99        unsafe { (*p).next = head }
100    }
101}
102
impl Drop for LocalPool {
    // When a thread exits, donate its entire local pool to the
    // global pool so the nodes can be reused by other threads.
    fn drop(&mut self) {
        let mut p = self.head.get();
        if p.is_null() {
            return;
        }
        // Find the tail.
        loop {
            let next = unsafe { (*p).next };
            if next.is_null() {
                break;
            }
            p = next;
        }

        // Push the whole local list onto the global stack with one
        // CAS: link our tail to the current global head, then swing
        // the global head to our local head. Retry on contention,
        // re-linking the tail each time.
        let mut global_head = GLOBAL_POOL.load(Ordering::Acquire);
        loop {
            unsafe {
                (*p).next = global_head;
            }
            global_head = match GLOBAL_POOL.compare_exchange_weak(
                global_head,
                self.head.get(),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(node) => node,
            };
        }
    }
}
135
thread_local! {
    // Per-thread free list; its Drop impl drains any remaining nodes
    // into GLOBAL_POOL on thread exit.
    static LOCAL_POOL: LocalPool = const { LocalPool::new() }
}
139
140fn acquire_node() -> *mut WakerNode {
141    LOCAL_POOL.with(LocalPool::acquire_node)
142}
143
144unsafe fn release_node(node: *mut WakerNode) {
145    LOCAL_POOL.with(|lp| unsafe { LocalPool::release_node(lp, node) })
146}
147
148unsafe fn release_list(head: *mut WakerNode) {
149    LOCAL_POOL.with(|lp| unsafe { LocalPool::release_list(lp, head) })
150}
151
/// Stores a linked list of [core::task::Waker].
#[derive(Debug)]
pub struct WakerList {
    // Head of a null-terminated chain of nodes, each holding an
    // initialized Waker (see `push`).
    head: *mut WakerNode,
}
157
// SAFETY: It's okay to release the nodes onto some other thread's
// pool, and Waker itself is Send, so the whole list may move between
// threads.
unsafe impl Send for WakerList {}
160
impl Drop for WakerList {
    fn drop(&mut self) {
        unsafe {
            // Deallocate the individual wakers. It's unfortunate to
            // make two passes through the list, though dropping a
            // non-empty list is rare.
            let mut p = self.head;
            while !p.is_null() {
                // SAFETY: every node reachable from self.head holds
                // an initialized Waker (established by `push`).
                (*p).waker.assume_init_drop();
                p = (*p).next;
            }
            // Return the (now waker-free) nodes to the pool.
            release_list(self.head)
        }
    }
}
176
177impl Default for WakerList {
178    fn default() -> Self {
179        WakerList::new()
180    }
181}
182
183impl WakerList {
184    /// Returns a new empty list.
185    pub const fn new() -> WakerList {
186        Self {
187            head: ptr::null_mut(),
188        }
189    }
190
191    /// Returns true if no wakers are stored.
192    pub fn is_empty(&self) -> bool {
193        self.head.is_null()
194    }
195
196    /// Adds a [Waker] to the list.
197    pub fn push(&mut self, waker: Waker) {
198        let node = acquire_node();
199        unsafe {
200            (*node).waker.write(waker);
201            (*node).next = self.head;
202        }
203        self.head = node;
204    }
205
206    /// Pops a [Waker] from the back of the list. Returns [None] if
207    /// empty.
208    pub fn pop(&mut self) -> Option<Waker> {
209        if self.head.is_null() {
210            None
211        } else {
212            Some(unsafe {
213                let node = self.head;
214
215                self.head = (*node).next;
216                let waker = (*node).waker.assume_init_read();
217                release_node(node);
218                waker
219            })
220        }
221    }
222}
223
224/// To avoid WakerList needing to track the list's tail, iteration is
225/// in reverse order. This should be okay, as wake ordering shouldn't
226/// matter.
227impl Iterator for WakerList {
228    type Item = Waker;
229
230    fn next(&mut self) -> Option<Self::Item> {
231        self.pop()
232    }
233}