use crate::{
internal::{
epoch::QuiesceEpoch,
gc::GlobalThreadList,
thread::{Thread, ThreadKeyRaw},
},
read::ReadTx,
rw::RWTx,
tx::Error,
};
use std::{
alloc::AllocErr,
fmt::{self, Debug, Formatter},
ptr::NonNull,
sync::atomic::Ordering::Release,
thread::AccessError,
};
pub struct ThreadKey {
thread: ThreadKeyRaw,
}
impl ThreadKey {
#[inline(never)]
#[cold]
fn new() -> Result<Self, AllocErr> {
let thread = Box::new(Thread::new()?);
GlobalThreadList::instance()
.write()
.register((&thread.synch).into());
let thread = unsafe { ThreadKeyRaw::new(NonNull::new_unchecked(Box::into_raw(thread))) };
Ok(ThreadKey { thread })
}
#[inline(never)]
#[cold]
unsafe fn unregister(&self) {
let synch = self.thread.synch();
synch
.as_ref()
.current_epoch
.set(QuiesceEpoch::end_of_time(), Release);
self.thread
.tx_state()
.as_mut()
.garbage
.synch_and_collect_all(self.thread.synch().as_ref());
synch
.as_ref()
.current_epoch
.set(QuiesceEpoch::inactive(), Release);
tls::clear_tls();
GlobalThreadList::instance_unchecked()
.write()
.unregister(synch);
drop(Box::from_raw(self.thread.thread().as_ptr()))
}
#[inline]
pub(crate) fn as_raw(&self) -> ThreadKeyRaw {
ThreadKeyRaw::new(self.thread.thread())
}
#[inline]
pub fn read<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
self.try_read(f)
.expect("nested transactions are not yet supported")
}
#[inline]
pub fn rw<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
self.try_rw(f)
.expect("nested transactions are not yet supported")
}
#[inline]
pub fn try_read<'tcell, F, O>(&'tcell self, f: F) -> Result<O, TryReadErr>
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
unsafe {
if likely!(!raw.synch().as_mut().current_epoch.is_active_unsync()) {
Ok(raw.read_slow(f))
} else {
Err(TryReadErr::new())
}
}
}
#[inline]
pub fn try_rw<'tcell, F, O>(&'tcell self, f: F) -> Result<O, TryRWErr>
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
unsafe {
if likely!(!raw.synch().as_mut().current_epoch.is_active_unsync()) {
Ok(raw.rw_slow(f))
} else {
Err(TryRWErr::new())
}
}
}
#[inline]
pub unsafe fn read_unchecked<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&ReadTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
debug_assert!(
!raw.synch().as_mut().current_epoch.is_active_unsync(),
"`rw_unchecked` called during a transaction",
);
raw.read_slow(f)
}
#[inline]
pub unsafe fn rw_unchecked<'tcell, F, O>(&'tcell self, f: F) -> O
where
F: FnMut(&mut RWTx<'tcell>) -> Result<O, Error>,
{
let raw = self.as_raw();
debug_assert!(
!raw.synch().as_mut().current_epoch.is_active_unsync(),
"`rw_unchecked` called during a transaction",
);
raw.rw_slow(f)
}
}
impl Clone for ThreadKey {
#[inline]
fn clone(&self) -> Self {
unsafe {
let ref_count = self.thread.ref_count();
let ref_count = ref_count.as_ref();
let count = ref_count.get();
debug_assert!(count > 0, "attempt to clone a deallocated `ThreadKey`");
ref_count.set(count + 1);
ThreadKey {
thread: self.as_raw(),
}
}
}
}
impl Drop for ThreadKey {
#[inline]
fn drop(&mut self) {
unsafe {
let ref_count = self.thread.ref_count();
let ref_count = ref_count.as_ref();
let count = ref_count.get();
debug_assert!(count > 0, "double free on ThreadKey attempted");
if count == 1 {
self.unregister()
} else {
ref_count.set(count - 1);
}
}
}
}
#[inline(never)]
#[cold]
fn new_thread_key() -> ThreadKey {
ThreadKey::new().expect("Failed to allocate `Thread`")
}
#[inline(never)]
#[cold]
fn err_into_thread_key(_: AccessError) -> ThreadKey {
new_thread_key()
}
thread_local! {
static THREAD_KEY: ThreadKey = new_thread_key();
}
#[cfg(not(target_thread_local))]
mod tls {
use super::*;
#[inline(never)]
pub fn thread_key() -> ThreadKey {
THREAD_KEY
.try_with(ThreadKey::clone)
.unwrap_or_else(err_into_thread_key)
}
#[inline]
pub fn clear_tls() {}
}
#[cfg(target_thread_local)]
mod tls {
use super::{err_into_thread_key, NonNull, ThreadKey, ThreadKeyRaw, THREAD_KEY};
use crate::internal::thread::Thread;
use std::{mem, ptr};
#[thread_local]
static mut TLS: *mut Thread = ptr::null_mut();
#[inline]
pub fn clear_tls() {
unsafe {
TLS = ptr::null_mut();
}
}
#[inline(never)]
#[cold]
fn thread_key_impl() -> ThreadKey {
THREAD_KEY
.try_with(|thread_key| unsafe {
TLS = thread_key.as_raw().thread().as_ptr();
thread_key.clone()
})
.unwrap_or_else(err_into_thread_key)
}
#[inline]
pub fn thread_key() -> ThreadKey {
unsafe {
let tls = TLS;
if likely!(!tls.is_null()) {
let thread_key = ThreadKey {
thread: ThreadKeyRaw::new(NonNull::new_unchecked(tls)),
};
mem::forget(thread_key.clone()); thread_key
} else {
thread_key_impl()
}
}
}
}
#[inline]
pub fn get() -> ThreadKey {
tls::thread_key()
}
pub struct TryReadErr {
_private: (),
}
impl Debug for TryReadErr {
#[cold]
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.pad("TryReadError { .. }")
}
}
impl TryReadErr {
#[inline]
pub(crate) fn new() -> Self {
TryReadErr { _private: () }
}
}
pub struct TryRWErr {
_private: (),
}
impl Debug for TryRWErr {
#[cold]
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
formatter.pad("TryRWErr { .. }")
}
}
impl TryRWErr {
#[inline]
pub(crate) fn new() -> Self {
TryRWErr { _private: () }
}
}