1use std::sync::atomic::{AtomicU32, Ordering};
5use std::time::Duration;
6
/// Failure modes reported by the futex wait operations in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FutexError {
    /// The wait ended because its timeout elapsed.
    Timeout,
    /// The wait was interrupted before it completed.
    Interrupted,
    /// An argument was rejected by the operation.
    InvalidArgument,
    /// The current target platform has no futex backend.
    Unsupported,
    /// Any other failure; carries the raw platform error code.
    Other(i32),
}
15
16impl std::fmt::Display for FutexError {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 match self {
19 FutexError::Timeout => write!(f, "Futex operation timed out"),
20 FutexError::Interrupted => write!(f, "Futex operation interrupted"),
21 FutexError::InvalidArgument => write!(f, "Invalid futex argument"),
22 FutexError::Unsupported => write!(f, "Futex not supported on this platform"),
23 FutexError::Other(code) => write!(f, "Futex error: {}", code),
24 }
25 }
26}
27
28impl std::error::Error for FutexError {}
29
30pub type FutexResult = Result<(), FutexError>;
31
32pub struct Futex {
33 value: AtomicU32,
34 #[cfg(any(target_os = "linux", target_os = "android"))]
35 _platform: LinuxFutex,
36 #[cfg(target_os = "macos")]
37 _platform: MacOSFutex,
38 #[cfg(target_os = "windows")]
39 _platform: WindowsFutex,
40}
41
/// Zero-sized tag for the Linux/Android futex backend.
#[cfg(any(target_os = "linux", target_os = "android"))]
struct LinuxFutex;
44
/// Zero-sized tag for the macOS futex backend.
#[cfg(target_os = "macos")]
struct MacOSFutex;
47
/// Zero-sized tag for the Windows futex backend.
#[cfg(target_os = "windows")]
struct WindowsFutex;
50
51impl Futex {
52 pub fn new(value: u32) -> Self {
53 Self {
54 value: AtomicU32::new(value),
55 #[cfg(any(target_os = "linux", target_os = "android"))]
56 _platform: LinuxFutex,
57 #[cfg(target_os = "macos")]
58 _platform: MacOSFutex,
59 #[cfg(target_os = "windows")]
60 _platform: WindowsFutex,
61 }
62 }
63
64 pub fn load(&self) -> u32 {
65 self.value.load(Ordering::SeqCst)
66 }
67
68 pub fn store(&self, value: u32) {
69 self.value.store(value, Ordering::SeqCst);
70 }
71
72 pub fn wait(&self, expected: u32, timeout: Option<Duration>) -> FutexResult {
73 #[cfg(any(target_os = "linux", target_os = "android"))]
74 {
75 linux_futex_wait(&self.value, expected, timeout)
76 }
77 #[cfg(target_os = "macos")]
78 {
79 macos_futex_wait(&self.value, expected, timeout)
80 }
81 #[cfg(target_os = "windows")]
82 {
83 windows_futex_wait(&self.value, expected, timeout)
84 }
85 #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
86 {
87 let _ = (expected, timeout);
88 Err(FutexError::Unsupported)
89 }
90 }
91
92 pub fn wake_one(&self) -> usize {
93 #[cfg(any(target_os = "linux", target_os = "android"))]
94 {
95 linux_futex_wake(&self.value, 1)
96 }
97 #[cfg(target_os = "macos")]
98 {
99 macos_futex_wake(&self.value, 1)
100 }
101 #[cfg(target_os = "windows")]
102 {
103 windows_futex_wake(&self.value, 1)
104 }
105 #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
106 {
107 0
108 }
109 }
110
111 pub fn wake_all(&self) -> usize {
112 #[cfg(any(target_os = "linux", target_os = "android"))]
113 {
114 linux_futex_wake(&self.value, usize::MAX)
115 }
116 #[cfg(target_os = "macos")]
117 {
118 macos_futex_wake(&self.value, usize::MAX)
119 }
120 #[cfg(target_os = "windows")]
121 {
122 windows_futex_wake(&self.value, usize::MAX)
123 }
124 #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "windows")))]
125 {
126 0
127 }
128 }
129
130 pub fn as_atomic(&self) -> &AtomicU32 {
131 &self.value
132 }
133}
134
135#[cfg(any(target_os = "linux", target_os = "android"))]
136fn linux_futex_wait(
137 value: &AtomicU32,
138 expected: u32,
139 timeout: Option<Duration>,
140) -> FutexResult {
141 use std::ptr;
142
143 const FUTEX_WAIT: i32 = 0;
144
145 if value.load(Ordering::SeqCst) != expected {
146 return Err(FutexError::InvalidArgument);
147 }
148
149 let timespec = timeout.map(duration_to_timespec);
150 let timespec_ptr = timespec
151 .as_ref()
152 .map(|ts| ts as *const libc::timespec)
153 .unwrap_or(ptr::null());
154
155 loop {
156 if value.load(Ordering::SeqCst) != expected {
157 return Err(FutexError::InvalidArgument);
158 }
159
160 let ret = unsafe {
161 libc::syscall(
162 libc::SYS_futex,
163 value as *const AtomicU32 as *const u32,
164 FUTEX_WAIT,
165 expected as i32,
166 timespec_ptr,
167 )
168 };
169
170 if ret == 0 {
171 return Ok(());
172 }
173
174 let err = std::io::Error::last_os_error().raw_os_error().unwrap_or(0);
175 match err {
176 libc::EINTR => continue,
177 libc::ETIMEDOUT => return Err(FutexError::Timeout),
178 libc::EAGAIN => continue,
179 libc::EINVAL | libc::EFAULT => return Err(FutexError::InvalidArgument),
180 _ => return Err(FutexError::Other(err)),
181 }
182 }
183}
184
185#[cfg(any(target_os = "linux", target_os = "android"))]
186fn linux_futex_wake(value: &AtomicU32, count: usize) -> usize {
187 use std::ptr;
188
189 const FUTEX_WAKE: i32 = 1;
190
191 let ret = unsafe {
192 libc::syscall(
193 libc::SYS_futex,
194 value as *const AtomicU32 as *const u32,
195 FUTEX_WAKE,
196 count as i32,
197 ptr::null::<libc::timespec>(),
198 )
199 };
200
201 if ret < 0 {
202 0
203 } else {
204 ret as usize
205 }
206}
207
208#[cfg(any(target_os = "linux", target_os = "android"))]
209fn duration_to_timespec(dur: Duration) -> libc::timespec {
210 libc::timespec {
211 tv_sec: dur.as_secs() as libc::time_t,
212 tv_nsec: dur.subsec_nanos() as libc::c_long,
213 }
214}
215
/// Waits on `value` through `libc::ulock_wait` if it currently holds
/// `expected`.
///
/// The timeout is passed in nanoseconds and saturates at `i64::MAX`, which
/// is also the value used when `timeout` is `None`. A plain
/// `as_nanos() as i64` would wrap for very large durations.
///
/// # Errors
///
/// * [`FutexError::InvalidArgument`] if `value` differs from `expected` on
///   entry, or the call returns `EINVAL`.
/// * [`FutexError::Timeout`] / [`FutexError::Interrupted`] if the call
///   returns `ETIMEDOUT` / `EINTR`.
/// * [`FutexError::Other`] for any other negative return value.
#[cfg(target_os = "macos")]
fn macos_futex_wait(
    value: &AtomicU32,
    expected: u32,
    timeout: Option<Duration>,
) -> FutexResult {
    if value.load(Ordering::SeqCst) != expected {
        return Err(FutexError::InvalidArgument);
    }

    const UL_WAIT: i32 = 0;

    // NOTE(review): confirm how the underlying call treats a zero timeout
    // (`Some(Duration::ZERO)` is passed through as 0).
    let timeout_ns = timeout.map_or(i64::MAX, |d| d.as_nanos().min(i64::MAX as u128) as i64);

    // SAFETY: `value` is a live reference, so the address handed to the call
    // is valid for its duration.
    let ret = unsafe {
        libc::ulock_wait(
            UL_WAIT as u32,
            value as *const AtomicU32 as *mut libc::c_void,
            expected,
            timeout_ns,
        )
    };

    // NOTE(review): the arms below compare `ret` against positive errno
    // values; confirm that matches the return convention of `ulock_wait`
    // (as opposed to -1 with errno set, or a negated errno).
    match ret {
        0 => Ok(()),
        libc::ETIMEDOUT => Err(FutexError::Timeout),
        libc::EINTR => Err(FutexError::Interrupted),
        libc::EINVAL => Err(FutexError::InvalidArgument),
        _ if ret < 0 => Err(FutexError::Other(ret)),
        _ => Ok(()),
    }
}
251
/// Issues a single `libc::ulock_wake` call on `value`.
///
/// `count == usize::MAX` selects the wake-all operation; every other count
/// selects the single-wake operation. Returns `0` if the call reports a
/// negative status, `count` for a successful wake-all, and otherwise the
/// call's return value.
#[cfg(target_os = "macos")]
fn macos_futex_wake(value: &AtomicU32, count: usize) -> usize {
    const UL_WAKE: i32 = 1;
    const UL_WAKE_ALL: i32 = 3;

    let wake_everyone = count == usize::MAX;
    let operation = if wake_everyone { UL_WAKE_ALL } else { UL_WAKE };

    // SAFETY: `value` is a live reference, so the address handed to the call
    // is valid for its duration.
    let status = unsafe {
        libc::ulock_wake(
            operation as u32,
            value as *const AtomicU32 as *mut libc::c_void,
            0,
        )
    };

    match (status < 0, wake_everyone) {
        (true, _) => 0,
        (false, true) => count,
        (false, false) => status as usize,
    }
}
279
/// Waits on `value` through `WaitOnAddress` if it currently holds `expected`.
///
/// A supplied timeout is converted to whole milliseconds and clamped to the
/// range `1..=u32::MAX`; `None` is passed as `0xFFFFFFFF`.
///
/// # Errors
///
/// * [`FutexError::InvalidArgument`] if `value` differs from `expected` on
///   entry.
/// * [`FutexError::Timeout`] if the call fails with HRESULT `0x800705B4`.
/// * [`FutexError::Other`] with the raw HRESULT for any other failure.
#[cfg(target_os = "windows")]
fn windows_futex_wait(
    value: &AtomicU32,
    expected: u32,
    timeout: Option<Duration>,
) -> FutexResult {
    use windows::Win32::System::Threading::WaitOnAddress;

    const ERROR_TIMEOUT_HRESULT: u32 = 0x800705B4;
    const NO_TIMEOUT_MS: u32 = 0xFFFFFFFF;

    if value.load(Ordering::SeqCst) != expected {
        return Err(FutexError::InvalidArgument);
    }

    let timeout_ms = match timeout {
        Some(limit) => limit.as_millis().clamp(1, u32::MAX as u128) as u32,
        None => NO_TIMEOUT_MS,
    };

    // SAFETY: both pointers come from live references (`value` and the local
    // `expected`) that outlive the call, and the size passed matches the
    // 4-byte width of the values they point to.
    let outcome = unsafe {
        WaitOnAddress(
            value as *const AtomicU32 as *const u32 as *mut _,
            &expected as *const u32 as *mut _,
            std::mem::size_of::<u32>(),
            timeout_ms,
        )
    };

    let failure = match outcome {
        Ok(_) => return Ok(()),
        Err(e) => e,
    };

    let hresult = failure.code().0;
    if hresult == ERROR_TIMEOUT_HRESULT as i32 {
        Err(FutexError::Timeout)
    } else {
        Err(FutexError::Other(hresult))
    }
}
317
/// Wakes threads waiting on `value` through the `WakeByAddress*` calls.
///
/// `count == usize::MAX` issues one `WakeByAddressAll`; any other count
/// issues `WakeByAddressSingle` that many times. The requested `count` is
/// returned unchanged in both cases, since these calls do not report how
/// many threads were actually woken.
#[cfg(target_os = "windows")]
fn windows_futex_wake(value: &AtomicU32, count: usize) -> usize {
    use windows::Win32::System::Threading::{
        WakeByAddressAll, WakeByAddressSingle,
    };

    if count != usize::MAX {
        for _ in 0..count {
            // SAFETY: `value` is a live reference, so the address is valid.
            unsafe {
                WakeByAddressSingle(value as *const AtomicU32 as *mut _);
            }
        }
    } else {
        // SAFETY: `value` is a live reference, so the address is valid.
        unsafe {
            WakeByAddressAll(value as *const AtomicU32 as *mut _);
        }
    }

    count
}