std_semaphore/
lib.rs

1// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11use std::ops::Drop;
12use std::sync::{Condvar, Mutex};
13
14/// A counting, blocking, semaphore.
15///
16/// Semaphores are a form of atomic counter where access is only granted if the
17/// counter is a positive value. Each acquisition will block the calling thread
18/// until the counter is positive, and each release will increment the counter
19/// and unblock any threads if necessary.
20///
21/// # Examples
22///
23/// ```
24/// use std_semaphore::Semaphore;
25///
26/// // Create a semaphore that represents 5 resources
27/// let sem = Semaphore::new(5);
28///
29/// // Acquire one of the resources
30/// sem.acquire();
31///
32/// // Acquire one of the resources for a limited period of time
33/// {
34///     let _guard = sem.access();
35///     // ...
36/// } // resources is released here
37///
38/// // Release our initially acquired resource
39/// sem.release();
40/// ```
41pub struct Semaphore {
42    lock: Mutex<isize>,
43    cvar: Condvar,
44}
45
46/// An RAII guard which will release a resource acquired from a semaphore when
47/// dropped.
48pub struct SemaphoreGuard<'a> {
49    sem: &'a Semaphore,
50}
51
52impl Semaphore {
53    /// Creates a new semaphore with the initial count specified.
54    ///
55    /// The count specified can be thought of as a number of resources, and a
56    /// call to `acquire` or `access` will block until at least one resource is
57    /// available. It is valid to initialize a semaphore with a negative count.
58    pub fn new(count: isize) -> Semaphore {
59        Semaphore {
60            lock: Mutex::new(count),
61            cvar: Condvar::new(),
62        }
63    }
64
65    /// Acquires a resource of this semaphore, blocking the current thread until
66    /// it can do so.
67    ///
68    /// This method will block until the internal count of the semaphore is at
69    /// least 1.
70    pub fn acquire(&self) {
71        let mut count = self.lock.lock().unwrap();
72        while *count <= 0 {
73            count = self.cvar.wait(count).unwrap();
74        }
75        *count -= 1;
76    }
77
78    /// Release a resource from this semaphore.
79    ///
80    /// This will increment the number of resources in this semaphore by 1 and
81    /// will notify any pending waiters in `acquire` or `access` if necessary.
82    pub fn release(&self) {
83        *self.lock.lock().unwrap() += 1;
84        self.cvar.notify_one();
85    }
86
87    /// Acquires a resource of this semaphore, returning an RAII guard to
88    /// release the semaphore when dropped.
89    ///
90    /// This function is semantically equivalent to an `acquire` followed by a
91    /// `release` when the guard returned is dropped.
92    pub fn access(&self) -> SemaphoreGuard {
93        self.acquire();
94        SemaphoreGuard { sem: self }
95    }
96}
97
98impl<'a> Drop for SemaphoreGuard<'a> {
99    fn drop(&mut self) {
100        self.sem.release();
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use std::prelude::v1::*;
107
108    use std::sync::Arc;
109    use super::Semaphore;
110    use std::sync::mpsc::channel;
111    use std::thread;
112
113    #[test]
114    fn test_sem_acquire_release() {
115        let s = Semaphore::new(1);
116        s.acquire();
117        s.release();
118        s.acquire();
119    }
120
121    #[test]
122    fn test_sem_basic() {
123        let s = Semaphore::new(1);
124        let _g = s.access();
125    }
126
127    #[test]
128    fn test_sem_as_mutex() {
129        let s = Arc::new(Semaphore::new(1));
130        let s2 = s.clone();
131        let _t = thread::spawn(move || {
132            let _g = s2.access();
133        });
134        let _g = s.access();
135    }
136
137    #[test]
138    fn test_sem_as_cvar() {
139        // Child waits and parent signals
140        let (tx, rx) = channel();
141        let s = Arc::new(Semaphore::new(0));
142        let s2 = s.clone();
143        let _t = thread::spawn(move || {
144            s2.acquire();
145            tx.send(()).unwrap();
146        });
147        s.release();
148        let _ = rx.recv();
149
150        // Parent waits and child signals
151        let (tx, rx) = channel();
152        let s = Arc::new(Semaphore::new(0));
153        let s2 = s.clone();
154        let _t = thread::spawn(move || {
155            s2.release();
156            let _ = rx.recv();
157        });
158        s.acquire();
159        tx.send(()).unwrap();
160    }
161
162    #[test]
163    fn test_sem_multi_resource() {
164        // Parent and child both get in the critical section at the same
165        // time, and shake hands.
166        let s = Arc::new(Semaphore::new(2));
167        let s2 = s.clone();
168        let (tx1, rx1) = channel();
169        let (tx2, rx2) = channel();
170        let _t = thread::spawn(move || {
171            let _g = s2.access();
172            let _ = rx2.recv();
173            tx1.send(()).unwrap();
174        });
175        let _g = s.access();
176        tx2.send(()).unwrap();
177        rx1.recv().unwrap();
178    }
179
180    #[test]
181    fn test_sem_runtime_friendly_blocking() {
182        let s = Arc::new(Semaphore::new(1));
183        let s2 = s.clone();
184        let (tx, rx) = channel();
185        {
186            let _g = s.access();
187            thread::spawn(move || {
188                tx.send(()).unwrap();
189                drop(s2.access());
190                tx.send(()).unwrap();
191            });
192            rx.recv().unwrap(); // wait for child to come alive
193        }
194        rx.recv().unwrap(); // wait for child to be done
195    }
196}