use libc::c_void;
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
use crate::{LOCKED_NO_WAITERS, LOCKED_WAITERS, UNLOCKED};
/// Handle to a 32-bit futex word that is addressed through a raw pointer.
///
/// Both fields are initialised from the same address by `SharedFutex::new`;
/// the struct does not own or free the memory it points at.
pub struct SharedFutex {
/// Address of the futex word, passed unchanged to the `futex` system call.
pub futex: *mut c_void,
/// The same address viewed as an `AtomicU32` for user-space atomic access.
atom: *mut AtomicU32,
}
impl SharedFutex {
    /// Wraps the futex word located at `futex`.
    ///
    /// The pointer is kept twice: unchanged for the `futex` system call and
    /// cast to `*mut AtomicU32` for user-space atomic access.
    ///
    /// Nothing is validated here, yet every other method dereferences the
    /// pointer or hands it to the kernel. The caller must therefore make sure
    /// that `futex` is non-null, aligned for `AtomicU32`, and valid for reads
    /// and writes of four bytes for as long as the returned value is used.
    pub fn new(futex: *mut c_void) -> Self {
        Self {
            futex,
            atom: futex as *mut AtomicU32,
        }
    }

    /// Borrows the futex word as an `AtomicU32`.
    fn atomic_word(&self) -> &AtomicU32 {
        // SAFETY: relies on the contract documented on `new`: `self.atom`
        // points at a live, suitably aligned 32-bit word. `AtomicU32` only
        // needs a shared reference, so concurrent access is fine.
        unsafe { &*self.atom }
    }

    /// Atomically replaces `*atom` with `desired` if it currently holds
    /// `expected`.
    ///
    /// Returns the value observed in `*atom` before the operation, whether or
    /// not the exchange took place; the exchange succeeded exactly when the
    /// returned value equals `expected`.
    ///
    /// `atom` must point at a live, aligned `AtomicU32`.
    fn cmpxchg(atom: *mut AtomicU32, expected: u32, desired: u32) -> u32 {
        // SAFETY: callers pass a pointer to a live, aligned `AtomicU32`
        // (either `self.atom`, covered by the `new` contract, or a local).
        let outcome = unsafe { (*atom).compare_exchange(expected, desired, SeqCst, SeqCst) };
        match outcome {
            Ok(previous) | Err(previous) => previous,
        }
    }

    /// Issues `futex(self.futex, futex_op, value, NULL, NULL, val3)`.
    ///
    /// Returns the raw result of `syscall`; see futex(2) for the meaning per
    /// operation (`-1` with `errno` set on failure).
    ///
    /// # Safety
    ///
    /// `self.futex` must point at a valid futex word, and `futex_op` must be
    /// an operation that needs neither a timeout nor a second address.
    pub unsafe fn syscall_futex(&mut self, futex_op: i32, value: u32, val3: u32) -> i64 {
        self.syscall_futex3_wait(futex_op, value, std::ptr::null(), val3)
    }

    /// Issues `futex(self.futex, futex_op, value, val2, NULL, val3)`.
    ///
    /// `val2` travels in the argument slot that other operations use for the
    /// timeout pointer. Returns the raw result of `syscall`.
    ///
    /// # Safety
    ///
    /// `self.futex` must point at a valid futex word, and `futex_op` must be
    /// an operation that interprets the fourth argument as an integer and
    /// does not need a second address.
    pub unsafe fn syscall_futex3(
        &mut self,
        futex_op: i32,
        value: u32,
        val2: u32,
        val3: u32,
    ) -> i64 {
        libc::syscall(
            libc::SYS_futex,
            self.futex,
            futex_op,
            value,
            val2,
            // `syscall` is variadic: a bare `0` would be passed as a 32-bit
            // `i32` in a pointer-sized slot, so pass a typed null instead.
            std::ptr::null::<u32>(),
            val3,
        )
    }

    /// Issues `futex(self.futex, futex_op, value, timeout, NULL, val3)`.
    ///
    /// Returns the raw result of `syscall`.
    ///
    /// # Safety
    ///
    /// `self.futex` must point at a valid futex word, and `timeout` must be
    /// either null or point at a `timespec` that stays valid for the duration
    /// of the call.
    pub unsafe fn syscall_futex3_wait(
        &mut self,
        futex_op: i32,
        value: u32,
        timeout: *const libc::timespec,
        val3: u32,
    ) -> i64 {
        libc::syscall(
            libc::SYS_futex,
            self.futex,
            futex_op,
            value,
            timeout,
            // Typed null rather than a bare `0`; see `syscall_futex3`.
            std::ptr::null::<u32>(),
            val3,
        )
    }

    /// Wakes up to `number_of_waiters` threads blocked on the futex word
    /// (`FUTEX_WAKE`) and returns the raw system-call result.
    pub fn post(&mut self, number_of_waiters: u32) -> i64 {
        // SAFETY: `FUTEX_WAKE` takes no timeout or second address, and
        // `self.futex` is valid per the contract documented on `new`.
        unsafe { self.syscall_futex(libc::FUTEX_WAKE, number_of_waiters, 0) }
    }

    /// Stores `value` into the futex word, then wakes up to
    /// `number_of_waiters` waiters. Returns the raw system-call result.
    pub fn post_with_value(&mut self, value: u32, number_of_waiters: u32) -> i64 {
        self.atomic_word().store(value, SeqCst);
        self.post(number_of_waiters)
    }

    /// Stores `value` into the futex word without waking anyone.
    pub fn set_futex_value(&mut self, value: u32) {
        self.atomic_word().store(value, SeqCst);
    }

    /// Loads the current value of the futex word.
    pub fn get_futex_value(&mut self) -> u32 {
        self.atomic_word().load(SeqCst)
    }

    /// Calls `FUTEX_WAIT` with `wait_value` and no timeout.
    ///
    /// Per futex(2) the kernel only puts the caller to sleep if the word still
    /// equals `wait_value`. Returns the raw system-call result.
    pub fn wait(&mut self, wait_value: u32) -> i64 {
        // SAFETY: `FUTEX_WAIT` with a null timeout needs no further pointers,
        // and `self.futex` is valid per the contract documented on `new`.
        unsafe { self.syscall_futex(libc::FUTEX_WAIT, wait_value, 0) }
    }

    /// Like [`wait`](Self::wait), but passes `timeout` to `FUTEX_WAIT`.
    ///
    /// Returns the raw system-call result.
    pub fn wait_with_timeout(&mut self, wait_value: u32, timeout: libc::timespec) -> i64 {
        // SAFETY: `timeout` lives in this stack frame until the call returns,
        // and `self.futex` is valid per the contract documented on `new`.
        unsafe { self.syscall_futex3_wait(libc::FUTEX_WAIT, wait_value, &timeout, 0) }
    }

    /// Acquires the lock, blocking in the kernel while it is held elsewhere.
    ///
    /// The fast path moves the word from `UNLOCKED` to `LOCKED_NO_WAITERS`.
    /// On contention the word is marked `LOCKED_WAITERS` before sleeping, and
    /// the lock is then always taken as `LOCKED_WAITERS` so that the matching
    /// `unlock` issues a wake-up.
    pub fn lock(&mut self) {
        let mut seen = Self::cmpxchg(self.atom, UNLOCKED, LOCKED_NO_WAITERS);
        while seen != UNLOCKED {
            // Sleep only once the word advertises waiters; if the holder
            // released the lock in the meantime, skip straight to the retry.
            if seen == LOCKED_WAITERS
                || Self::cmpxchg(self.atom, LOCKED_NO_WAITERS, LOCKED_WAITERS) != UNLOCKED
            {
                // The result is deliberately ignored: whatever ended the wait,
                // the loop re-checks the word before proceeding.
                self.wait(LOCKED_WAITERS);
            }
            seen = Self::cmpxchg(self.atom, UNLOCKED, LOCKED_WAITERS);
        }
    }

    /// Releases the lock and, if waiters may exist, wakes up to
    /// `how_may_waiters` of them.
    ///
    /// NOTE(review): the decrement assumes `LOCKED_NO_WAITERS - 1 == UNLOCKED`;
    /// the constants are defined outside this file, so confirm there.
    pub fn unlock(&mut self, how_may_waiters: u32) {
        let previous = self.atomic_word().fetch_sub(1, SeqCst);
        if previous != LOCKED_NO_WAITERS {
            // The word was not in the uncontended state: force it back to
            // `UNLOCKED` and wake sleepers so they can retry.
            self.atomic_word().store(UNLOCKED, SeqCst);
            self.post(how_may_waiters);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rushm::posixaccessor::POSIXShm;
    use std::mem;
    use std::sync::atomic;
    use std::sync::atomic::AtomicU32;
    use std::sync::mpsc;
    use std::{thread, time};

    /// An `AtomicU32` viewed through a shared-memory pointer stores and loads
    /// like any other atomic.
    #[test]
    fn test_atomic_in_shared_memory() {
        let mut shm = POSIXShm::<i32>::new("futex".to_string(), mem::size_of::<u32>());
        unsafe {
            let ret = shm.open();
            assert!(ret.is_ok());
            ret.unwrap();
        }
        let ptr = shm.get_cptr_mut();
        let a1: *mut AtomicU32 = ptr as *mut AtomicU32;
        unsafe {
            (*a1).store(7, atomic::Ordering::SeqCst);
            let ret = (*a1).load(atomic::Ordering::SeqCst);
            assert_eq!(ret, 7);
        }
        unsafe {
            let ret = shm.close(true);
            assert!(ret.is_ok());
            ret.unwrap();
        }
    }

    /// A successful exchange returns the previous value and updates the word.
    #[test]
    fn test_cmpxchg() {
        let mut atomic_val: AtomicU32 = AtomicU32::new(UNLOCKED);
        let before = atomic_val.load(atomic::Ordering::SeqCst);
        let ret = SharedFutex::cmpxchg(&mut atomic_val, UNLOCKED, LOCKED_NO_WAITERS);
        assert_eq!(before, UNLOCKED);
        assert_eq!(ret, before);
        // The exchange must actually have happened, not just reported success.
        assert_eq!(
            atomic_val.load(atomic::Ordering::SeqCst),
            LOCKED_NO_WAITERS
        );
    }

    /// A failed exchange returns the current value and leaves the word alone.
    #[test]
    fn test_cmpxchg_shm() {
        unsafe {
            let mut shm =
                POSIXShm::<i32>::new("test_cmpxchg_shm".to_string(), std::mem::size_of::<u32>());
            let ret = shm.open();
            assert!(ret.is_ok());
            let ptr = shm.get_cptr_mut();
            let atom_val: *mut AtomicU32 = ptr as *mut AtomicU32;
            (*atom_val).store(0xFF, atomic::Ordering::SeqCst);
            let before = (*atom_val).load(atomic::Ordering::SeqCst);
            let ret = SharedFutex::cmpxchg(atom_val, UNLOCKED, LOCKED_NO_WAITERS);
            assert_eq!(before, 0xFF);
            assert_eq!(ret, before);
            // The mismatch must not have modified the word.
            assert_eq!((*atom_val).load(atomic::Ordering::SeqCst), 0xFF);
            let ret = shm.close(true);
            assert!(ret.is_ok());
        }
    }

    /// A second thread blocks in `lock` on a held futex and gets through once
    /// the main thread unlocks it.
    #[test]
    fn test_futex_lock_in_shared_memory() {
        let (tx, rx) = mpsc::channel();
        let mut shm = POSIXShm::<i32>::new(
            "test_futex_lock_in_shared_memory".to_string(),
            std::mem::size_of::<u32>(),
        );
        unsafe {
            let ret = shm.open();
            assert!(ret.is_ok());
        }
        let ptr_shm = shm.get_cptr_mut();
        let shared_atom_val: *mut AtomicU32 = ptr_shm as *mut AtomicU32;
        // Start with the lock already held so the spawned thread has to wait.
        unsafe {
            (*shared_atom_val).store(LOCKED_NO_WAITERS, atomic::Ordering::SeqCst);
            let val = (*shared_atom_val).load(atomic::Ordering::SeqCst);
            assert_eq!(val, LOCKED_NO_WAITERS);
        }
        let mut shared_futex = SharedFutex::new(ptr_shm);
        let handle = thread::spawn(move || {
            let mut shm = POSIXShm::<i32>::new(
                "test_futex_lock_in_shared_memory".to_string(),
                std::mem::size_of::<u32>(),
            );
            unsafe {
                let ret = shm.open();
                assert!(ret.is_ok());
            }
            let ptr_shm = shm.get_cptr_mut();
            let mut shared_futex = SharedFutex::new(ptr_shm);
            tx.send(true).unwrap();
            shared_futex.lock();
        });
        let _ = rx.recv().unwrap();
        // Give the other thread time to reach the blocking wait.
        thread::sleep(time::Duration::from_millis(500));
        shared_futex.unlock(1);
        handle.join().unwrap();
        // The spawned thread acquired the lock and never released it.
        assert_ne!(shared_futex.get_futex_value(), UNLOCKED);
        unsafe {
            let ret = shm.close(true);
            assert!(ret.is_ok());
        }
    }

    /// Uncontended lock/unlock cycles move the word between `UNLOCKED` and
    /// `LOCKED_NO_WAITERS`.
    #[test]
    fn test_shared_lock_unlock() {
        let mut shm = POSIXShm::<i32>::new("test_shared_lock_unlock".to_string(), 8);
        unsafe {
            let ret = shm.open();
            assert!(ret.is_ok());
        }
        let ptr_shm = shm.get_cptr_mut();
        let mut shared_futex = SharedFutex::new(ptr_shm);
        // Do not depend on whatever the segment happens to contain initially.
        shared_futex.set_futex_value(UNLOCKED);
        shared_futex.lock();
        assert_eq!(shared_futex.get_futex_value(), LOCKED_NO_WAITERS);
        shared_futex.unlock(1);
        assert_eq!(shared_futex.get_futex_value(), UNLOCKED);
        shared_futex.lock();
        assert_eq!(shared_futex.get_futex_value(), LOCKED_NO_WAITERS);
        shared_futex.unlock(1);
        assert_eq!(shared_futex.get_futex_value(), UNLOCKED);
        unsafe {
            let ret = shm.close(true);
            assert!(ret.is_ok());
        }
    }

    /// Waiting on a value nobody changes ends with a timeout error.
    #[test]
    fn test_shared_lock_timeout() {
        let mut shm = POSIXShm::<i32>::new("test_shared_lock_timeout".to_string(), 8);
        unsafe {
            let ret = shm.open();
            assert!(ret.is_ok());
        }
        let ptr_shm = shm.get_cptr_mut();
        let mut shared_futex = SharedFutex::new(ptr_shm);
        let wait_time = libc::timespec {
            tv_sec: 0,
            tv_nsec: 500 * 1000 * 1000,
        };
        shared_futex.set_futex_value(1);
        let ret = shared_futex.wait_with_timeout(1, wait_time);
        // Read errno immediately, before any other call can overwrite it.
        let errno = std::io::Error::last_os_error().raw_os_error();
        assert_eq!(ret, -1);
        assert_eq!(errno, Some(libc::ETIMEDOUT));
        unsafe {
            let ret = shm.close(true);
            assert!(ret.is_ok());
        }
    }
}