use crate::Gutex;
use std::cell::UnsafeCell;
use std::io::Error;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[derive(Debug)]
pub struct GutexGroup {
owning: ThreadId,
active: UnsafeCell<usize>,
}
impl GutexGroup {
pub fn new() -> Arc<Self> {
Arc::new(Self {
owning: ThreadId::new(0),
active: UnsafeCell::new(0),
})
}
pub fn spawn<T>(self: &Arc<Self>, value: T) -> Gutex<T> {
Gutex {
group: self.clone(),
active: UnsafeCell::new(0),
value: UnsafeCell::new(value),
}
}
#[inline(never)]
pub(crate) fn lock(&self) -> GroupGuard {
let current = Self::current_thread();
if current == self.owning.load(Ordering::Relaxed) {
return unsafe { GroupGuard::new(self) };
}
while let Err(owning) =
self.owning
.compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed)
{
unsafe { Self::wait_unlock(self.owning.as_ptr(), owning) };
}
unsafe { GroupGuard::new(self) }
}
#[cfg(target_os = "linux")]
fn current_thread() -> i32 {
unsafe { libc::gettid() }
}
#[cfg(target_os = "macos")]
fn current_thread() -> u64 {
let mut id = 0;
assert_eq!(unsafe { libc::pthread_threadid_np(0, &mut id) }, 0);
id
}
#[cfg(target_os = "windows")]
fn current_thread() -> u32 {
unsafe { windows_sys::Win32::System::Threading::GetCurrentThreadId() }
}
#[cfg(target_os = "linux")]
unsafe fn wait_unlock(addr: *mut i32, owning: i32) {
use libc::{syscall, SYS_futex, EAGAIN, FUTEX_PRIVATE_FLAG, FUTEX_WAIT};
if unsafe { syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, owning, 0) } < 0 {
let e = Error::last_os_error();
if e.raw_os_error().unwrap() != EAGAIN {
panic!("FUTEX_WAIT failed: {e}");
}
}
}
#[cfg(target_os = "macos")]
unsafe fn wait_unlock(addr: *mut u64, owning: u64) {
use ulock_sys::__ulock_wait;
use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64;
if __ulock_wait(UL_COMPARE_AND_WAIT64, addr.cast(), owning, 0) != 0 {
panic!("__ulock_wait() failed: {}", Error::last_os_error());
}
}
#[cfg(target_os = "windows")]
unsafe fn wait_unlock(addr: *mut u32, owning: u32) {
use windows_sys::Win32::System::Threading::{WaitOnAddress, INFINITE};
if unsafe { WaitOnAddress(addr.cast(), &owning as *const u32 as _, 4, INFINITE) } == 0 {
panic!("WaitOnAddress() failed: {}", Error::last_os_error());
}
}
#[cfg(target_os = "linux")]
unsafe fn wake_one(addr: *mut i32) {
use libc::{syscall, SYS_futex, FUTEX_PRIVATE_FLAG, FUTEX_WAKE};
if unsafe { syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1) } < 0 {
panic!("FUTEX_WAKE failed: {}", Error::last_os_error());
}
}
#[cfg(target_os = "macos")]
unsafe fn wake_one(addr: *mut u64) {
use libc::ENOENT;
use ulock_sys::__ulock_wake;
use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64;
if __ulock_wake(UL_COMPARE_AND_WAIT64, addr.cast(), 0) != 0 {
let e = Error::last_os_error();
if e.raw_os_error().unwrap() != ENOENT {
panic!("__ulock_wake() failed: {e}");
}
}
}
#[cfg(target_os = "windows")]
unsafe fn wake_one(addr: *mut u32) {
use windows_sys::Win32::System::Threading::WakeByAddressSingle;
unsafe { WakeByAddressSingle(addr.cast()) };
}
}
unsafe impl Send for GutexGroup {}
unsafe impl Sync for GutexGroup {}
#[derive(Debug)]
pub(crate) struct GroupGuard<'a> {
group: &'a GutexGroup,
phantom: PhantomData<Rc<()>>, }
impl<'a> GroupGuard<'a> {
unsafe fn new(group: &'a GutexGroup) -> Self {
*group.active.get() += 1;
Self {
group,
phantom: PhantomData,
}
}
}
impl<'a> Drop for GroupGuard<'a> {
#[inline(never)]
fn drop(&mut self) {
unsafe {
let active = self.group.active.get();
*active -= 1;
if *active != 0 {
return;
}
}
self.group.owning.store(0, Ordering::Release);
unsafe { GutexGroup::wake_one(self.group.owning.as_ptr()) };
}
}
#[cfg(target_os = "linux")]
type ThreadId = std::sync::atomic::AtomicI32;
#[cfg(target_os = "macos")]
type ThreadId = std::sync::atomic::AtomicU64;
#[cfg(target_os = "windows")]
type ThreadId = std::sync::atomic::AtomicU32;