Skip to main content

memlink_shm/
futex.rs

1//! Cross-platform futex (fast userspace mutex) for efficient wait/wake signaling.
2//! Uses native syscalls: Linux futex, macOS ulock, Windows WaitOnAddress.
3
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::time::Duration;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum FutexError {
9    Timeout,
10    Interrupted,
11    InvalidArgument,
12    Unsupported,
13    Other(i32),
14}
15
16impl std::fmt::Display for FutexError {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        match self {
19            FutexError::Timeout => write!(f, "Futex operation timed out"),
20            FutexError::Interrupted => write!(f, "Futex operation interrupted"),
21            FutexError::InvalidArgument => write!(f, "Invalid futex argument"),
22            FutexError::Unsupported => write!(f, "Futex not supported on this platform"),
23            FutexError::Other(code) => write!(f, "Futex error: {}", code),
24        }
25    }
26}
27
28impl std::error::Error for FutexError {}
29
30pub type FutexResult = Result<(), FutexError>;
31
32pub struct Futex {
33    value: AtomicU32,
34    #[cfg(any(target_os = "linux", target_os = "android"))]
35    _platform: LinuxFutex,
36    #[cfg(target_os = "macos")]
37    _platform: MacOSFutex,
38    #[cfg(target_os = "windows")]
39    _platform: WindowsFutex,
40}
41
42#[cfg(any(target_os = "linux", target_os = "android"))]
43struct LinuxFutex;
44
45#[cfg(target_os = "macos")]
46struct MacOSFutex;
47
48#[cfg(target_os = "windows")]
49struct WindowsFutex;
50
51impl Futex {
52    pub fn new(value: u32) -> Self {
53        Self {
54            value: AtomicU32::new(value),
55            #[cfg(any(target_os = "linux", target_os = "android"))]
56            _platform: LinuxFutex,
57            #[cfg(target_os = "macos")]
58            _platform: MacOSFutex,
59            #[cfg(target_os = "windows")]
60            _platform: WindowsFutex,
61        }
62    }
63
64    pub fn load(&self) -> u32 {
65        self.value.load(Ordering::SeqCst)
66    }
67
68    pub fn store(&self, value: u32) {
69        self.value.store(value, Ordering::SeqCst);
70    }
71
72    pub fn wait(&self, expected: u32, timeout: Option<Duration>) -> FutexResult {
73        #[cfg(any(target_os = "linux", target_os = "android"))]
74        {
75            linux_futex_wait(&self.value, expected, timeout)
76        }
77        #[cfg(target_os = "macos")]
78        {
79            macos_futex_wait(&self.value, expected, timeout)
80        }
81        #[cfg(target_os = "windows")]
82        {
83            windows_futex_wait(&self.value, expected, timeout)
84        }
85        #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
86        {
87            let _ = (expected, timeout);
88            Err(FutexError::Unsupported)
89        }
90    }
91
92    pub fn wake_one(&self) -> usize {
93        #[cfg(any(target_os = "linux", target_os = "android"))]
94        {
95            linux_futex_wake(&self.value, 1)
96        }
97        #[cfg(target_os = "macos")]
98        {
99            macos_futex_wake(&self.value, 1)
100        }
101        #[cfg(target_os = "windows")]
102        {
103            windows_futex_wake(&self.value, 1)
104        }
105        #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
106        {
107            0
108        }
109    }
110
111    pub fn wake_all(&self) -> usize {
112        #[cfg(any(target_os = "linux", target_os = "android"))]
113        {
114            linux_futex_wake(&self.value, usize::MAX)
115        }
116        #[cfg(target_os = "macos")]
117        {
118            macos_futex_wake(&self.value, usize::MAX)
119        }
120        #[cfg(target_os = "windows")]
121        {
122            windows_futex_wake(&self.value, usize::MAX)
123        }
124        #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
125        {
126            0
127        }
128    }
129
130    pub fn as_atomic(&self) -> &AtomicU32 {
131        &self.value
132    }
133}
134
135#[cfg(any(target_os = "linux", target_os = "android"))]
136fn linux_futex_wait(
137    value: &AtomicU32,
138    expected: u32,
139    timeout: Option<Duration>,
140) -> FutexResult {
141    use std::ptr;
142
143    const FUTEX_WAIT: i32 = 0;
144
145    if value.load(Ordering::SeqCst) != expected {
146        return Err(FutexError::InvalidArgument);
147    }
148
149    let timespec = timeout.map(duration_to_timespec);
150    let timespec_ptr = timespec
151        .as_ref()
152        .map(|ts| ts as *const libc::timespec)
153        .unwrap_or(ptr::null());
154
155    loop {
156        if value.load(Ordering::SeqCst) != expected {
157            return Err(FutexError::InvalidArgument);
158        }
159
160        let ret = unsafe {
161            libc::syscall(
162                libc::SYS_futex,
163                value as *const AtomicU32 as *const u32,
164                FUTEX_WAIT,
165                expected as i32,
166                timespec_ptr,
167            )
168        };
169
170        if ret == 0 {
171            return Ok(());
172        }
173
174        let err = std::io::Error::last_os_error().raw_os_error().unwrap_or(0);
175        match err {
176            libc::EINTR => continue,
177            libc::ETIMEDOUT => return Err(FutexError::Timeout),
178            libc::EAGAIN => continue,
179            libc::EINVAL | libc::EFAULT => return Err(FutexError::InvalidArgument),
180            _ => return Err(FutexError::Other(err)),
181        }
182    }
183}
184
185#[cfg(any(target_os = "linux", target_os = "android"))]
186fn linux_futex_wake(value: &AtomicU32, count: usize) -> usize {
187    use std::ptr;
188
189    const FUTEX_WAKE: i32 = 1;
190
191    let ret = unsafe {
192        libc::syscall(
193            libc::SYS_futex,
194            value as *const AtomicU32 as *const u32,
195            FUTEX_WAKE,
196            count as i32,
197            ptr::null::<libc::timespec>(),
198        )
199    };
200
201    if ret < 0 {
202        0
203    } else {
204        ret as usize
205    }
206}
207
208#[cfg(any(target_os = "linux", target_os = "android"))]
209fn duration_to_timespec(dur: Duration) -> libc::timespec {
210    libc::timespec {
211        tv_sec: dur.as_secs() as libc::time_t,
212        tv_nsec: dur.subsec_nanos() as libc::c_long,
213    }
214}
215
216#[cfg(target_os = "macos")]
217fn macos_futex_wait(
218    value: &AtomicU32,
219    expected: u32,
220    timeout: Option<Duration>,
221) -> FutexResult {
222    if value.load(Ordering::SeqCst) != expected {
223        return Err(FutexError::InvalidArgument);
224    }
225
226    const UL_WAIT: i32 = 0;
227    const UL_NO_ERR: i32 = 1;
228
229    let timeout_ns = timeout
230        .map(|d| d.as_nanos() as i64)
231        .unwrap_or(i64::MAX);
232
233    let ret = unsafe {
234        libc::ulock_wait(
235            UL_WAIT as u32,
236            value as *const AtomicU32 as *mut libc::c_void,
237            expected as u32,
238            timeout_ns,
239        )
240    };
241
242    match ret {
243        0 => Ok(()),
244        libc::ETIMEDOUT => Err(FutexError::Timeout),
245        libc::EINTR => Err(FutexError::Interrupted),
246        libc::EINVAL => Err(FutexError::InvalidArgument),
247        _ if ret < 0 => Err(FutexError::Other(ret)),
248        _ => Ok(()),
249    }
250}
251
252#[cfg(target_os = "macos")]
253fn macos_futex_wake(value: &AtomicU32, count: usize) -> usize {
254    const UL_WAKE: i32 = 1;
255    const UL_WAKE_ALL: i32 = 3;
256
257    let op = if count == usize::MAX {
258        UL_WAKE_ALL
259    } else {
260        UL_WAKE
261    };
262
263    let ret = unsafe {
264        libc::ulock_wake(
265            op as u32,
266            value as *const AtomicU32 as *mut libc::c_void,
267            0,
268        )
269    };
270
271    if ret < 0 {
272        0
273    } else if count == usize::MAX {
274        count
275    } else {
276        ret as usize
277    }
278}
279
280#[cfg(target_os = "windows")]
281fn windows_futex_wait(
282    value: &AtomicU32,
283    expected: u32,
284    timeout: Option<Duration>,
285) -> FutexResult {
286    use windows::Win32::System::Threading::WaitOnAddress;
287
288    if value.load(Ordering::SeqCst) != expected {
289        return Err(FutexError::InvalidArgument);
290    }
291
292    let timeout_ms = timeout
293        .map(|d| d.as_millis().clamp(1, u32::MAX as u128) as u32)
294        .unwrap_or(0xFFFFFFFF);
295
296    let result = unsafe {
297        WaitOnAddress(
298            value as *const AtomicU32 as *const u32 as *mut _,
299            &expected as *const u32 as *mut _,
300            std::mem::size_of::<u32>(),
301            timeout_ms,
302        )
303    };
304
305    match result {
306        Ok(_) => Ok(()),
307        Err(e) => {
308            const ERROR_TIMEOUT_HRESULT: u32 = 0x800705B4;
309            if e.code().0 == ERROR_TIMEOUT_HRESULT as i32 {
310                Err(FutexError::Timeout)
311            } else {
312                Err(FutexError::Other(e.code().0))
313            }
314        }
315    }
316}
317
318#[cfg(target_os = "windows")]
319fn windows_futex_wake(value: &AtomicU32, count: usize) -> usize {
320    use windows::Win32::System::Threading::{
321        WakeByAddressAll, WakeByAddressSingle,
322    };
323
324    if count == usize::MAX {
325        unsafe {
326            WakeByAddressAll(value as *const AtomicU32 as *mut _);
327        }
328        count
329    } else {
330        for _ in 0..count {
331            unsafe {
332                WakeByAddressSingle(value as *const AtomicU32 as *mut _);
333            }
334        }
335        count
336    }
337}