conquer_once/park.rs
use std::{
    cell::Cell,
    sync::atomic::{AtomicBool, Ordering},
    thread::{self, Thread},
};

use conquer_util::BackOff;

use crate::{
    cell::{Block, Unblock},
    state::{
        AtomicOnceState, BlockedState,
        OnceState::{Ready, Uninit, WouldBlock},
    },
    POISON_PANIC_MSG,
};

use self::internal::ParkThread;

#[cfg(any(test, feature = "std"))]
/// A type for lazy initialization of e.g. global static variables, which
/// provides the same functionality as the `lazy_static!` macro.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system.
///
/// For the API of this type alias, see the API of the generic
/// [`Lazy`](crate::doc::Lazy) type.
///
/// # Examples
///
/// ```
/// use std::sync::Mutex;
///
/// # #[cfg(feature = "std")]
/// use conquer_once::Lazy;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Lazy;
///
/// static MUTEX: Lazy<Mutex<Vec<i32>>> = Lazy::new(Mutex::default);
///
/// let mut lock = MUTEX.lock().unwrap();
///
/// lock.push(1);
/// lock.push(2);
/// lock.push(3);
///
/// assert_eq!(lock.as_slice(), &[1, 2, 3]);
/// ```
///
/// The associated [`new`](crate::lazy::Lazy::new) function can be used with any
/// function or closure that implements `Fn() -> T`.
///
/// ```
/// use std::collections::HashMap;
///
/// # #[cfg(feature = "std")]
/// use conquer_once::Lazy;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Lazy;
///
/// static CAPITALS: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
///     let mut map = HashMap::new();
///     map.insert("Norway", "Oslo");
///     map.insert("Belgium", "Brussels");
///     map.insert("Latvia", "Riga");
///     map
/// });
///
/// assert_eq!(CAPITALS.get(&"Norway"), Some(&"Oslo"));
/// assert_eq!(CAPITALS.get(&"Belgium"), Some(&"Brussels"));
/// assert_eq!(CAPITALS.get(&"Latvia"), Some(&"Riga"));
/// ```
pub type Lazy<T, F = fn() -> T> = crate::lazy::Lazy<T, ParkThread, F>;

#[cfg(any(test, feature = "std"))]
/// An interior mutability cell type which allows synchronized one-time
/// initialization and read-only access exclusively after initialization.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system.
///
/// For the API of this type alias, see the generic
/// [`OnceCell`](crate::doc::OnceCell) type.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "std")]
/// use conquer_once::OnceCell;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::OnceCell;
///
/// #[derive(Copy, Clone)]
/// struct Configuration {
///     mode: i32,
///     threshold: u64,
///     msg: &'static str,
/// }
///
/// static CONFIG: OnceCell<Configuration> = OnceCell::uninit();
///
/// // producer thread
/// CONFIG.init_once(|| Configuration {
///     mode: 2,
///     threshold: 128,
///     msg: "..."
/// });
///
/// // consumer thread
/// let res = CONFIG.get().copied();
/// if let Some(config) = res {
///     assert_eq!(config.mode, 2);
///     assert_eq!(config.threshold, 128);
/// }
/// ```
pub type OnceCell<T> = crate::cell::OnceCell<T, ParkThread>;

#[cfg(any(test, feature = "std"))]
/// A synchronization primitive which can be used to run a one-time global
/// initialization.
///
/// This type uses the blocking synchronization mechanism provided by the
/// underlying operating system.
///
/// For the API of this type alias, see the generic
/// [`OnceCell`](crate::doc::OnceCell) type.
/// This is a specialization with `T = ()`.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "std")]
/// use conquer_once::Once;
/// # #[cfg(not(feature = "std"))]
/// # use conquer_once::spin::Once;
///
/// static mut GLOBAL: usize = 0;
/// static INIT: Once = Once::uninit();
///
/// fn get_global() -> usize {
///     // SAFETY: this is safe because the `Once` ensures the `static mut` is
///     // assigned by only one thread and without data races.
///     unsafe {
///         INIT.init_once(|| {
///             GLOBAL = expensive_computation();
///         });
/// #       assert_eq!(GLOBAL, 1);
///         GLOBAL
///     }
/// }
///
/// fn expensive_computation() -> usize {
///     // ...
/// #   1
/// }
/// ```
pub type Once = OnceCell<()>;

mod internal {
    /// Blocking strategy using low-level and OS-reliant parking and un-parking
    /// mechanisms.
    #[derive(Copy, Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
    pub struct ParkThread;
}

impl ParkThread {
    /// Spins on `state` with exponential back-off until the cell either
    /// becomes `Ready` (returns `Ok`) or the back-off strategy advises
    /// yielding, in which case the most recently observed blocked state is
    /// returned as `Err` so the caller can enqueue itself and park.
    #[inline]
    pub(crate) fn try_block_spinning(
        state: &AtomicOnceState,
        back_off: &BackOff,
    ) -> Result<(), BlockedState> {
        loop {
            // (wait:1) this acquire load syncs-with the release swaps (guard:2)
            // and the acq-rel CAS (wait:2)
            match state.load(Ordering::Acquire).expect(POISON_PANIC_MSG) {
                Ready => return Ok(()),
                WouldBlock(blocked) if back_off.advise_yield() => {
                    back_off.reset();
                    return Err(blocked);
                }
                _ => {}
            }

            back_off.spin();
        }
    }
}

impl Unblock for ParkThread {
    /// Unblocks all blocked waiting threads.
    #[inline]
    unsafe fn on_unblock(state: BlockedState) {
        let mut curr = state.as_ptr() as *const StackWaiter;
        while !curr.is_null() {
            let thread = {
                // SAFETY: no mutable references to a stack waiter can exist
                // and the waiter struct is ensured to live while its thread is
                // parked, so the pointer can be safely dereferenced
                #[allow(unused_unsafe)]
                let waiter = unsafe { &*curr };
                curr = waiter.next.get();
                // there can be no data race when mutating the thread-cell, as
                // only the unblocking thread will access it; the stack waiter
                // can be dropped as soon as the following store becomes
                // visible, so the thread MUST be taken out first
                let thread = waiter.thread.take().unwrap();
                // (ready:2) this release store syncs-with the acquire load (ready:1)
                waiter.ready.store(true, Ordering::Release);
                thread
            };

            thread.unpark();
        }
    }
}
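
// The queue traversed by `on_unblock` is a singly linked list of
// stack-allocated `StackWaiter` nodes. Since `block` (below) pushes new
// waiters at the front, threads are unparked in LIFO order:
//
//   state --> waiter C --> waiter B --> waiter A --> (null)
//   (head)    (newest)                  (oldest)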

unsafe impl Block for ParkThread {
    /// Blocks (parks) the current thread until it is woken up by the thread
    /// with permission to initialize the `OnceCell`.
    #[inline]
    fn block(state: &AtomicOnceState) {
        // spin a little before parking the thread in case the state is
        // quickly unlocked again
        let back_off = BackOff::new();
        let blocked = match Self::try_block_spinning(state, &back_off) {
            Ok(_) => return,
            Err(blocked) => blocked,
        };

        // create a linked list node on the current thread's stack, which is
        // guaranteed to stay alive while the thread is parked.
        let waiter = StackWaiter {
            ready: AtomicBool::new(false),
            thread: Cell::new(Some(thread::current())),
            next: Cell::new(blocked.as_ptr() as *const StackWaiter),
        };

        let mut curr = blocked;
        let head = BlockedState::from(&waiter as *const _);

        // SAFETY: `head` is a valid pointer to a `StackWaiter` that will live
        // for the duration of this function, which in turn will only return
        // when no other thread can still observe any pointer to it
        // (wait:2) this acq-rel CAS syncs-with itself and the acq load (wait:1)
        while let Err(err) = unsafe { state.try_enqueue_waiter(curr, head, Ordering::AcqRel) } {
            match err {
                // another parked thread succeeded in placing itself at the queue's front
                WouldBlock(queue) => {
                    // the waiter hasn't been shared yet, so it's still safe to
                    // mutate the next pointer
                    curr = queue;
                    waiter.next.set(queue.as_ptr() as *const StackWaiter);
                    back_off.spin();
                }
                // acquire-release is required here to enforce acquire ordering in the failure case,
                // which guarantees that any (non-atomic) stores to the cell's inner state preceding
                // (guard:2) have become visible, if the function returns;
                // (alternatively an explicit acquire fence could be placed into this path)
                Ready => return,
                Uninit => unreachable!("cell state can not become `UNINIT` again"),
            }
        }

        // park the thread until it is woken up by the thread that first set the state to blocked.
        // the loop guards against spurious wake-ups
        // (ready:1) this acquire load syncs-with the release store (ready:2)
        while !waiter.ready.load(Ordering::Acquire) {
            thread::park();
        }

        // SAFETY: propagates poisoning as required by the trait
        // (wait:3) this acquire load syncs-with the acq-rel swap (guard:2)
        assert_eq!(state.load(Ordering::Acquire).expect(POISON_PANIC_MSG), Ready);
    }
}
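
// A rough sketch of the park/unpark handshake between a waiter and the
// initializing thread, in terms of the labeled synchronization points:
//
//   waiter thread                         initializing thread
//   -------------                         -------------------
//   spin, acquire load      (wait:1)      run the init closure
//   enqueue waiter, CAS     (wait:2)      swap the state to ready (guard:2)
//   park until ready flag   (ready:1)     take each thread handle,
//   final acquire load      (wait:3)      set its ready flag (ready:2),
//                                         then unpark it (on_unblock)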

/// A linked list node that lives on the stack of a parked thread.
#[repr(align(4))]
pub(crate) struct StackWaiter {
    /// The flag marking the waiter as either blocked or ready to proceed.
    ///
    /// This is read by the owning thread and is set by the thread that gets
    /// to run the initialization closure and is responsible for unparking
    /// all blocked threads, which may be either the same or any other thread.
    ready: AtomicBool,
    /// The handle for the parked thread that is used to unpark it, once the
    /// initialization is complete.
    ///
    /// This field is in fact mutated by a thread that is potentially not the
    /// same as the owning thread, but only in the case where the mutating
    /// thread has exclusive access to this field.
    thread: Cell<Option<Thread>>,
    /// The pointer to the next blocked thread.
    ///
    /// This field is mutated exclusively by **either** the owning thread
    /// **before** the waiter becomes visible to other threads or by the thread
    /// responsible for unparking all waiting threads.
    next: Cell<*const StackWaiter>,
}
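
// Compile-time sanity check (a sketch): the `#[repr(align(4))]` above
// presumably ensures that the two low bits of any `StackWaiter` pointer are
// zero, leaving them free for `BlockedState` to encode the cell's state.
const _: () = assert!(core::mem::align_of::<StackWaiter>() >= 4);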

#[cfg(test)]
mod tests {
    generate_tests_non_blocking!();
    generate_tests!();
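
    // A supplementary sketch of the blocking behavior this module implements:
    // several threads race to initialize a `OnceCell`; the losers park via
    // `ParkThread::block` and are unparked by `on_unblock` once the winner's
    // closure completes, after which all of them observe the same value.
    #[test]
    fn concurrent_init_parks_and_unparks() {
        use std::thread;

        use super::OnceCell;

        static CELL: OnceCell<i32> = OnceCell::uninit();

        let handles: Vec<_> = (0..4)
            .map(|_| {
                thread::spawn(|| {
                    // only one closure runs; all other threads block until
                    // the initialization has completed
                    CELL.init_once(|| 42);
                    assert_eq!(CELL.get().copied(), Some(42));
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }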
}