1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
use std::time::{Duration, Instant};
use crate::sync::{const_mutex, Condvar, Mutex};
/// Move single values between threads
///
/// A condition variable with a single slot that allows to pass
/// values from producer to consumer threads. Producers and consumers
/// may arrive at any point in time.
///
/// A typical scenario involves only a single producer and a single
/// consumer thread implementing a handover protocol for passing
/// the latest (= most recent) value between each other.
///
/// The value is buffered until the consumer is ready to take it.
/// Each value can be consumed at most once. Producers can replace
/// the current value if it has not been consumed yet.
#[derive(Debug)]
pub struct Relay<T> {
mutex: Mutex<Option<T>>,
condvar: Condvar,
}
impl<T> Relay<T> {
#[must_use]
pub const fn new() -> Self {
Self {
mutex: const_mutex(None),
condvar: Condvar::new(),
}
}
pub const fn with_value(value: T) -> Self {
Self {
mutex: const_mutex(Some(value)),
condvar: Condvar::new(),
}
}
}
impl<T> Default for Relay<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Relay<T> {
/// Replace the current value and notify a single waiting consumer
///
/// Returns the previous value or `None`. If `None` is returned
/// then a notification has been triggered.
pub fn replace_notify_one(&self, value: T) -> Option<T> {
let mut guard = self.mutex.lock();
let replaced = guard.replace(value);
// Dropping the guard before notifying consumers might
// cause spurious wakeups. These are handled appropriately.
drop(guard);
// Only notify consumers on an edge trigger (None -> Some)
// and not again after subsequent placements (Some -> Some)!
if replaced.is_none() {
self.condvar.notify_one();
}
replaced
}
/// Replace the current value and notify all waiting consumers
///
/// Returns the previous value or `None`. If `None` is returned
/// then a notification has been triggered.
pub fn replace_notify_all(&self, value: T) -> Option<T> {
let mut guard = self.mutex.lock();
let replaced = guard.replace(value);
// Dropping the guard before notifying consumers might
// cause spurious wakeups. These are handled appropriately.
drop(guard);
// Only notify consumers on an edge trigger (None -> Some)
// and not again after subsequent placements (Some -> Some)!
if replaced.is_none() {
self.condvar.notify_all();
}
replaced
}
/// Take the current value immediately
///
/// Resets the internal state on return.
///
/// Returns the previous value or `None`.
pub fn take(&self) -> Option<T> {
let mut guard = self.mutex.lock();
guard.take()
}
/// Wait for a value and then take it
///
/// Resets the internal state on return.
///
/// Returns the previous value.
pub fn wait(&self) -> T {
let mut guard = self.mutex.lock();
// The loop is required to handle spurious wakeups
loop {
if let Some(value) = guard.take() {
return value;
}
self.condvar.wait(&mut guard);
}
}
/// Wait for a value with a timeout and then take it
///
/// Resets the internal state on return, i.e. either takes the value
/// or on timeout the internal value already was `None` and doesn't
/// need to be reset.
///
/// Returns the value if available or `None` if the timeout expired.
pub fn wait_for(&self, timeout: Duration) -> Option<T> {
// Handle edge case separately
if timeout.is_zero() {
return self.take();
}
// Handling spurious timeouts in a loop would require to adjust the
// timeout on each turn by calculating the remaining timeout from
// the elapsed timeout! This is tedious, error prone, and could cause
// jitter when done wrong. Better delegate this task to the
// deadline-constrained wait function.
if let Some(deadline) = Instant::now().checked_add(timeout) {
self.wait_until(deadline)
} else {
// Wait without a deadline if the result cannot be represented
// by an Instant
Some(self.wait())
}
}
/// Wait for a value until a deadline and then take it
///
/// Resets the internal state on return, i.e. either takes the value
/// or on timeout the internal value already was `None` and doesn't
/// need to be reset.
///
/// Returns the value if available or `None` if the deadline expired.
pub fn wait_until(&self, deadline: Instant) -> Option<T> {
let mut guard = self.mutex.lock();
// The loop is required to handle spurious wakeups
while guard.is_none() && !self.condvar.wait_until(&mut guard, deadline).timed_out() {
continue; // Continue on spurious wakeup
}
guard.take()
}
}
#[cfg(test)]
mod tests;