sema/
sys.rs

1use libc;
2use time::Duration;
3
4pub use self::os::{
5    Semaphore,
6    SemaphoreGuard,
7};
8
9// Converts a `Duration` to a `timespec`.
10fn to_timespec(dur: Duration) -> libc::timespec {
11    let sec = dur.num_seconds();
12    // Safe to unwrap since there can't be more than one second left.
13    let nsec = (dur - Duration::seconds(sec)).num_nanoseconds().unwrap();
14    libc::timespec {
15        tv_sec: sec as libc::time_t,
16        tv_nsec: nsec as libc::c_long,
17    }
18}
19
20// Linux-specific semaphore, implemented with futexes.
21// Heavily based on glibc `sem_t` implementation.
22#[cfg(target_os = "linux")]
23mod os {
24    use std::mem;
25    use std::ptr;
26    use std::sync::atomic::{
27        Ordering,
28        AtomicUsize,
29    };
30    use std::io::{
31        Error,
32        ErrorKind
33    };
34
35    use libc;
36    use time::Duration;
37
38    use super::to_timespec;
39
40    // The number of waiters is stored in the upper half most significant bits.
41    #[cfg(target_pointer_width = "64")]
42    const NWAITERS_SHIFT: usize = 32;
43    #[cfg(target_pointer_width = "32")]
44    const NWAITERS_SHIFT: usize = 16;
45
46    // Masks out nwaiters to obtain the Semaphore's count.
47    const VALUE_MASK: usize = (!0) >> NWAITERS_SHIFT;
48
49    // Value to add to semaphoroe to add one waiter.
50    const ONE_WAITER: usize = (1 << NWAITERS_SHIFT);
51    // Value to add to semaphore to subtract one waiter.
52    const NEG_ONE_WAITER: usize = (!0 << NWAITERS_SHIFT);
53
54
55    // Futex syscall number.
56    #[cfg(target_arch = "x86_64")]
57    const SYS_FUTEX: libc::c_long = 202;
58    #[cfg(target_arch = "x86")]
59    const SYS_FUTEX: libc::c_long = 240;
60
61    // Syscall op numbers.
62    const FUTEX_WAIT: i32 = 0;
63    const FUTEX_WAKE: i32 = 1;
64
65
66    extern {
67        // Glibc doesn't provide a futex wrapper function.
68        // We use this to wrap the futex syscall.
69        fn syscall(number: libc::c_long, ...) -> libc::c_long;
70    }
71
72    // Wake at most `val` threads currently waiting on the futex.
73    fn futex_wake(uaddr: *mut u32, val: u32) -> Result<i32, Error> {
74        let res = unsafe {
75            syscall(SYS_FUTEX, uaddr, FUTEX_WAKE, val)
76        };
77        if res == -1 {
78            Err(Error::last_os_error())
79        } else {
80            Ok(res as i32)
81        }
82    }
83
84    // Puts the current thread to sleep on the futex.
85    // If the timeout is non-NULL, the thread wake after the timeout specified with
86    // `ErrorKind::TimedOut`.
87    fn futex_wait(uaddr: *mut u32, val: u32, timeout: *const libc::timespec) -> Result<i32, Error> {
88        let res = unsafe {
89            syscall(SYS_FUTEX, uaddr, FUTEX_WAIT, val, timeout)
90        };
91        if res == -1 {
92            Err(Error::last_os_error())
93        } else {
94            Ok(res as i32)
95        }
96    }
97
98    unsafe trait AsPointer<T> {
99        unsafe fn as_ptr(&self) -> *mut T;
100    }
101
102    // This is almost definitely undefined behaviour, however this is the only method to get the
103    // address of the underlying integer contained in the atomic wrapper since the field is private.
104    //
105    // This is ONLY to pass to the kernel for futex syscalls and should never, ever, ever be done under
106    // normal circumstances.
107    unsafe impl AsPointer<usize> for AtomicUsize {
108        unsafe fn as_ptr(&self) -> *mut usize {
109            mem::transmute(self)
110        }
111    }
112
113    pub struct Semaphore {
114        data: AtomicUsize,
115    }
116
117    pub struct SemaphoreGuard<'a> {
118        sem: &'a Semaphore,
119    }
120
121    impl Semaphore {
122        pub fn new(value: usize) -> Semaphore {
123            Semaphore {
124                data: AtomicUsize::new(value),
125            }
126        }
127
128        pub fn post(&self) {
129            let d = self.data.load(Ordering::Relaxed);
130            // Release, pending the acquire which will establish happens-before relation.
131            self.data.fetch_add(1, Ordering::Release);
132
133            // If there are any waiters, wake one.
134            if (d >> NWAITERS_SHIFT) > 0 {
135                futex_wake(self.value_ptr(), 1).unwrap();
136            }
137        }
138
139        pub fn wait(&self) -> Result<(), Error> {
140            self.wait_fast(false).or_else(|_| {
141                self.wait_slow(ptr::null())
142            })
143        }
144
145        pub fn try_wait(&self) -> Result<(), Error> {
146            self.wait_fast(true)
147        }
148
149        pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
150            self.wait_fast(false).or_else(|_| {
151                let ts = to_timespec(timeout);
152                self.wait_slow(&ts)
153            })
154        }
155
156        pub fn take(&self) -> Result<SemaphoreGuard, Error> {
157            try!(self.wait());
158            Ok(SemaphoreGuard {
159                sem: self,
160            })
161        }
162
163        // Returns a pointer to the value of the atomic counter.
164        // This is used to abstract over platform pointer width and endianness differences.
165        fn value_ptr(&self) -> *mut u32 {
166            #[cfg(any(target_endian = "little",
167                      target_pointer_width = "32"))]
168            const VALUE_OFFSET: isize = 0;
169            #[cfg(all(target_endian = "big",
170                      target_pointer_width = "64"))]
171            const VALUE_OFFSET: isize = 1;
172
173            unsafe {
174                (self.data.as_ptr() as *mut u32).offset(VALUE_OFFSET)
175            }
176        }
177
178        // Will grab a token if one is available. Otherwise, returns `ErrorKind::WouldBlock`.
179        fn wait_fast(&self, definitive_result: bool) -> Result<(), Error> {
180            let mut d = self.data.load(Ordering::Relaxed);
181            loop {
182                // Check if there is a token available.
183                if (d & VALUE_MASK) == 0 {
184                    // No token available. Need to call `wait_slow()` and block.
185                    return Err(Error::new(ErrorKind::WouldBlock, "wait would block"));
186                }
187                // Grab the token and establish synchronizes-with between threads.
188                let prev = self.data.compare_and_swap(d, d - 1, Ordering::Acquire);
189                if prev == d {
190                    // Swap was successful and we have taken a token.
191                    return Ok(())
192                } else {
193                    // Swap was unsuccessful. Update variable and possibly loop.
194                    d = prev;
195                }
196                if definitive_result {
197                    continue;
198                } else {
199                    return Err(Error::new(ErrorKind::WouldBlock, "wait would block"));
200                }
201            }
202        }
203
204        fn wait_slow(&self, timeout: *const libc::timespec) -> Result<(), Error> {
205            let mut d = self.data.fetch_add(ONE_WAITER, Ordering::Relaxed);
206
207            // Wait for a token to become available.
208            loop {
209                // If there is no token avalable, sleep until there is.
210                if (d & VALUE_MASK) == 0 {
211                    let res = futex_wait(self.value_ptr(), 0, timeout);
212
213                    // If `futex_wait` timed out, or was interrupted by a signal, return this error to
214                    // the caller. Otherwise we retry.
215                    if let Err(e) = res {
216                        if e.kind() == ErrorKind::Interrupted || e.kind() == ErrorKind::TimedOut {
217                            self.data.fetch_add(NEG_ONE_WAITER, Ordering::Relaxed);
218                            return Err(e);
219                        }
220                    }
221
222                    d = self.data.load(Ordering::Relaxed);
223                } else {
224                    // There is a token available, try to take the token and decrement the number of
225                    // waiters. Return if we are successful, loop if not.
226                    let prev = self.data.compare_and_swap(d, (d - 1) - ONE_WAITER, Ordering::Acquire);
227                    if prev == d {
228                        // Swap was successful and we have synchronizes-with relationship.
229                        return Ok(())
230                    } else {
231                        // Swap was unsuccessful. Update variable and retry.
232                        d = prev;
233                    }
234                }
235            }
236        }
237    }
238
239    unsafe impl Send for Semaphore {}
240    unsafe impl Sync for Semaphore {}
241
242    impl<'a> Drop for SemaphoreGuard<'a> {
243        fn drop(&mut self) {
244            self.sem.post();
245        }
246    }
247}
248
249// POSIX semaphores.
250//
251// This is the basic, non-shared semaphore that is present on most unix-likes. OS X is excluded as
252// it does not implement process local semaphores, and Linux is omitted because we have our own
253// implementation instead.
254#[cfg(not(any(target_os = "macos",
255              target_os = "linux")))]
256mod os {
257    use std::cell::UnsafeCell;
258    use std::mem;
259    use std::io::Error;
260
261    use time::Duration;
262    use libc::{
263        self,
264        c_int,
265        c_uint,
266    };
267
268    use super::to_timespec;
269
270    #[cfg(target_pointer_width = "64")]
271    const SIZEOF_SEM_T: usize = 32;
272    #[cfg(not(target_pointer_width = "64"))]
273    const SIZEOF_SEM_T: usize = 16;
274
275    extern {
276        fn sem_init(sem: *mut sem_t, pshared: c_int, value: c_uint) -> c_int;
277        fn sem_post(sem: *mut sem_t) -> c_int;
278        fn sem_wait(sem: *mut sem_t) -> c_int;
279        fn sem_trywait(sem: *mut sem_t) -> c_int;
280        fn sem_timedwait(sem: *mut sem_t, timeout: *const libc::timespec) -> c_int;
281        fn sem_destroy(sem: *mut sem_t) -> c_int;
282    }
283    #[repr(C)]
284    #[derive(Debug)]
285    struct sem_t {
286        __opaque: [u8; SIZEOF_SEM_T],
287    }
288
289    pub struct Semaphore {
290        inner: UnsafeCell<sem_t>,
291    }
292
293    pub struct SemaphoreGuard<'a> {
294        sem: &'a Semaphore,
295    }
296
297    impl Semaphore {
298        pub fn new(value: u32) -> Semaphore {
299            let mut sem: sem_t = unsafe {
300                mem::uninitialized()
301            };
302            let res = unsafe {
303                sem_init(&mut sem, 0, value as c_uint)
304            };
305            debug_assert_eq!(res, 0);
306
307            Semaphore {
308                inner: UnsafeCell::new(sem),
309            }
310        }
311
312        pub fn wait(&self) -> Result<(), Error> {
313            let res = unsafe {
314                sem_wait(self.inner.get())
315            };
316            if res == -1 {
317                Err(Error::last_os_error())
318            } else {
319                Ok(())
320            }
321        }
322
323        pub fn try_wait(&self) -> Result<(), Error> {
324            let res = unsafe {
325                sem_trywait(self.inner.get())
326            };
327            if res == -1 {
328                Err(Error::last_os_error())
329            } else {
330                Ok(())
331            }
332        }
333
334        pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
335            let res = unsafe {
336                let ts = to_timespec(timeout);
337                sem_timedwait(self.inner.get(), &ts)
338            };
339            if res == -1 {
340                Err(Error::last_os_error())
341            } else {
342                Ok(())
343            }
344        }
345
346        pub fn post(&self) {
347            let res = unsafe {
348                sem_post(self.inner.get())
349            };
350            debug_assert_eq!(res, 0);
351        }
352
353        pub fn take(&self) -> Result<SemaphoreGuard, Error> {
354            try!(self.wait());
355            Ok(SemaphoreGuard { 
356                sem: self,
357            })
358        }
359    }
360
361    unsafe impl Send for Semaphore {}
362    unsafe impl Sync for Semaphore {}
363
364    impl Drop for Semaphore {
365        fn drop(&mut self) {
366            let res = unsafe {
367                sem_destroy(self.inner.get())
368            };
369            debug_assert_eq!(res, 0);
370        }
371    }
372
373    impl<'a> Drop for SemaphoreGuard<'a> {
374        fn drop(&mut self) {
375            self.sem.post();
376        }
377    }
378}
379
380// OS X specific semaphores.
381//
382// OS X does not implement `sem_init()` and process-local semaphores, however it does implement
383// process-shared semaphores. We use the latter, with randomly generated names to implement
384// pseudo-local semaphores. Semantically they should operate identically.
385#[cfg(target_os = "macos")]
386mod os {
387    use std::ffi::CString;
388    use std::cell::UnsafeCell;
389    use std::io::Error;
390
391    use rand::{
392        thread_rng,
393        Rng,
394    };
395    use libc::{
396        self,
397        c_int,
398        c_uint,
399        c_char,
400        mode_t,
401        O_CREAT,
402        O_EXCL,
403        S_IRWXU,
404    };
405    use time::Duration;
406
407    use super::to_timespec;
408
409    const SEM_NAME_MAX: usize = 28; // No definitive value for this on OS X. Erring on the side of caution.
410    const SEM_FAILED: *mut sem_t = 0 as *mut sem_t;
411
412    extern {
413        fn sem_open(name: *const c_char, oflag: c_int, mode: mode_t, value: c_uint) -> *mut sem_t;
414        fn sem_post(sem: *mut sem_t) -> c_int;
415        fn sem_wait(sem: *mut sem_t) -> c_int;
416        fn sem_trywait(sem: *mut sem_t) -> c_int;
417        fn sem_timedwait(sem: *mut sem_t, timeout: *const libc::timespec) -> c_int;
418        fn sem_close(sem: *mut sem_t) -> c_int;
419        fn sem_unlink(sem: *const c_char) -> c_int;
420    }
421
422    #[repr(C)]
423    #[derive(Debug)]
424    pub struct sem_t {
425        __opaque: c_int,
426    }
427
428    pub struct Semaphore {
429        inner: UnsafeCell<*mut sem_t>,
430        name: CString,
431    }
432
433    pub struct SemaphoreGuard<'a> {
434        sem: &'a Semaphore,
435    }
436
437    impl Semaphore {
438        pub fn new(value: u32) -> Semaphore {
439            let name: String = thread_rng().gen_ascii_chars().take(SEM_NAME_MAX).collect();
440            let c_name = CString::new(name).unwrap(); // Rng does not emit 0 bytes.
441
442            let sem: *mut sem_t = unsafe {
443                sem_open(c_name.as_ptr(), O_CREAT | O_EXCL, S_IRWXU, value as c_uint)
444            };
445            debug_assert!(sem != SEM_FAILED);
446
447            Semaphore {
448                inner: UnsafeCell::new(sem),
449                name: c_name,
450            }
451        }
452
453        pub fn wait(&self) -> Result<(), Error> {
454            let res = unsafe {
455                sem_wait(*self.inner.get())
456            };
457            if res == -1 {
458                Err(Error::last_os_error())
459            } else {
460                Ok(())
461            }
462        }
463
464        pub fn try_wait(&self) -> Result<(), Error> {
465            let res = unsafe {
466                sem_trywait(*self.inner.get())
467            };
468            if res == -1 {
469                Err(Error::last_os_error())
470            } else {
471                Ok(())
472            }
473        }
474
475        pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
476            let res = unsafe {
477                let ts = to_timespec(timeout);
478                sem_timedwait(*self.inner.get(), &ts)
479            };
480            if res == -1 {
481                Err(Error::last_os_error())
482            } else {
483                Ok(())
484            }
485        }
486
487        pub fn post(&self) {
488            let res = unsafe {
489                sem_post(*self.inner.get())
490            };
491            debug_assert_eq!(res, 0);
492        }
493
494        pub fn take(&self) -> Result<SemaphoreGuard, Error> {
495            try!(self.wait());
496            Ok(SemaphoreGuard { 
497                sem: self,
498            })
499        }
500    }
501
502    unsafe impl Send for Semaphore {}
503    unsafe impl Sync for Semaphore {}
504
505    impl Drop for Semaphore {
506        fn drop(&mut self) {
507            let res = unsafe {
508                sem_close(*self.inner.get())
509            };
510            debug_assert_eq!(res, 0);
511            let res = unsafe {
512                sem_unlink(self.name.as_ptr())
513            };
514            debug_assert_eq!(res, 0);
515        }
516    }
517
518    impl<'a> Drop for SemaphoreGuard<'a> {
519        fn drop(&mut self) {
520            self.sem.post();
521        }
522    }
523}