1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// The crate-level documentation is the text of the project's README file.
#![doc = include_str!("../README.md")]
// Only the `core` library is used (together with the `libc` bindings), not `std`.
#![no_std]
// Several operations return `Result<_, ()>` on purpose: the details of a failure are reported
// through `errno`, as with the underlying C functions.
#![allow(clippy::result_unit_err)]
// Refuse to build for targets that are not Unix-like, because the POSIX Semaphores API is
// required.
#[cfg(not(unix))]
core::compile_error!("Only supported on POSIX.");
/// Unnamed semaphores.
pub mod unnamed {
use core::{
cell::UnsafeCell,
fmt::{self, Debug, Display, Formatter},
marker::PhantomPinned,
mem::MaybeUninit,
pin::Pin,
ptr,
sync::atomic::{
AtomicU8,
Ordering::{Acquire, Relaxed, Release},
},
};
/// An "unnamed" [`sem_t`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/semaphore.h.html) that can only
/// be used safely.
///
/// A new value starts out uninitialized. The semaphore operations are only reachable through
/// a [`SemaphoreRef`], which is obtained from [`Self::init`], [`Self::init_with`] or
/// [`Self::sem_ref`], so they can never be done on a `sem_t` that was not initialized.
/// Dropping a value that was initialized destroys its `sem_t`.
///
/// This must remain pinned for and after [`Self::init_with()`], because it's not clear if
/// moving a `sem_t` value is permitted after it's been initialized with `sem_init()`. Using
/// this as a `static` item (not as `mut`able) is a common way to achieve that (via
/// [`Pin::static_ref`]). Or, [`pin!`](core::pin::pin) can also work.
#[derive(Debug)]
pub struct Semaphore {
// Storage for the `sem_t`. It is only treated as initialized once `state` is `READY`. The
// `UnsafeCell` is needed because the C functions mutate the `sem_t` through shared references.
inner: MaybeUninit<UnsafeCell<libc::sem_t>>,
// One of `Self::UNINITIALIZED`, `Self::PREPARING` or `Self::READY`.
state: AtomicU8,
// Makes this type not `Unpin`, so that a pinned value cannot be moved by safe code.
_pin: PhantomPinned,
}
/// SAFETY: The POSIX Semaphores API intends for `sem_t` to be shared between threads and its
/// operations are thread-safe (similar to atomic types). Therefore we can expose this in
/// Rust as having "interior mutability". The `state` field is an atomic, so concurrent access
/// to it is fine too.
unsafe impl Sync for Semaphore {}
// Note: `Send` isn't impl'ed, because it's not clear if moving a `sem_t` value is permitted
// after it's been initialized with `sem_init`.
impl Semaphore {
// The possible values of the `state` field, which tracks the one-time initialization.
// These values are decided only internally.
// `UNINITIALIZED`: the value that `new()` starts with; the `sem_t` must not be used.
const UNINITIALIZED: u8 = 0;
// `PREPARING`: a call of `init_with()` has claimed the initialization and is doing it.
const PREPARING: u8 = 1;
// `READY`: `sem_init()` succeeded; `sem_ref()` only gives out references in this state.
const READY: u8 = 2;
// The possible values of the `pshared` argument of `sem_init()`.
// These values are decided by the `sem_init` documentation: zero means the semaphore is only
// shared between the threads of one process, non-zero means it can be shared between
// processes.
const SINGLE_PROCESS_PRIVATE: libc::c_int = 0;
const MULTI_PROCESS_SHARED: libc::c_int = 1;
/// Make a `Semaphore` whose `sem_t` has not been `sem_init()`ed yet.
///
/// All that can be done with a fresh instance is to pin it and then
/// [initialize](Self::init) it, or to drop it.
#[must_use]
#[inline]
pub const fn new() -> Self {
    // The one-time initialization has not been claimed by anyone yet.
    let state = AtomicU8::new(Self::UNINITIALIZED);
    // The storage stays uninitialized until `sem_init()` fills it in.
    let inner = MaybeUninit::uninit();
    Self {
        inner,
        state,
        _pin: PhantomPinned,
    }
}
/// Like [`Self::init_with`] but uses `is_shared = false` and `sem_count = 0`.
///
/// This is a common use case to have a `sem_t` that is private to a single process
/// (i.e. not shareable between multiple) and that starts with a "resource count" of zero
/// so that initial waiting on it blocks waiter threads until a post indicates to wake.
///
/// # Errors
/// Same as [`Self::init_with`].
#[inline]
pub fn init(self: Pin<&Self>) -> Result<SemaphoreRef<'_>, bool> {
self.init_with(false, 0)
}
/// Do [`sem_init()`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_init.html)
/// on an underlying `sem_t`, and return a [`SemaphoreRef`] to it.
///
/// Usually this should only be called once. But this guards against multiple calls on
/// the same instance (perhaps by multiple threads), to ensure the initialization is only
/// done once.
///
/// # Errors
/// Returns `Err(true)` if the initialization was already successfully done, or is being
/// done, by another call (perhaps by another thread). Returns `Err(false)` if the call
/// tried to do the initialization but there was an error with that.
#[allow(
clippy::missing_inline_in_public_items,
clippy::unwrap_in_result,
clippy::missing_panics_doc
)]
pub fn init_with(
self: Pin<&Self>,
is_shared: bool,
sem_count: libc::c_uint,
) -> Result<SemaphoreRef<'_>, bool> {
// Since our crate is `no_std`, `Once` or `OnceLock` are not available in only the
// `core` lib, so we do our own once-ness with an atomic.
match self.state.compare_exchange(
Self::UNINITIALIZED,
Self::PREPARING,
Relaxed,
Relaxed,
) {
Ok(_) => {
// This call is the first, so it does the initialization.
let sem: *mut libc::sem_t =
UnsafeCell::raw_get(MaybeUninit::as_ptr(&self.inner));
// SAFETY: The arguments are valid.
let r = unsafe {
libc::sem_init(
sem,
if is_shared {
Self::MULTI_PROCESS_SHARED
} else {
Self::SINGLE_PROCESS_PRIVATE
},
sem_count,
)
};
if r == 0 {
// Do `Release` to ensure that the memory writes that `sem_init()` did
// will be properly visible to other threads that do `Self::sem_ref`.
self.state.store(Self::READY, Release);
#[allow(clippy::expect_used)]
Ok(self.sem_ref().expect("the `Semaphore` should be ready"))
} else {
Err(false)
}
}
Err(_) => Err(true),
}
}
/// Obtain a [`SemaphoreRef`] to `self`, through which the semaphore operations are done.
///
/// # Errors
/// If `self` was not previously initialized.
#[allow(clippy::missing_inline_in_public_items)]
pub fn sem_ref(self: Pin<&Self>) -> Result<SemaphoreRef<'_>, ()> {
    /// Borrow the `sem_t` cell of a `Semaphore` that is known to be ready.
    fn ready_cell(whole: &Semaphore) -> &UnsafeCell<libc::sem_t> {
        // SAFETY: This is only used after the state was seen to be `READY`, so the `sem_t`
        // was initialized correctly and successfully.
        unsafe { whole.inner.assume_init_ref() }
    }
    // `Acquire` pairs with the `Release` store done after `sem_init()` (in
    // `Self::init_with`), so that the memory writes it did, possibly from another thread,
    // are properly visible in our thread.
    if self.state.load(Acquire) != Self::READY {
        return Err(());
    }
    // SAFETY: The `.inner` field is pinned when `self` is.
    let cell = unsafe { self.map_unchecked(ready_cell) };
    Ok(SemaphoreRef(cell))
}
/// Give back a value that implements [`Display`] for `self`.
///
/// The current count value is only shown if the semaphore has been initialized; otherwise
/// just a placeholder is shown.
#[must_use]
#[inline]
pub fn display(self: Pin<&Self>) -> impl Display + '_ {
    /// Adapter that decides what to show at the time of formatting.
    struct Shown<'s>(Pin<&'s Semaphore>);
    impl Display for Shown<'_> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            if let Ok(sem) = self.0.sem_ref() {
                // Initialized: defer to how a `SemaphoreRef` displays itself.
                Display::fmt(&sem, f)
            } else {
                // Not initialized: there is no value that could be queried.
                f.write_str("<Semaphore>")
            }
        }
    }
    Shown(self)
}
}
impl Default for Semaphore {
#[inline]
fn default() -> Self {
Self::new()
}
}
/// Dropping destroys the `sem_t`, but only if it was initialized.
impl Drop for Semaphore {
    #[inline]
    fn drop(&mut self) {
        // SAFETY: Okay because we know this value is never used again after being dropped.
        let pinned: Pin<&mut Semaphore> = unsafe { Pin::new_unchecked(self) };
        // A `SemaphoreRef` is only available when `self` was `sem_init`ed, and exactly then
        // it should be `sem_destroy`ed.
        if let Ok(sem) = pinned.into_ref().sem_ref() {
            sem.destroy();
        }
    }
}
/// Like a `sem_t *` to a `sem_t` that is known to be initialized and so valid to do
/// operations on.
///
/// Values of this type are only created by [`Semaphore::sem_ref`] (which the `init` methods
/// also use), after the `Semaphore` was seen to be ready. This is a cheap handle that can be
/// copied freely; the lifetime ties it to the borrow of the pinned `Semaphore`.
#[derive(Copy, Clone)]
pub struct SemaphoreRef<'l>(Pin<&'l UnsafeCell<libc::sem_t>>);
/// SAFETY: The POSIX Semaphores API intends for `sem_t *` to be shared between threads and its
/// operations are thread-safe (similar to atomic types). Therefore we can expose this in
/// Rust as having "interior mutability".
unsafe impl Sync for SemaphoreRef<'_> {}
/// SAFETY: Same reasoning as for `Sync`: this is only a pointer to a `sem_t`, and the
/// operations on a `sem_t` are intended to be done from any thread. Sending this to another
/// thread does not move the `sem_t` itself.
unsafe impl Send for SemaphoreRef<'_> {}
impl SemaphoreRef<'_> {
/// Like [`sem_post`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_post.html), and
/// async-signal-safe like that.
///
/// This may be called from a signal handler. That is a primary use-case for POSIX
/// Semaphores versus other better synchronization APIs (which shouldn't be used in signal
/// handlers).
///
/// # Errors
/// If `sem_post()` does. `errno` is set to indicate the error. Its `EINVAL` case should
/// be impossible.
#[inline]
pub fn post(&self) -> Result<(), ()> {
    let sem: *mut libc::sem_t = self.0.get();
    // SAFETY: The argument is valid, because the `Semaphore` was initialized.
    match unsafe { libc::sem_post(sem) } {
        0 => Ok(()),
        // Most likely: EOVERFLOW (max value for a `sem_t` would be exceeded).
        _ => Err(()),
    }
}
/// Like [`sem_wait`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_wait.html).
///
/// An interrupted wait is not retried here; it is reported as an error like any other.
///
/// # Errors
/// If `sem_wait()` does. `errno` is set to indicate the error. Its `EINVAL` case should
/// be impossible.
#[inline]
pub fn wait(&self) -> Result<(), ()> {
    let sem: *mut libc::sem_t = self.0.get();
    // SAFETY: The argument is valid, because the `Semaphore` was initialized.
    match unsafe { libc::sem_wait(sem) } {
        0 => Ok(()),
        // Most likely: EINTR (a signal interrupted this function).
        _ => Err(()),
    }
}
/// Like [`sem_trywait`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_trywait.html).
///
/// # Errors
/// If `sem_trywait()` does. `errno` is set to indicate the error. Its `EINVAL` case
/// should be impossible.
#[inline]
pub fn try_wait(&self) -> Result<(), ()> {
    let sem: *mut libc::sem_t = self.0.get();
    // SAFETY: The argument is valid, because the `Semaphore` was initialized.
    match unsafe { libc::sem_trywait(sem) } {
        0 => Ok(()),
        // Most likely: EAGAIN (would block), or EINTR
        _ => Err(()),
    }
}
// TODO: `Self::timedwait` that uses `sem_timedwait`.
/// Like [`sem_getvalue`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_getvalue.html).
///
/// The call is only checked in builds with debug assertions. Should it ever fail in other
/// builds without storing a value, the minimum `c_int` is what gets returned.
#[must_use]
#[inline]
pub fn get_value(&self) -> libc::c_int {
    let sem: *mut libc::sem_t = self.0.get();
    // Start from a value that a successful call would not leave in place.
    let mut count: libc::c_int = libc::c_int::MIN;
    // SAFETY: The arguments are valid, because the `Semaphore` was initialized.
    let status = unsafe { libc::sem_getvalue(sem, &mut count) };
    debug_assert_eq!(status, 0, "the `sem_t` should be valid");
    count
}
/// Like [`sem_destroy`](
/// https://pubs.opengroup.org/onlinepubs/9699919799/functions/sem_destroy.html).
/// Not public. Only used when dropping `Semaphore`. The result of the call is only checked
/// in builds with debug assertions.
fn destroy(&self) {
    let sem: *mut libc::sem_t = self.0.get();
    // SAFETY: The argument is valid, because the `Semaphore` was initialized.
    let status = unsafe { libc::sem_destroy(sem) };
    debug_assert_eq!(status, 0, "the `sem_t` should be valid with no waiters");
}
}
/// Compare by `sem_t *` pointer equality.
impl PartialEq for SemaphoreRef<'_> {
#[inline]
fn eq(&self, other: &Self) -> bool {
ptr::eq(self.0.get(), other.0.get())
}
}
impl Eq for SemaphoreRef<'_> {}
/// Shows the `sem_t *` pointer.
impl Debug for SemaphoreRef<'_> {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_tuple("SemaphoreRef").field(&self.0.get()).finish()
}
}
/// Human-readable form that includes the count value the semaphore has at the time of
/// formatting.
impl Display for SemaphoreRef<'_> {
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let value = self.get_value();
        f.write_fmt(format_args!("<Semaphore value:{}>", value))
    }
}
}