//! Thread local state, [`thread_key::ThreadKey`], used to run transactions.
//!
//! A handle to the thread local state can be acquired by calling [`thread_key::get`].
use crate::{
internal::{
epoch::QuiesceEpoch,
gc::GlobalThreadList,
thread::{Thread, ThreadKeyRaw},
},
read::ReadTx,
rw::RWTx,
tx::Error,
};
use std::{
alloc::AllocErr,
fmt::{self, Debug, Formatter},
ptr::NonNull,
sync::atomic::Ordering::Release,
thread::AccessError,
};
/// A handle to `swym`'s thread local state.
///
/// `ThreadKey` can be acquired by calling [`get`].
///
/// `ThreadKey`'s encapsulate the state required to perform transactions, and provides the necessary
/// methods for running transactions.
pub struct ThreadKey {
thread: ThreadKeyRaw,
}
impl ThreadKey {
#[inline(never)]
#[cold]
fn new() -> Result<Self, AllocErr> {
let thread = Box::new(Thread::new()?);
GlobalThreadList::instance()
.write()
.register((&thread.synch).into());
let thread = unsafe { ThreadKeyRaw::new(NonNull::new_unchecked(Box::into_raw(thread))) };
Ok(ThreadKey { thread })
}
#[inline(never)]
#[cold]
unsafe fn unregister(&self) {
let synch = self.thread.synch();
synch
.as_ref()
.current_epoch
.set(QuiesceEpoch::end_of_time(), Release);
self.thread
.tx_state()
.as_mut()
.garbage
.synch_and_collect_all(self.thread.synch().as_ref());
synch
.as_ref()
.current_epoch
.set(QuiesceEpoch::inactive(), Release);
tls::clear_tls();
GlobalThreadList::instance_unchecked()
.write()
.unregister(synch);
drop(Box::from_raw(self.thread.thread().as_ptr()))
}
#[inline]
pub(crate) fn as_raw(&self) -> ThreadKeyRaw {
ThreadKeyRaw::new(self.thread.thread())
}
/// Performs a transaction capabable of only reading.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("not gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// let x_clone = thread_key.read(|tx| Ok(x.borrow(tx, Default::default())?.to_owned()));
/// assert_eq!(x_clone, "not gonna be overwritten");
/// ```
#[inline]
pub fn read<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
self.try_read(f)
.expect("nested transactions are not yet supported")
}
/// Performs a transaction capabable of reading and writing.
///
/// # Panics
///
/// Panics if there is already a running transaction.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// let prev_x = thread_key.rw(|tx| {
/// let r = x.borrow(tx, Default::default())?.to_owned();
/// x.set(tx, "overwritten".to_owned())?;
/// Ok(r)
/// });
/// assert_eq!(prev_x, "gonna be overwritten");
/// ```
#[inline]
pub fn rw<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
self.try_rw(f)
.expect("nested transactions are not yet supported")
}
/// Performs a transaction capabable of only reading.
///
/// # Errors
///
/// Returns a [`TryReadErr`] if a transaction is already running on the current thread.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("not gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// let x_clone = thread_key
/// .try_read(|tx| Ok(x.borrow(tx, Default::default())?.to_owned()))
/// .unwrap();
/// assert_eq!(x_clone, "not gonna be overwritten");
/// ```
#[inline]
pub fn try_read<'tcell, F, O>(&'tcell self, f: F) -> Result<O, TryReadErr>
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
unsafe {
if likely!(!raw.synch().as_mut().current_epoch.is_active_unsync()) {
Ok(raw.read_slow(f))
} else {
Err(TryReadErr::new())
}
}
}
/// Performs a transaction capabable of reading and writing.
///
/// # Errors
///
/// Returns a [`TryRWErr`] if a transaction is already running on the current thread.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// let prev_x = thread_key
/// .try_rw(|tx| {
/// let prev_x = x.borrow(tx, Default::default())?.to_owned();
/// x.set(tx, "overwritten".to_owned())?;
/// Ok(prev_x)
/// })
/// .unwrap();
/// assert_eq!(prev_x, "gonna be overwritten");
/// ```
#[inline]
pub fn try_rw<'tcell, F, O>(&'tcell self, f: F) -> Result<O, TryRWErr>
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
unsafe {
if likely!(!raw.synch().as_mut().current_epoch.is_active_unsync()) {
Ok(raw.rw_slow(f))
} else {
Err(TryRWErr::new())
}
}
}
/// Performs a transaction capabable of only reading.
///
/// # Safety
///
/// If the thread is currently in a transaction, this results in undefined behavior.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("not gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// unsafe {
/// let x_clone =
/// thread_key.read_unchecked(|tx| Ok(x.borrow(tx, Default::default())?.to_owned()));
/// assert_eq!(x_clone, "not gonna be overwritten");
/// }
/// ```
#[inline]
pub unsafe fn read_unchecked<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
debug_assert!(
!raw.synch().as_mut().current_epoch.is_active_unsync(),
"`rw_unchecked` called during a transaction",
);
raw.read_slow(f)
}
/// Performs a transaction capabable of reading and writing.
///
/// # Safety
///
/// If the thread is currently in a transaction, this results in undefined behavior.
///
/// # Examples
///
/// ```
/// use swym::{tcell::TCell, thread_key};
///
/// let x = TCell::new(String::from("gonna be overwritten"));
///
/// let thread_key = thread_key::get();
///
/// unsafe {
/// let prev_x = thread_key.rw_unchecked(|tx| {
/// let prev_x = x.borrow(tx, Default::default())?.to_owned();
/// x.set(tx, "overwritten".to_owned())?;
/// Ok(prev_x)
/// });
/// assert_eq!(prev_x, "gonna be overwritten");
/// }
/// ```
#[inline]
pub unsafe fn rw_unchecked<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
debug_assert!(
!raw.synch().as_mut().current_epoch.is_active_unsync(),
"`rw_unchecked` called during a transaction",
);
raw.rw_slow(f)
}
}
impl Clone for ThreadKey {
#[inline]
fn clone(&self) -> Self {
unsafe {
let ref_count = self.thread.ref_count();
let ref_count = ref_count.as_ref();
let count = ref_count.get();
debug_assert!(count > 0, "attempt to clone a deallocated `ThreadKey`");
ref_count.set(count + 1);
ThreadKey {
thread: self.as_raw(),
}
}
}
}
impl Drop for ThreadKey {
#[inline]
fn drop(&mut self) {
unsafe {
let ref_count = self.thread.ref_count();
let ref_count = ref_count.as_ref();
let count = ref_count.get();
debug_assert!(count > 0, "double free on ThreadKey attempted");
if count == 1 {
self.unregister()
} else {
ref_count.set(count - 1);
}
}
}
}
#[inline(never)]
#[cold]
fn new_thread_key() -> ThreadKey {
ThreadKey::new().expect("Failed to allocate `Thread`")
}
#[inline(never)]
#[cold]
fn err_into_thread_key(_: AccessError) -> ThreadKey {
new_thread_key()
}
thread_local! {
static THREAD_KEY: ThreadKey = new_thread_key();
}
#[cfg(not(target_thread_local))]
mod tls {
use super::*;
#[inline(never)]
pub fn thread_key() -> ThreadKey {
THREAD_KEY
.try_with(ThreadKey::clone)
.unwrap_or_else(err_into_thread_key)
}
#[inline]
pub fn clear_tls() {}
}
#[cfg(target_thread_local)]
mod tls {
use super::{err_into_thread_key, NonNull, ThreadKey, ThreadKeyRaw, THREAD_KEY};
use crate::internal::thread::Thread;
use std::{mem, ptr};
#[thread_local]
static mut TLS: *mut Thread = ptr::null_mut();
#[inline]
pub fn clear_tls() {
unsafe {
TLS = ptr::null_mut();
}
}
#[inline(never)]
#[cold]
fn thread_key_impl() -> ThreadKey {
THREAD_KEY
.try_with(|thread_key| unsafe {
TLS = thread_key.as_raw().thread().as_ptr();
thread_key.clone()
})
.unwrap_or_else(err_into_thread_key)
}
#[inline]
pub fn thread_key() -> ThreadKey {
unsafe {
let tls = TLS;
if likely!(!tls.is_null()) {
let thread_key = ThreadKey {
thread: ThreadKeyRaw::new(NonNull::new_unchecked(tls)),
};
mem::forget(thread_key.clone()); // bump ref_count since we created ThreadKey through other means
thread_key
} else {
thread_key_impl()
}
}
}
}
/// Returns a handle to `swym`'s thread local state.
///
/// # Note
///
/// Reusing the same [`ThreadKey`] between transactions is slightly more efficient due to the costs
/// of access thread local memory, and (non-atomic) reference counting.
///
/// # Examples
///
/// ```rust
/// use swym::{tcell::TCell, thread_key, tx::Ordering};
/// let thread_key = thread_key::get();
///
/// let x = TCell::new(0);
///
/// thread_key.rw(|tx| Ok(x.set(tx, 1)?));
///
/// let one = thread_key.read(|tx| Ok(x.get(tx, Ordering::default())?));
///
/// assert_eq!(one, 1);
/// ```
#[inline]
pub fn get() -> ThreadKey {
tls::thread_key()
}
/// Error type indicating that the read transaction failed to even start due to nesting.
pub struct TryReadErr {
_private: (),
}
impl Debug for TryReadErr {
#[cold]
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.pad("TryReadError { .. }")
}
}
impl TryReadErr {
#[inline]
pub(crate) fn new() -> Self {
TryReadErr { _private: () }
}
}
/// Error type indicating that the read-write transaction failed to even start due to nesting.
pub struct TryRWErr {
_private: (),
}
impl Debug for TryRWErr {
#[cold]
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.pad("TryRWErr { .. }")
}
}
impl TryRWErr {
#[inline]
pub(crate) fn new() -> Self {
TryRWErr { _private: () }
}
}