msr_core/sync/relay/mod.rs
1use std::time::{Duration, Instant};
2
3use crate::sync::{Condvar, Mutex};
4
5/// Move single values between threads
6///
7/// A condition variable with a single slot that allows to pass
8/// values from producer to consumer threads. Producers and consumers
9/// may arrive at any point in time.
10///
11/// A typical scenario involves only a single producer and a single
12/// consumer thread implementing a handover protocol for passing
13/// the latest (= most recent) value between each other.
14///
15/// The value is buffered until the consumer is ready to take it.
16/// Each value can be consumed at most once. Producers can replace
17/// the current value if it has not been consumed yet.
18#[derive(Debug)]
19pub struct Relay<T> {
20 mutex: Mutex<Option<T>>,
21 condvar: Condvar,
22}
23
24impl<T> Relay<T> {
25 #[must_use]
26 #[cfg(not(loom))]
27 pub const fn new() -> Self {
28 Self {
29 mutex: Mutex::new(None),
30 condvar: Condvar::new(),
31 }
32 }
33
34 #[must_use]
35 #[cfg(loom)]
36 pub fn new() -> Self {
37 Self {
38 mutex: Mutex::new(None),
39 condvar: Condvar::new(),
40 }
41 }
42
43 #[must_use]
44 #[cfg(not(loom))]
45 pub const fn with_value(value: T) -> Self {
46 Self {
47 mutex: Mutex::new(Some(value)),
48 condvar: Condvar::new(),
49 }
50 }
51
52 #[must_use]
53 #[cfg(loom)]
54 pub fn with_value(value: T) -> Self {
55 Self {
56 mutex: Mutex::new(Some(value)),
57 condvar: Condvar::new(),
58 }
59 }
60}
61
62impl<T> Default for Relay<T> {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl<T> Relay<T> {
69 /// Replace the current value and notify a single waiting consumer
70 ///
71 /// Returns the previous value or `None`. If `None` is returned
72 /// then a notification has been triggered.
73 pub fn replace_notify_one(&self, value: T) -> Option<T> {
74 let mut guard = self.mutex.lock().expect("not poisoned");
75 let replaced = guard.replace(value);
76 // Dropping the guard before notifying consumers might
77 // cause spurious wakeups. These are handled appropriately.
78 drop(guard);
79 // Only notify consumers on an edge trigger (None -> Some)
80 // and not again after subsequent placements (Some -> Some)!
81 if replaced.is_none() {
82 self.condvar.notify_one();
83 }
84 replaced
85 }
86
87 /// Replace the current value and notify all waiting consumers
88 ///
89 /// Returns the previous value or `None`. If `None` is returned
90 /// then a notification has been triggered.
91 pub fn replace_notify_all(&self, value: T) -> Option<T> {
92 let mut guard = self.mutex.lock().expect("not poisoned");
93 let replaced = guard.replace(value);
94 // Dropping the guard before notifying consumers might
95 // cause spurious wakeups. These are handled appropriately.
96 drop(guard);
97 // Only notify consumers on an edge trigger (None -> Some)
98 // and not again after subsequent placements (Some -> Some)!
99 if replaced.is_none() {
100 self.condvar.notify_all();
101 }
102 replaced
103 }
104
105 /// Take the current value immediately
106 ///
107 /// Resets the internal state on return.
108 ///
109 /// Returns the previous value or `None`.
110 pub fn take(&self) -> Option<T> {
111 let mut guard = self.mutex.lock().expect("not poisoned");
112 guard.take()
113 }
114
115 /// Wait for a value and then take it
116 ///
117 /// Resets the internal state on return.
118 ///
119 /// Returns the previous value.
120 pub fn wait(&self) -> T {
121 let mut guard = self.mutex.lock().expect("not poisoned");
122 // The loop is required to handle spurious wakeups
123 loop {
124 if let Some(value) = guard.take() {
125 return value;
126 }
127 guard = self.condvar.wait(guard).expect("not poisoned");
128 }
129 }
130
131 /// Wait for a value with a timeout and then take it
132 ///
133 /// Resets the internal state on return, i.e. either takes the value
134 /// or on timeout the internal value already was `None` and doesn't
135 /// need to be reset.
136 ///
137 /// Returns the value if available or `None` if the timeout expired.
138 pub fn wait_for(&self, timeout: Duration) -> Option<T> {
139 // Handle edge case separately
140 if timeout.is_zero() {
141 return self.take();
142 }
143 // Handling spurious timeouts in a loop would require to adjust the
144 // timeout on each turn by calculating the remaining timeout from
145 // the elapsed timeout! This is tedious, error prone, and could cause
146 // jitter when done wrong. Better delegate this task to the
147 // deadline-constrained wait function.
148 if let Some(deadline) = Instant::now().checked_add(timeout) {
149 self.wait_until(deadline)
150 } else {
151 // Wait without a deadline if the result cannot be represented
152 // by an Instant
153 Some(self.wait())
154 }
155 }
156
157 /// Wait for a value until a deadline and then take it
158 ///
159 /// Resets the internal state on return, i.e. either takes the value
160 /// or on timeout the internal value already was `None` and doesn't
161 /// need to be reset.
162 ///
163 /// Returns the value if available or `None` if the deadline expired.
164 pub fn wait_until(&self, deadline: Instant) -> Option<T> {
165 let mut guard = self.mutex.lock().expect("not poisoned");
166 // The loop is required to handle spurious wakeups
167 while guard.is_none() {
168 let now = Instant::now();
169 if now >= deadline {
170 break;
171 }
172 let timeout = deadline.duration_since(now);
173 let (replaced_guard, wait_result) = self
174 .condvar
175 .wait_timeout(guard, timeout)
176 .expect("not poisoned");
177 guard = replaced_guard;
178 if wait_result.timed_out() {
179 break;
180 }
181 // Continue on spurious wakeup
182 }
183 guard.take()
184 }
185}
186
187#[cfg(test)]
188mod tests;