use crate::util::UncheckedOptionExt;
use core::{
fmt, mem,
sync::atomic::{fence, AtomicU8, Ordering},
};
use parking_lot_core::{self, SpinWait, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
const DONE_BIT: u8 = 1;
const POISON_BIT: u8 = 2;
const LOCKED_BIT: u8 = 4;
const PARKED_BIT: u8 = 8;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OnceState {
New,
Poisoned,
InProgress,
Done,
}
impl OnceState {
#[inline]
pub fn poisoned(self) -> bool {
matches!(self, OnceState::Poisoned)
}
#[inline]
pub fn done(self) -> bool {
matches!(self, OnceState::Done)
}
}
pub struct Once(AtomicU8);
impl Once {
#[inline]
pub const fn new() -> Once {
Once(AtomicU8::new(0))
}
#[inline]
pub fn state(&self) -> OnceState {
let state = self.0.load(Ordering::Acquire);
if state & DONE_BIT != 0 {
OnceState::Done
} else if state & LOCKED_BIT != 0 {
OnceState::InProgress
} else if state & POISON_BIT != 0 {
OnceState::Poisoned
} else {
OnceState::New
}
}
#[inline]
pub fn call_once<F>(&self, f: F)
where
F: FnOnce(),
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(false, &mut |_| unsafe { f.take().unchecked_unwrap()() });
}
#[inline]
pub fn call_once_force<F>(&self, f: F)
where
F: FnOnce(OnceState),
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(true, &mut |state| unsafe {
f.take().unchecked_unwrap()(state)
});
}
#[cold]
fn call_once_slow(&self, ignore_poison: bool, f: &mut dyn FnMut(OnceState)) {
let mut spinwait = SpinWait::new();
let mut state = self.0.load(Ordering::Relaxed);
loop {
if state & DONE_BIT != 0 {
fence(Ordering::Acquire);
return;
}
if state & POISON_BIT != 0 && !ignore_poison {
fence(Ordering::Acquire);
panic!("Once instance has previously been poisoned");
}
if state & LOCKED_BIT == 0 {
match self.0.compare_exchange_weak(
state,
(state | LOCKED_BIT) & !POISON_BIT,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => state = x,
}
continue;
}
if state & PARKED_BIT == 0 && spinwait.spin() {
state = self.0.load(Ordering::Relaxed);
continue;
}
if state & PARKED_BIT == 0 {
if let Err(x) = self.0.compare_exchange_weak(
state,
state | PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed,
) {
state = x;
continue;
}
}
let addr = self as *const _ as usize;
let validate = || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT;
let before_sleep = || {};
let timed_out = |_, _| unreachable!();
unsafe {
parking_lot_core::park(
addr,
validate,
before_sleep,
timed_out,
DEFAULT_PARK_TOKEN,
None,
);
}
spinwait.reset();
state = self.0.load(Ordering::Relaxed);
}
struct PanicGuard<'a>(&'a Once);
impl<'a> Drop for PanicGuard<'a> {
fn drop(&mut self) {
let once = self.0;
let state = once.0.swap(POISON_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
let addr = once as *const _ as usize;
unsafe {
parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN);
}
}
}
}
let guard = PanicGuard(self);
let once_state = if state & POISON_BIT != 0 {
OnceState::Poisoned
} else {
OnceState::New
};
f(once_state);
mem::forget(guard);
let state = self.0.swap(DONE_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
let addr = self as *const _ as usize;
unsafe {
parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN);
}
}
}
}
impl Default for Once {
#[inline]
fn default() -> Once {
Once::new()
}
}
impl fmt::Debug for Once {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Once")
.field("state", &self.state())
.finish()
}
}
#[cfg(test)]
mod tests {
use crate::Once;
use std::panic;
use std::sync::mpsc::channel;
use std::thread;
#[test]
fn smoke_once() {
static O: Once = Once::new();
let mut a = 0;
O.call_once(|| a += 1);
assert_eq!(a, 1);
O.call_once(|| a += 1);
assert_eq!(a, 1);
}
#[test]
fn stampede_once() {
static O: Once = Once::new();
static mut RUN: bool = false;
let (tx, rx) = channel();
for _ in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
for _ in 0..4 {
thread::yield_now()
}
unsafe {
O.call_once(|| {
assert!(!RUN);
RUN = true;
});
assert!(RUN);
}
tx.send(()).unwrap();
});
}
unsafe {
O.call_once(|| {
assert!(!RUN);
RUN = true;
});
assert!(RUN);
}
for _ in 0..10 {
rx.recv().unwrap();
}
}
#[test]
fn poison_bad() {
static O: Once = Once::new();
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
let t = panic::catch_unwind(|| {
O.call_once(|| {});
});
assert!(t.is_err());
let mut called = false;
O.call_once_force(|p| {
called = true;
assert!(p.poisoned())
});
assert!(called);
O.call_once(|| {});
}
#[test]
fn wait_for_force_to_finish() {
static O: Once = Once::new();
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let t1 = thread::spawn(move || {
O.call_once_force(|p| {
assert!(p.poisoned());
tx1.send(()).unwrap();
rx2.recv().unwrap();
});
});
rx1.recv().unwrap();
let t2 = thread::spawn(|| {
let mut called = false;
O.call_once(|| {
called = true;
});
assert!(!called);
});
tx2.send(()).unwrap();
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
}
#[test]
fn test_once_debug() {
static O: Once = Once::new();
assert_eq!(format!("{:?}", O), "Once { state: New }");
}
}