use std::ops::{Deref, DerefMut};
use void::{ResultVoidExt as _, Void};
pub trait PostageWatchSenderExt<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>,
{
let lock = self.borrow();
let new = update(&*lock)?;
if new != *lock {
drop(lock);
*self.borrow_mut() = new;
}
Ok(())
}
}
#[derive(Debug)]
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
pub trait DropNotifyEofSignallable {
fn eof() -> Self;
#[deprecated]
fn is_eof(&self) -> bool {
false
}
}
impl<T> DropNotifyEofSignallable for Option<T> {
fn eof() -> Self {
None
}
fn is_eof(&self) -> bool {
self.is_none()
}
}
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
pub fn new(inner: postage::watch::Sender<T>) -> Self {
DropNotifyWatchSender(Some(inner))
}
pub fn into_inner(mut self) -> postage::watch::Sender<T> {
self.0.take().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
type Target = postage::watch::Sender<T>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
fn drop(&mut self) {
if let Some(mut inner) = self.0.take() {
*inner.borrow_mut() = DropNotifyEofSignallable::eof();
}
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use futures::select_biased;
use futures_await_test::async_test;
#[async_test]
async fn postage_sender_ext() {
use futures::FutureExt;
use futures::stream::StreamExt;
let (mut s, mut r) = postage::watch::channel_with(20);
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, };
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i);
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i + 1);
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
#[test]
fn postage_drop() {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct I(i32);
impl DropNotifyEofSignallable for I {
fn eof() -> I {
I(0)
}
fn is_eof(&self) -> bool {
self.0 == 0
}
}
let (s, r) = postage::watch::channel_with(I(20));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(20));
drop(s);
assert_eq!(*r.borrow(), I(0));
let (s, r) = postage::watch::channel_with(I(44));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(44));
drop(s.into_inner());
assert_eq!(*r.borrow(), I(44));
}
}