use std::sync::atomic::AtomicU32;
#[cfg(target_os = "linux")]
use std::sync::atomic::Ordering;
use std::time::Duration;
#[cfg(target_os = "linux")]
pub fn futex_wait(
futex: &AtomicU32,
expected: u32,
timeout: Option<Duration>,
) -> std::io::Result<bool> {
use std::ptr;
let futex_ptr = futex as *const AtomicU32 as *const u32;
let timespec = timeout.map(|d| libc::timespec {
tv_sec: d.as_secs() as libc::time_t,
tv_nsec: d.subsec_nanos() as libc::c_long,
});
let timespec_ptr = match ×pec {
Some(ts) => ts as *const libc::timespec,
None => ptr::null(),
};
let result = unsafe {
libc::syscall(
libc::SYS_futex,
futex_ptr,
libc::FUTEX_WAIT,
expected,
timespec_ptr,
ptr::null::<u32>(),
0u32,
)
};
if result == 0 {
Ok(true)
} else {
let err = std::io::Error::last_os_error();
match err.raw_os_error() {
Some(libc::EAGAIN) => Ok(false), Some(libc::ETIMEDOUT) => Ok(false), Some(libc::EINTR) => Ok(false), _ => Err(err),
}
}
}
#[cfg(target_os = "linux")]
/// Wakes up to `count` threads blocked on `futex` using the Linux `FUTEX_WAKE`
/// operation.
///
/// `count` values above `i32::MAX` are clamped to `i32::MAX`, which futex(2)
/// documents (as `INT_MAX`) as the way to wake every waiter.
///
/// # Returns
///
/// The number of waiters the kernel reports as woken.
///
/// # Errors
///
/// Returns the `errno` of the syscall as `std::io::Error` when it fails.
pub fn futex_wake(futex: &AtomicU32, count: u32) -> std::io::Result<u32> {
    let futex_ptr = futex as *const AtomicU32 as *const u32;

    // The kernel interprets the wake count as a signed `int`. Passing
    // `u32::MAX` straight through would therefore arrive as -1 rather than
    // "wake everyone", so saturate at `i32::MAX` first.
    let wake_count = count.min(i32::MAX as u32) as i32;

    // SAFETY: `futex_ptr` comes from a live `&AtomicU32` borrowed for the whole
    // call. The timeout / second-address / val3 arguments are unused by
    // `FUTEX_WAKE` and are passed as null / zero.
    let result = unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex_ptr,
            libc::FUTEX_WAKE,
            wake_count,
            std::ptr::null::<libc::timespec>(),
            std::ptr::null::<u32>(),
            0u32,
        )
    };

    if result < 0 {
        return Err(std::io::Error::last_os_error());
    }
    // A successful result is the number of woken waiters, which cannot exceed
    // the requested `wake_count`, so it always fits in `u32`.
    Ok(result as u32)
}
#[cfg(target_os = "linux")]
/// Bumps the futex word and wakes every thread waiting on it.
///
/// The word is incremented (wrapping on overflow) with `Release` ordering
/// before the wake, so a `futex_wait` that was called with the previous value
/// no longer matches and does not go to sleep. Errors from the wake syscall
/// are deliberately ignored: signalling is best-effort and has no return value.
pub fn futex_signal(futex: &AtomicU32) {
    futex.fetch_add(1, Ordering::Release);
    // Ask for `i32::MAX` waiters, not `u32::MAX`: the kernel reads the count as
    // a signed `int`, and futex(2) documents `INT_MAX` as "wake all waiters".
    let _ = futex_wake(futex, i32::MAX as u32);
}
#[cfg(target_os = "linux")]
/// Async front-end for `futex_wait` on a `'static` futex word.
///
/// The current value of `futex` is sampled with `Acquire` ordering and used as
/// the expected value; the blocking wait itself runs on Tokio's blocking
/// thread pool so the async executor is not stalled.
///
/// Returns whatever `futex_wait` returns. A failure to join the blocking task
/// is converted into `std::io::Error`.
pub async fn futex_wait_async(
    futex: &'static AtomicU32,
    timeout: Option<Duration>,
) -> std::io::Result<bool> {
    // Snapshot first: the wait only sleeps if the word still has this value.
    let snapshot = futex.load(Ordering::Acquire);

    let blocking_wait =
        tokio::task::spawn_blocking(move || futex_wait(futex, snapshot, timeout));

    match blocking_wait.await {
        Ok(wait_outcome) => wait_outcome,
        Err(join_error) => Err(join_error.into()),
    }
}
#[cfg(target_os = "linux")]
/// Async front-end for `futex_wait` that accepts a non-`'static` futex word.
///
/// The reference is turned into a plain address so it can be moved into the
/// blocking task, where it is converted back into a reference. The expected
/// value is the word's current value, sampled with `Acquire` ordering.
///
/// Returns whatever `futex_wait` returns. A failure to join the blocking task
/// is converted into `std::io::Error`.
///
/// NOTE(review): the signature does not tie the lifetime of `futex` to the
/// blocking task. If this future is dropped before the task finishes, the task
/// may still dereference the address — callers presumably must keep the atomic
/// alive until the wait has really ended; confirm against call sites.
pub async fn futex_wait_async_ptr(
    futex: &AtomicU32,
    timeout: Option<Duration>,
) -> std::io::Result<bool> {
    // A `usize` is `Send + 'static`, unlike the borrowed reference.
    let addr = futex as *const AtomicU32 as usize;
    let snapshot = futex.load(Ordering::Acquire);

    let blocking_wait = tokio::task::spawn_blocking(move || {
        // SAFETY (caller-upheld): `addr` was taken from a valid `&AtomicU32`.
        // Its validity here relies on the referent outliving this task, which
        // this function cannot enforce on its own.
        let word: &AtomicU32 = unsafe { &*(addr as *const AtomicU32) };
        futex_wait(word, snapshot, timeout)
    });

    blocking_wait
        .await
        .map_err(std::io::Error::from)
        .and_then(|wait_outcome| wait_outcome)
}
#[cfg(not(target_os = "linux"))]
/// Fallback for targets other than Linux, where no futex syscall is used.
///
/// Both arguments are ignored. The task yields to the Tokio scheduler once and
/// then reports `Ok(false)` ("not woken"); it never returns an error.
pub async fn futex_wait_async_ptr(
    _futex: &AtomicU32,
    _timeout: Option<Duration>,
) -> std::io::Result<bool> {
    let not_woken = false;
    // Give other tasks a turn so callers that poll in a loop do not spin hot.
    tokio::task::yield_now().await;
    Ok(not_woken)
}
#[cfg(not(target_os = "linux"))]
/// Fallback for targets other than Linux, where no futex syscall is used.
///
/// All arguments are ignored. The calling thread yields its time slice once
/// and the function then reports `Ok(false)` ("not woken"); it never blocks
/// for the timeout and never returns an error.
pub fn futex_wait(
    _futex: &AtomicU32,
    _expected: u32,
    _timeout: Option<Duration>,
) -> std::io::Result<bool> {
    let not_woken = false;
    // Let other threads run so callers that poll in a loop do not spin hot.
    std::thread::yield_now();
    Ok(not_woken)
}
#[cfg(not(target_os = "linux"))]
/// Fallback for targets other than Linux, where no futex syscall is used.
///
/// Both arguments are ignored; the function always reports that zero waiters
/// were woken and never returns an error.
pub fn futex_wake(_futex: &AtomicU32, _count: u32) -> std::io::Result<u32> {
    let woken_waiters: u32 = 0;
    Ok(woken_waiters)
}
#[cfg(not(target_os = "linux"))]
/// Fallback for targets other than Linux: does nothing.
///
/// NOTE(review): the Linux variant of `futex_signal` increments the futex word
/// before waking waiters, while this one leaves the word untouched. Confirm
/// that callers on other platforms do not rely on the counter changing.
pub fn futex_signal(_futex: &AtomicU32) {
    // Intentionally empty: there is nothing to wake on this platform.
}
#[cfg(not(target_os = "linux"))]
/// Fallback for targets other than Linux, where no futex syscall is used.
///
/// Both arguments are ignored. The task yields to the Tokio scheduler once and
/// then reports `Ok(false)` ("not woken"); it never returns an error.
pub async fn futex_wait_async(
    _futex: &'static AtomicU32,
    _timeout: Option<Duration>,
) -> std::io::Result<bool> {
    let not_woken = false;
    // Give other tasks a turn so callers that poll in a loop do not spin hot.
    tokio::task::yield_now().await;
    Ok(not_woken)
}
#[cfg(test)]
mod tests {
    // The first two tests run on every platform (the non-Linux stubs give the
    // same answers); the signal/wait round trip needs real futexes and is
    // therefore Linux-only.
    use super::*;
    #[cfg(target_os = "linux")]
    use std::sync::atomic::Ordering;
    #[cfg(target_os = "linux")]
    use std::sync::Arc;
    #[cfg(target_os = "linux")]
    use std::thread;

    /// Waking a futex nobody is waiting on reports zero woken waiters.
    #[test]
    fn test_futex_wake_without_waiters() {
        let word = AtomicU32::new(0);
        assert_eq!(futex_wake(&word, 1).unwrap(), 0);
    }

    /// Waiting with an expected value that differs from the stored one must
    /// come back as "not woken" instead of sleeping or failing.
    #[test]
    fn test_futex_wait_value_mismatch() {
        let word = AtomicU32::new(42);
        let short_timeout = Some(Duration::from_millis(10));
        let woken = futex_wait(&word, 0, short_timeout).unwrap();
        assert!(!woken);
    }

    /// A waiter is released by `futex_signal` from another thread, or at least
    /// observes that the word changed if the signal won the race.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_futex_signal_and_wait() {
        let shared = Arc::new(AtomicU32::new(0));

        let signaller = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Delay so the main thread is most likely already waiting.
                thread::sleep(Duration::from_millis(50));
                futex_signal(&shared);
            })
        };

        let before = shared.load(Ordering::Acquire);
        let woken = futex_wait(&shared, before, Some(Duration::from_secs(1))).unwrap();
        assert!(woken || shared.load(Ordering::Acquire) != before);

        signaller.join().unwrap();
    }
}