conquer_once/
park.rs

1use std::{
2    cell::Cell,
3    sync::atomic::{AtomicBool, Ordering},
4    thread::{self, Thread},
5};
6
7use conquer_util::BackOff;
8
9use crate::{
10    cell::{Block, Unblock},
11    state::{
12        AtomicOnceState, BlockedState,
13        OnceState::{Ready, Uninit, WouldBlock},
14    },
15    POISON_PANIC_MSG,
16};
17
18use self::internal::ParkThread;
19
#[cfg(any(test, feature = "std"))]
/// A type for lazy initialization of e.g. global static variables, which
/// provides the same functionality as the `lazy_static!` macro.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system: it is the generic lazy type with its
/// blocking strategy fixed to `ParkThread`.
///
/// For the API of this type alias, see the API of the generic
/// [`Lazy`](crate::doc::Lazy) type.
///
/// # Examples
///
/// ```
/// use std::sync::Mutex;
///
/// # #[cfg(feature = "std")]
/// use conquer_once::Lazy;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Lazy;
///
/// static MUTEX: Lazy<Mutex<Vec<i32>>> = Lazy::new(Mutex::default);
///
/// let mut lock = MUTEX.lock().unwrap();
///
/// lock.push(1);
/// lock.push(2);
/// lock.push(3);
///
/// assert_eq!(lock.as_slice(), &[1, 2, 3]);
/// ```
///
/// The associated [`new`](crate::lazy::Lazy::new) function can be used with any
/// function or closure that implements `Fn() -> T`.
///
/// ```
/// use std::collections::HashMap;
///
/// # #[cfg(feature = "std")]
/// use conquer_once::Lazy;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Lazy;
///
/// static CAPITALS: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
///     let mut map = HashMap::new();
///     map.insert("Norway", "Oslo");
///     map.insert("Belgium", "Brussels");
///     map.insert("Latvia", "Riga");
///     map
/// });
///
/// assert_eq!(CAPITALS.get(&"Norway"), Some(&"Oslo"));
/// assert_eq!(CAPITALS.get(&"Belgium"), Some(&"Brussels"));
/// assert_eq!(CAPITALS.get(&"Latvia"), Some(&"Riga"));
/// ```
pub type Lazy<T, F = fn() -> T> = crate::lazy::Lazy<T, ParkThread, F>;
75
#[cfg(any(test, feature = "std"))]
/// An interior mutability cell type which allows synchronized one-time
/// initialization and read-only access exclusively after initialization.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system: it is the generic cell type with its
/// blocking strategy fixed to `ParkThread`.
///
/// For the API of this type alias, see the generic
/// [`OnceCell`](crate::doc::OnceCell) type.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "std")]
/// use conquer_once::OnceCell;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::OnceCell;
///
/// #[derive(Copy, Clone)]
/// struct Configuration {
///     mode: i32,
///     threshold: u64,
///     msg: &'static str,
/// }
///
/// static CONFIG: OnceCell<Configuration> = OnceCell::uninit();
///
/// // producer thread
/// CONFIG.init_once(|| Configuration {
///     mode: 2,
///     threshold: 128,
///     msg: "..."
/// });
///
/// // consumer thread
/// let res = CONFIG.get().copied();
/// if let Some(config) = res {
///     assert_eq!(config.mode, 2);
///     assert_eq!(config.threshold, 128);
/// }
/// ```
pub type OnceCell<T> = crate::cell::OnceCell<T, ParkThread>;
118
#[cfg(any(test, feature = "std"))]
/// A synchronization primitive which can be used to run a one-time global
/// initialization.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system.
///
/// For the API of this type alias, see the generic
/// [`OnceCell`](crate::doc::OnceCell) type.
/// This is a specialization with `T = ()`, i.e. the cell stores no value and
/// only records that the initialization has run.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "std")]
/// use conquer_once::Once;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Once;
///
/// static mut GLOBAL: usize = 0;
/// static INIT: Once = Once::uninit();
///
/// fn get_global() -> usize {
///     // SAFETY: this is safe because the `Once` ensures the `static mut` is
///     // assigned by only one thread and without data races.
///     unsafe {
///         INIT.init_once(|| {
///             GLOBAL = expensive_computation();
///         });
///         # assert_eq!(GLOBAL, 1);
///         GLOBAL
///     }
/// }
///
/// fn expensive_computation() -> usize {
///     // ...
///     # 1
/// }
/// ```
pub type Once = OnceCell<()>;
159
// The strategy type lives in a private module; the parent module brings it
// back into scope with `use self::internal::ParkThread`.
mod internal {
    /// Blocking strategy using low-level and OS-reliant parking and un-parking
    /// mechanisms.
    ///
    /// This is a field-less marker type: it carries no data and only selects
    /// the blocking behaviour through the trait implementations attached to
    /// it.
    #[derive(Copy, Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
    pub struct ParkThread;
}
166
167impl ParkThread {
168    #[inline]
169    pub(crate) fn try_block_spinning(
170        state: &AtomicOnceState,
171        back_off: &BackOff,
172    ) -> Result<(), BlockedState> {
173        loop {
174            // (wait:1) this acquire load syncs-with the release swaps (guard:2)
175            // and the acq-rel CAS (wait:2)
176            match state.load(Ordering::Acquire).expect(POISON_PANIC_MSG) {
177                Ready => return Ok(()),
178                WouldBlock(blocked) if back_off.advise_yield() => {
179                    back_off.reset();
180                    return Err(blocked);
181                }
182                _ => {}
183            }
184
185            back_off.spin();
186        }
187    }
188}
189
190impl Unblock for ParkThread {
191    /// Unblocks all blocked waiting threads.
192    #[inline]
193    unsafe fn on_unblock(state: BlockedState) {
194        let mut curr = state.as_ptr() as *const StackWaiter;
195        while !curr.is_null() {
196            let thread = {
197                // SAFETY: no mutable references to a stack waiter can exist
198                // and the waiter struct is ensured to live while its thread is
199                // parked, so the pointer can be safely dereferenced
200                #[allow(unused_unsafe)]
201                let waiter = unsafe { &*curr };
202                curr = waiter.next.get();
203                // there can be now data race when mutating the thread-cell as only the unblocking
204                // thread will access it, the stack waiter can dropped as soon as the following
205                // store becomes visible, so the thread MUST be taken out first
206                let thread = waiter.thread.take().unwrap();
207                // (ready:2) this release store syncs-with the acquire load (ready:1)
208                waiter.ready.store(true, Ordering::Release);
209                thread
210            };
211
212            thread.unpark();
213        }
214    }
215}
216
217unsafe impl Block for ParkThread {
218    /// Blocks (parks) the current thread until it is woken up by the thread
219    /// with permission to initialize the `OnceCell`.
220    #[inline]
221    fn block(state: &AtomicOnceState) {
222        // spin a little before parking the thread in case the state is
223        // quickly unlocked again
224        let back_off = BackOff::new();
225        let blocked = match Self::try_block_spinning(state, &back_off) {
226            Ok(_) => return,
227            Err(blocked) => blocked,
228        };
229
230        // create a linked list node on the current thread's stack, which is
231        // guaranteed to stay alive while the thread is parked.
232        let waiter = StackWaiter {
233            ready: AtomicBool::new(false),
234            thread: Cell::new(Some(thread::current())),
235            next: Cell::new(blocked.as_ptr() as *const StackWaiter),
236        };
237
238        let mut curr = blocked;
239        let head = BlockedState::from(&waiter as *const _);
240
241        // SAFETY: `head` is a valid pointer to a `StackWaiter` that will live
242        // for the duration of this function, which in turn will only return
243        // when no other thread can still observe any pointer to it
244        // (wait:2) this acq-rel CAS syncs-with itself and the acq load (wait:1)
245        while let Err(err) = unsafe { state.try_enqueue_waiter(curr, head, Ordering::AcqRel) } {
246            match err {
247                // another parked thread succeeded in placing itself at the queue's front
248                WouldBlock(queue) => {
249                    // the waiter hasn't been shared yet, so it's still safe to
250                    // mutate the next pointer
251                    curr = queue;
252                    waiter.next.set(queue.as_ptr() as *const StackWaiter);
253                    back_off.spin();
254                }
255                // acquire-release is required here to enforce acquire ordering in the failure case,
256                // which guarantees that any (non-atomic) stores to the cell's inner state preceding
257                // (guard:2) have become visible, if the function returns;
258                // (alternatively an explicit acquire fence could be placed into this path)
259                Ready => return,
260                Uninit => unreachable!("cell state can not become `UNINIT again`"),
261            }
262        }
263
264        // park the thread until it is woken up by the thread that first set the state to blocked.
265        // the loop guards against spurious wake ups
266        // (ready:1) this acquire load syncs-with the release store (ready:2)
267        while !waiter.ready.load(Ordering::Acquire) {
268            thread::park();
269        }
270
271        // SAFETY: propagates poisoning as required by the trait
272        // (wait:3) this acquire load syncs-with the acq-rel swap (guard:2)
273        assert_eq!(state.load(Ordering::Acquire).expect(POISON_PANIC_MSG), Ready);
274    }
275}
276
/// A linked list node that lives on the stack of a parked thread.
///
/// Nodes are linked into an intrusive, singly linked list through their
/// `next` pointers; the head of that list is stored in the cell's state.
// NOTE(review): the explicit minimum alignment presumably keeps the low bits
// of a waiter pointer free for the state encoding - confirm against the
// definition of `BlockedState`.
#[repr(align(4))]
pub(crate) struct StackWaiter {
    /// The flag marking the waiter as either blocked or ready to proceed.
    ///
    /// This is read by the owning thread and is set by the thread that gets to
    /// run the initialization closure and is responsible for unparking all
    /// blocked threads, which may be either the same or any other thread.
    ready: AtomicBool,
    /// The handle for the parked thread that is used to unpark it, once the
    /// initialization is complete.
    ///
    /// This field is in fact mutated by a thread that is potentially not the
    /// same as the owning thread, but exclusively in the case where the
    /// mutating thread has exclusive access to this field.
    thread: Cell<Option<Thread>>,
    /// The pointer to the next blocked thread, or null at the end of the list.
    ///
    /// This field is mutated exclusively by **either** the owning thread
    /// **before** the waiter becomes visible to other threads or by the thread
    /// responsible for unparking all waiting threads.
    next: Cell<*const StackWaiter>,
}
300
// Both test suites are generated by macros that are defined outside of this
// file; they expand inside this module, i.e. with this module's parent in
// reach via `super`.
#[cfg(test)]
mod tests {
    generate_tests_non_blocking!();
    generate_tests!();
}