use std::{
sync::Arc,
time::{Duration, Instant},
};
#[cfg(not(feature = "parking_lot"))]
mod sync_std;
#[cfg(not(feature = "parking_lot"))]
use sync_std::{Condvar, Mutex};
#[cfg(feature = "parking_lot")]
mod sync_parking_lot;
#[cfg(feature = "parking_lot")]
use sync_parking_lot::{Condvar, Mutex};
pub struct WatchSender<T> {
shared: Arc<Shared<T>>,
}
pub struct WatchReceiver<T> {
shared: Arc<Shared<T>>,
last_seen_version: u64,
}
impl<T> Clone for WatchSender<T> {
fn clone(&self) -> WatchSender<T> {
WatchSender {
shared: self.shared.clone(),
}
}
}
impl<T> Clone for WatchReceiver<T> {
fn clone(&self) -> WatchReceiver<T> {
WatchReceiver {
shared: self.shared.clone(),
last_seen_version: self.last_seen_version,
}
}
}
struct Shared<T> {
lock: Mutex<SharedValue<T>>,
on_update: Condvar,
}
struct SharedValue<T> {
value: T,
version: u64,
}
pub fn channel<T: Clone>(value: T) -> (WatchSender<T>, WatchReceiver<T>) {
let shared = Arc::new(Shared {
lock: Mutex::new(SharedValue { value, version: 1 }),
on_update: Condvar::new(),
});
(
WatchSender {
shared: shared.clone(),
},
WatchReceiver {
shared,
last_seen_version: 0,
},
)
}
impl<T> WatchSender<T> {
pub fn send(&self, mut value: T) {
{
let mut lock = self.shared.lock.lock();
std::mem::swap(&mut lock.value, &mut value);
lock.version = lock.version.wrapping_add(1);
}
self.shared.on_update.notify_all();
drop(value);
}
pub fn update<F>(&self, f: F)
where
F: FnOnce(&mut T),
{
{
let mut lock = self.shared.lock.lock();
f(&mut lock.value);
lock.version = lock.version.wrapping_add(1);
}
self.shared.on_update.notify_all();
}
pub fn subscribe(&self) -> WatchReceiver<T> {
let version = {
let lock = self.shared.lock.lock();
lock.version
};
WatchReceiver {
shared: self.shared.clone(),
last_seen_version: version,
}
}
}
impl<T: Clone> WatchReceiver<T> {
pub fn get(&mut self) -> T {
let lock = self.shared.lock.lock();
self.last_seen_version = lock.version;
lock.value.clone()
}
pub fn get_if_new(&mut self) -> Option<T> {
let lock = self.shared.lock.lock();
if self.last_seen_version == lock.version {
return None;
}
self.last_seen_version = lock.version;
Some(lock.value.clone())
}
pub fn wait(&mut self) -> T {
let mut lock = self.shared.lock.lock();
while lock.version == self.last_seen_version {
lock = self.shared.on_update.wait(lock);
}
self.last_seen_version = lock.version;
lock.value.clone()
}
pub fn wait_timeout(&mut self, duration: Duration) -> Option<T> {
let mut lock = self.shared.lock.lock();
let deadline = Instant::now() + duration;
while lock.version == self.last_seen_version {
let timeout = deadline.saturating_duration_since(Instant::now());
lock = self.shared.on_update.wait_timeout(lock, timeout)?;
if timeout.is_zero() && lock.version == self.last_seen_version {
return None;
}
}
self.last_seen_version = lock.version;
Some(lock.value.clone())
}
}
impl<T> WatchReceiver<T> {
pub fn new_sender(&self) -> WatchSender<T> {
WatchSender {
shared: self.shared.clone(),
}
}
}