1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//! Stream-readiness latch used by backends with dedicated worker threads.
//!
//! Prevents worker threads from invoking user callbacks before the `Stream` handle has been
//! returned to the caller.
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
thread::Thread,
};
/// Signals worker threads that the stream handle has been given to the caller.
#[derive(Debug)]
pub(crate) struct Latch {
/// `Option` so `Drop` can move it out before unparking, closing the window where a thread
/// could wake, see the Arc still alive (flag=false), re-park, then be orphaned.
flag: Option<Arc<AtomicBool>>,
threads: Vec<Thread>,
}
/// Held by a worker thread. Parks until the matching [`Latch`] is released.
#[derive(Clone, Debug)]
pub(crate) struct LatchWaiter(Weak<AtomicBool>);
impl Latch {
/// Creates a new stream-readiness latch.
pub(crate) fn new() -> Self {
Self {
flag: Some(Arc::new(AtomicBool::new(false))),
threads: Vec::new(),
}
}
/// Returns a waiter that unblocks when this latch is released.
pub(crate) fn waiter(&self) -> LatchWaiter {
LatchWaiter(Arc::downgrade(
self.flag
.as_ref()
.expect("waiter called on a dropped Latch"),
))
}
/// Registers a thread to be unparked when [`release`](Self::release) is called.
#[cfg(any(
target_os = "macos",
target_os = "windows",
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
))]
pub(crate) fn add_thread(&mut self, thread: Thread) {
self.threads.push(thread);
}
/// Releases the latch and unparks all registered threads.
pub(crate) fn release(&self) {
if let Some(flag) = &self.flag {
flag.store(true, Ordering::Release);
}
for t in &self.threads {
t.unpark();
}
}
}
impl Drop for Latch {
fn drop(&mut self) {
// Invalidate the Arc *before* unparking so waiters see upgrade() == None and exit cleanly
// on the error path (latch dropped without being released).
drop(self.flag.take());
for t in &self.threads {
t.unpark();
}
}
}
impl LatchWaiter {
/// Parks the calling thread until the latch is released or dropped without releasing.
///
/// Returns `true` if the stream is ready, `false` if the [`Latch`] was dropped before release.
#[cfg(any(
target_os = "macos",
target_os = "windows",
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
))]
pub(crate) fn wait(&self) -> bool {
loop {
match self.0.upgrade() {
None => return false,
Some(flag) if flag.load(Ordering::Acquire) => return true,
Some(flag) => {
drop(flag); // release strong ref before parking
std::thread::park();
}
}
}
}
/// Returns `true` if the latch has already been released.
#[cfg(all(target_vendor = "apple", not(target_os = "macos")))]
pub(crate) fn is_released(&self) -> bool {
self.0.upgrade().is_some_and(|f| f.load(Ordering::Acquire))
}
}