1use libc;
2use time::Duration;
3
4pub use self::os::{
5 Semaphore,
6 SemaphoreGuard,
7};
8
9fn to_timespec(dur: Duration) -> libc::timespec {
11 let sec = dur.num_seconds();
12 let nsec = (dur - Duration::seconds(sec)).num_nanoseconds().unwrap();
14 libc::timespec {
15 tv_sec: sec as libc::time_t,
16 tv_nsec: nsec as libc::c_long,
17 }
18}
19
20#[cfg(target_os = "linux")]
23mod os {
24 use std::mem;
25 use std::ptr;
26 use std::sync::atomic::{
27 Ordering,
28 AtomicUsize,
29 };
30 use std::io::{
31 Error,
32 ErrorKind
33 };
34
35 use libc;
36 use time::Duration;
37
38 use super::to_timespec;
39
40 #[cfg(target_pointer_width = "64")]
42 const NWAITERS_SHIFT: usize = 32;
43 #[cfg(target_pointer_width = "32")]
44 const NWAITERS_SHIFT: usize = 16;
45
46 const VALUE_MASK: usize = (!0) >> NWAITERS_SHIFT;
48
49 const ONE_WAITER: usize = (1 << NWAITERS_SHIFT);
51 const NEG_ONE_WAITER: usize = (!0 << NWAITERS_SHIFT);
53
54
55 #[cfg(target_arch = "x86_64")]
57 const SYS_FUTEX: libc::c_long = 202;
58 #[cfg(target_arch = "x86")]
59 const SYS_FUTEX: libc::c_long = 240;
60
61 const FUTEX_WAIT: i32 = 0;
63 const FUTEX_WAKE: i32 = 1;
64
65
66 extern {
67 fn syscall(number: libc::c_long, ...) -> libc::c_long;
70 }
71
72 fn futex_wake(uaddr: *mut u32, val: u32) -> Result<i32, Error> {
74 let res = unsafe {
75 syscall(SYS_FUTEX, uaddr, FUTEX_WAKE, val)
76 };
77 if res == -1 {
78 Err(Error::last_os_error())
79 } else {
80 Ok(res as i32)
81 }
82 }
83
84 fn futex_wait(uaddr: *mut u32, val: u32, timeout: *const libc::timespec) -> Result<i32, Error> {
88 let res = unsafe {
89 syscall(SYS_FUTEX, uaddr, FUTEX_WAIT, val, timeout)
90 };
91 if res == -1 {
92 Err(Error::last_os_error())
93 } else {
94 Ok(res as i32)
95 }
96 }
97
98 unsafe trait AsPointer<T> {
99 unsafe fn as_ptr(&self) -> *mut T;
100 }
101
102 unsafe impl AsPointer<usize> for AtomicUsize {
108 unsafe fn as_ptr(&self) -> *mut usize {
109 mem::transmute(self)
110 }
111 }
112
    /// Counting semaphore built on a single atomic word and the Linux futex
    /// syscall.
    pub struct Semaphore {
        // Packed state: the bits selected by `VALUE_MASK` hold the counter,
        // the bits from `NWAITERS_SHIFT` upward count registered waiters.
        data: AtomicUsize,
    }
116
    /// Guard returned by `Semaphore::take`; dropping it posts the semaphore
    /// it was taken from.
    pub struct SemaphoreGuard<'a> {
        // The semaphore to post when the guard is dropped.
        sem: &'a Semaphore,
    }
120
121 impl Semaphore {
122 pub fn new(value: usize) -> Semaphore {
123 Semaphore {
124 data: AtomicUsize::new(value),
125 }
126 }
127
128 pub fn post(&self) {
129 let d = self.data.load(Ordering::Relaxed);
130 self.data.fetch_add(1, Ordering::Release);
132
133 if (d >> NWAITERS_SHIFT) > 0 {
135 futex_wake(self.value_ptr(), 1).unwrap();
136 }
137 }
138
139 pub fn wait(&self) -> Result<(), Error> {
140 self.wait_fast(false).or_else(|_| {
141 self.wait_slow(ptr::null())
142 })
143 }
144
145 pub fn try_wait(&self) -> Result<(), Error> {
146 self.wait_fast(true)
147 }
148
149 pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
150 self.wait_fast(false).or_else(|_| {
151 let ts = to_timespec(timeout);
152 self.wait_slow(&ts)
153 })
154 }
155
156 pub fn take(&self) -> Result<SemaphoreGuard, Error> {
157 try!(self.wait());
158 Ok(SemaphoreGuard {
159 sem: self,
160 })
161 }
162
163 fn value_ptr(&self) -> *mut u32 {
166 #[cfg(any(target_endian = "little",
167 target_pointer_width = "32"))]
168 const VALUE_OFFSET: isize = 0;
169 #[cfg(all(target_endian = "big",
170 target_pointer_width = "64"))]
171 const VALUE_OFFSET: isize = 1;
172
173 unsafe {
174 (self.data.as_ptr() as *mut u32).offset(VALUE_OFFSET)
175 }
176 }
177
178 fn wait_fast(&self, definitive_result: bool) -> Result<(), Error> {
180 let mut d = self.data.load(Ordering::Relaxed);
181 loop {
182 if (d & VALUE_MASK) == 0 {
184 return Err(Error::new(ErrorKind::WouldBlock, "wait would block"));
186 }
187 let prev = self.data.compare_and_swap(d, d - 1, Ordering::Acquire);
189 if prev == d {
190 return Ok(())
192 } else {
193 d = prev;
195 }
196 if definitive_result {
197 continue;
198 } else {
199 return Err(Error::new(ErrorKind::WouldBlock, "wait would block"));
200 }
201 }
202 }
203
204 fn wait_slow(&self, timeout: *const libc::timespec) -> Result<(), Error> {
205 let mut d = self.data.fetch_add(ONE_WAITER, Ordering::Relaxed);
206
207 loop {
209 if (d & VALUE_MASK) == 0 {
211 let res = futex_wait(self.value_ptr(), 0, timeout);
212
213 if let Err(e) = res {
216 if e.kind() == ErrorKind::Interrupted || e.kind() == ErrorKind::TimedOut {
217 self.data.fetch_add(NEG_ONE_WAITER, Ordering::Relaxed);
218 return Err(e);
219 }
220 }
221
222 d = self.data.load(Ordering::Relaxed);
223 } else {
224 let prev = self.data.compare_and_swap(d, (d - 1) - ONE_WAITER, Ordering::Acquire);
227 if prev == d {
228 return Ok(())
230 } else {
231 d = prev;
233 }
234 }
235 }
236 }
237 }
238
    // The only field is an `AtomicUsize`, and every method takes `&self` and
    // touches it through atomic operations or the futex syscall.
    unsafe impl Send for Semaphore {}
    unsafe impl Sync for Semaphore {}
241
242 impl<'a> Drop for SemaphoreGuard<'a> {
243 fn drop(&mut self) {
244 self.sem.post();
245 }
246 }
247}
248
249#[cfg(not(any(target_os = "macos",
255 target_os = "linux")))]
256mod os {
257 use std::cell::UnsafeCell;
258 use std::mem;
259 use std::io::Error;
260
261 use time::Duration;
262 use libc::{
263 self,
264 c_int,
265 c_uint,
266 };
267
268 use super::to_timespec;
269
    // Number of bytes reserved for the platform's opaque `sem_t`.
    // NOTE(review): 32/16 bytes must be at least `sizeof(sem_t)` on every
    // target this module is compiled for — confirm against the C headers.
    #[cfg(target_pointer_width = "64")]
    const SIZEOF_SEM_T: usize = 32;
    #[cfg(not(target_pointer_width = "64"))]
    const SIZEOF_SEM_T: usize = 16;
274
275 extern {
276 fn sem_init(sem: *mut sem_t, pshared: c_int, value: c_uint) -> c_int;
277 fn sem_post(sem: *mut sem_t) -> c_int;
278 fn sem_wait(sem: *mut sem_t) -> c_int;
279 fn sem_trywait(sem: *mut sem_t) -> c_int;
280 fn sem_timedwait(sem: *mut sem_t, timeout: *const libc::timespec) -> c_int;
281 fn sem_destroy(sem: *mut sem_t) -> c_int;
282 }
283 #[repr(C)]
284 #[derive(Debug)]
285 struct sem_t {
286 __opaque: [u8; SIZEOF_SEM_T],
287 }
288
    /// Counting semaphore backed by a POSIX unnamed semaphore (`sem_init`).
    pub struct Semaphore {
        // The C semaphore object; `UnsafeCell` lets `&self` methods hand a
        // mutable pointer to the `sem_*` functions.
        inner: UnsafeCell<sem_t>,
    }
292
    /// Guard returned by `Semaphore::take`; dropping it posts the semaphore
    /// it was taken from.
    pub struct SemaphoreGuard<'a> {
        // The semaphore to post when the guard is dropped.
        sem: &'a Semaphore,
    }
296
297 impl Semaphore {
298 pub fn new(value: u32) -> Semaphore {
299 let mut sem: sem_t = unsafe {
300 mem::uninitialized()
301 };
302 let res = unsafe {
303 sem_init(&mut sem, 0, value as c_uint)
304 };
305 debug_assert_eq!(res, 0);
306
307 Semaphore {
308 inner: UnsafeCell::new(sem),
309 }
310 }
311
312 pub fn wait(&self) -> Result<(), Error> {
313 let res = unsafe {
314 sem_wait(self.inner.get())
315 };
316 if res == -1 {
317 Err(Error::last_os_error())
318 } else {
319 Ok(())
320 }
321 }
322
323 pub fn try_wait(&self) -> Result<(), Error> {
324 let res = unsafe {
325 sem_trywait(self.inner.get())
326 };
327 if res == -1 {
328 Err(Error::last_os_error())
329 } else {
330 Ok(())
331 }
332 }
333
334 pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
335 let res = unsafe {
336 let ts = to_timespec(timeout);
337 sem_timedwait(self.inner.get(), &ts)
338 };
339 if res == -1 {
340 Err(Error::last_os_error())
341 } else {
342 Ok(())
343 }
344 }
345
346 pub fn post(&self) {
347 let res = unsafe {
348 sem_post(self.inner.get())
349 };
350 debug_assert_eq!(res, 0);
351 }
352
353 pub fn take(&self) -> Result<SemaphoreGuard, Error> {
354 try!(self.wait());
355 Ok(SemaphoreGuard {
356 sem: self,
357 })
358 }
359 }
360
    // `UnsafeCell<sem_t>` is not `Sync` on its own; these impls rely on the
    // `sem_*` functions being callable from several threads at once.
    // NOTE(review): presumably guaranteed by POSIX for semaphores — confirm.
    unsafe impl Send for Semaphore {}
    unsafe impl Sync for Semaphore {}
363
364 impl Drop for Semaphore {
365 fn drop(&mut self) {
366 let res = unsafe {
367 sem_destroy(self.inner.get())
368 };
369 debug_assert_eq!(res, 0);
370 }
371 }
372
373 impl<'a> Drop for SemaphoreGuard<'a> {
374 fn drop(&mut self) {
375 self.sem.post();
376 }
377 }
378}
379
380#[cfg(target_os = "macos")]
386mod os {
387 use std::ffi::CString;
388 use std::cell::UnsafeCell;
389 use std::io::Error;
390
391 use rand::{
392 thread_rng,
393 Rng,
394 };
395 use libc::{
396 self,
397 c_int,
398 c_uint,
399 c_char,
400 mode_t,
401 O_CREAT,
402 O_EXCL,
403 S_IRWXU,
404 };
405 use time::Duration;
406
407 use super::to_timespec;
408
409 const SEM_NAME_MAX: usize = 28; const SEM_FAILED: *mut sem_t = 0 as *mut sem_t;
411
412 extern {
413 fn sem_open(name: *const c_char, oflag: c_int, mode: mode_t, value: c_uint) -> *mut sem_t;
414 fn sem_post(sem: *mut sem_t) -> c_int;
415 fn sem_wait(sem: *mut sem_t) -> c_int;
416 fn sem_trywait(sem: *mut sem_t) -> c_int;
417 fn sem_timedwait(sem: *mut sem_t, timeout: *const libc::timespec) -> c_int;
418 fn sem_close(sem: *mut sem_t) -> c_int;
419 fn sem_unlink(sem: *const c_char) -> c_int;
420 }
421
    /// Stand-in for the C `sem_t`; this module only ever handles it through
    /// the pointer returned by `sem_open`.
    #[repr(C)]
    #[derive(Debug)]
    pub struct sem_t {
        __opaque: c_int,
    }
427
    /// Counting semaphore backed by a POSIX named semaphore (`sem_open`).
    pub struct Semaphore {
        // Handle returned by `sem_open`.
        inner: UnsafeCell<*mut sem_t>,
        // Name the semaphore was created under; needed to unlink it on drop.
        name: CString,
    }
432
    /// Guard returned by `Semaphore::take`; dropping it posts the semaphore
    /// it was taken from.
    pub struct SemaphoreGuard<'a> {
        // The semaphore to post when the guard is dropped.
        sem: &'a Semaphore,
    }
436
437 impl Semaphore {
438 pub fn new(value: u32) -> Semaphore {
439 let name: String = thread_rng().gen_ascii_chars().take(SEM_NAME_MAX).collect();
440 let c_name = CString::new(name).unwrap(); let sem: *mut sem_t = unsafe {
443 sem_open(c_name.as_ptr(), O_CREAT | O_EXCL, S_IRWXU, value as c_uint)
444 };
445 debug_assert!(sem != SEM_FAILED);
446
447 Semaphore {
448 inner: UnsafeCell::new(sem),
449 name: c_name,
450 }
451 }
452
453 pub fn wait(&self) -> Result<(), Error> {
454 let res = unsafe {
455 sem_wait(*self.inner.get())
456 };
457 if res == -1 {
458 Err(Error::last_os_error())
459 } else {
460 Ok(())
461 }
462 }
463
464 pub fn try_wait(&self) -> Result<(), Error> {
465 let res = unsafe {
466 sem_trywait(*self.inner.get())
467 };
468 if res == -1 {
469 Err(Error::last_os_error())
470 } else {
471 Ok(())
472 }
473 }
474
475 pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
476 let res = unsafe {
477 let ts = to_timespec(timeout);
478 sem_timedwait(*self.inner.get(), &ts)
479 };
480 if res == -1 {
481 Err(Error::last_os_error())
482 } else {
483 Ok(())
484 }
485 }
486
487 pub fn post(&self) {
488 let res = unsafe {
489 sem_post(*self.inner.get())
490 };
491 debug_assert_eq!(res, 0);
492 }
493
494 pub fn take(&self) -> Result<SemaphoreGuard, Error> {
495 try!(self.wait());
496 Ok(SemaphoreGuard {
497 sem: self,
498 })
499 }
500 }
501
    // The raw `*mut sem_t` handle makes the struct neither `Send` nor `Sync`
    // by default; these impls rely on the `sem_*` functions being callable
    // on the same handle from several threads at once.
    // NOTE(review): presumably guaranteed by POSIX for semaphores — confirm.
    unsafe impl Send for Semaphore {}
    unsafe impl Sync for Semaphore {}
504
505 impl Drop for Semaphore {
506 fn drop(&mut self) {
507 let res = unsafe {
508 sem_close(*self.inner.get())
509 };
510 debug_assert_eq!(res, 0);
511 let res = unsafe {
512 sem_unlink(self.name.as_ptr())
513 };
514 debug_assert_eq!(res, 0);
515 }
516 }
517
518 impl<'a> Drop for SemaphoreGuard<'a> {
519 fn drop(&mut self) {
520 self.sem.post();
521 }
522 }
523}