use crate::RuntimeError;
use std::fmt;
use std::ops::Deref;
use tokio::sync::watch as tokio_watch;
pub fn watch<T>(initial: T) -> (WatchSender<T>, WatchReceiver<T>) {
let (tx, rx) = tokio_watch::channel(initial);
(WatchSender { inner: tx }, WatchReceiver { inner: rx })
}
pub struct WatchSender<T> {
inner: tokio_watch::Sender<T>,
}
impl<T> WatchSender<T> {
pub fn send(&self, value: T) -> Result<(), RuntimeError> {
self.inner
.send(value)
.map_err(|_| RuntimeError::ChannelClosed)
}
pub fn send_modify<F: FnOnce(&mut T)>(&self, modify: F) {
self.inner.send_modify(modify);
}
pub fn borrow(&self) -> WatchRef<'_, T> {
WatchRef {
inner: self.inner.borrow(),
}
}
}
impl<T> Clone for WatchSender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for WatchSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WatchSender").finish()
}
}
pub struct WatchReceiver<T> {
inner: tokio_watch::Receiver<T>,
}
impl<T> WatchReceiver<T> {
pub fn borrow(&self) -> WatchRef<'_, T> {
WatchRef {
inner: self.inner.borrow(),
}
}
pub fn borrow_and_update(&mut self) -> WatchRef<'_, T> {
WatchRef {
inner: self.inner.borrow_and_update(),
}
}
pub async fn changed(&mut self) -> Result<(), RuntimeError> {
self.inner
.changed()
.await
.map_err(|_| RuntimeError::ChannelClosed)
}
pub fn has_changed(&self) -> bool {
self.inner.has_changed().unwrap_or(false)
}
}
impl<T> Clone for WatchReceiver<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for WatchReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WatchReceiver").finish()
}
}
pub struct WatchRef<'a, T> {
inner: tokio_watch::Ref<'a, T>,
}
impl<T> Deref for WatchRef<'_, T> {
type Target = T;
fn deref(&self) -> &T {
&self.inner
}
}
impl<T: fmt::Debug> fmt::Debug for WatchRef<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}