msr_core/sync/relay/
mod.rs

1use std::time::{Duration, Instant};
2
3use crate::sync::{Condvar, Mutex};
4
/// Single-slot hand-over point for passing values between threads
///
/// Combines a mutex-protected `Option<T>` slot with a condition variable
/// so that producer threads can deposit a value and consumer threads can
/// block until one becomes available. Producers and consumers may show
/// up in any order and at any point in time.
///
/// The typical setup is exactly one producer and one consumer thread
/// that follow a handover protocol in which only the latest (= most
/// recent) value matters.
///
/// A deposited value stays buffered until a consumer is ready to take
/// it. Every value is consumed at most once, and a producer may overwrite
/// a value that has not been consumed yet.
#[derive(Debug)]
pub struct Relay<T> {
    /// The slot: `Some` while a value is waiting to be consumed
    mutex: Mutex<Option<T>>,
    /// Signalled by producers when the slot changes from empty to filled
    condvar: Condvar,
}
23
24impl<T> Relay<T> {
25    #[must_use]
26    #[cfg(not(loom))]
27    pub const fn new() -> Self {
28        Self {
29            mutex: Mutex::new(None),
30            condvar: Condvar::new(),
31        }
32    }
33
34    #[must_use]
35    #[cfg(loom)]
36    pub fn new() -> Self {
37        Self {
38            mutex: Mutex::new(None),
39            condvar: Condvar::new(),
40        }
41    }
42
43    #[must_use]
44    #[cfg(not(loom))]
45    pub const fn with_value(value: T) -> Self {
46        Self {
47            mutex: Mutex::new(Some(value)),
48            condvar: Condvar::new(),
49        }
50    }
51
52    #[must_use]
53    #[cfg(loom)]
54    pub fn with_value(value: T) -> Self {
55        Self {
56            mutex: Mutex::new(Some(value)),
57            condvar: Condvar::new(),
58        }
59    }
60}
61
62impl<T> Default for Relay<T> {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl<T> Relay<T> {
69    /// Replace the current value and notify a single waiting consumer
70    ///
71    /// Returns the previous value or `None`. If `None` is returned
72    /// then a notification has been triggered.
73    pub fn replace_notify_one(&self, value: T) -> Option<T> {
74        let mut guard = self.mutex.lock().expect("not poisoned");
75        let replaced = guard.replace(value);
76        // Dropping the guard before notifying consumers might
77        // cause spurious wakeups. These are handled appropriately.
78        drop(guard);
79        // Only notify consumers on an edge trigger (None -> Some)
80        // and not again after subsequent placements (Some -> Some)!
81        if replaced.is_none() {
82            self.condvar.notify_one();
83        }
84        replaced
85    }
86
87    /// Replace the current value and notify all waiting consumers
88    ///
89    /// Returns the previous value or `None`. If `None` is returned
90    /// then a notification has been triggered.
91    pub fn replace_notify_all(&self, value: T) -> Option<T> {
92        let mut guard = self.mutex.lock().expect("not poisoned");
93        let replaced = guard.replace(value);
94        // Dropping the guard before notifying consumers might
95        // cause spurious wakeups. These are handled appropriately.
96        drop(guard);
97        // Only notify consumers on an edge trigger (None -> Some)
98        // and not again after subsequent placements (Some -> Some)!
99        if replaced.is_none() {
100            self.condvar.notify_all();
101        }
102        replaced
103    }
104
105    /// Take the current value immediately
106    ///
107    /// Resets the internal state on return.
108    ///
109    /// Returns the previous value or `None`.
110    pub fn take(&self) -> Option<T> {
111        let mut guard = self.mutex.lock().expect("not poisoned");
112        guard.take()
113    }
114
115    /// Wait for a value and then take it
116    ///
117    /// Resets the internal state on return.
118    ///
119    /// Returns the previous value.
120    pub fn wait(&self) -> T {
121        let mut guard = self.mutex.lock().expect("not poisoned");
122        // The loop is required to handle spurious wakeups
123        loop {
124            if let Some(value) = guard.take() {
125                return value;
126            }
127            guard = self.condvar.wait(guard).expect("not poisoned");
128        }
129    }
130
131    /// Wait for a value with a timeout and then take it
132    ///
133    /// Resets the internal state on return, i.e. either takes the value
134    /// or on timeout the internal value already was `None` and doesn't
135    /// need to be reset.
136    ///
137    /// Returns the value if available or `None` if the timeout expired.
138    pub fn wait_for(&self, timeout: Duration) -> Option<T> {
139        // Handle edge case separately
140        if timeout.is_zero() {
141            return self.take();
142        }
143        // Handling spurious timeouts in a loop would require to adjust the
144        // timeout on each turn by calculating the remaining timeout from
145        // the elapsed timeout! This is tedious, error prone, and could cause
146        // jitter when done wrong. Better delegate this task to the
147        // deadline-constrained wait function.
148        if let Some(deadline) = Instant::now().checked_add(timeout) {
149            self.wait_until(deadline)
150        } else {
151            // Wait without a deadline if the result cannot be represented
152            // by an Instant
153            Some(self.wait())
154        }
155    }
156
157    /// Wait for a value until a deadline and then take it
158    ///
159    /// Resets the internal state on return, i.e. either takes the value
160    /// or on timeout the internal value already was `None` and doesn't
161    /// need to be reset.
162    ///
163    /// Returns the value if available or `None` if the deadline expired.
164    pub fn wait_until(&self, deadline: Instant) -> Option<T> {
165        let mut guard = self.mutex.lock().expect("not poisoned");
166        // The loop is required to handle spurious wakeups
167        while guard.is_none() {
168            let now = Instant::now();
169            if now >= deadline {
170                break;
171            }
172            let timeout = deadline.duration_since(now);
173            let (replaced_guard, wait_result) = self
174                .condvar
175                .wait_timeout(guard, timeout)
176                .expect("not poisoned");
177            guard = replaced_guard;
178            if wait_result.timed_out() {
179                break;
180            }
181            // Continue on spurious wakeup
182        }
183        guard.take()
184    }
185}
186
187#[cfg(test)]
188mod tests;