1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
//! Extension trait for more efficient use of [`postage::watch`].
use std::ops::{Deref, DerefMut};
use void::{ResultVoidExt as _, Void};
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
///
/// Ideally these, or something like them, would be upstream:
/// See <https://github.com/austinjones/postage-rs/issues/56>.
///
/// We provide this as an extension trait became the implementation is a bit fiddly.
/// This lets us concentrate on the actual logic, when we use it.
pub trait PostageWatchSenderExt<T> {
/// Update, by calling a fallible function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
/// Update, by calling a function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>,
{
let lock = self.borrow();
let new = update(&*lock)?;
if new != *lock {
// We must drop the lock guard, because otherwise borrow_mut will deadlock.
// There is no race, because we hold &mut self, so no-one else can get a look in.
// (postage::watch::Sender is not one of those facilities which is mereely a
// handle, and Clone.)
drop(lock);
*self.borrow_mut() = new;
}
Ok(())
}
}
#[derive(Debug)]
/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
///
/// Derefs to the inner `Sender`.
///
/// Ideally this would be behaviour promised by upstream, or something
/// See <https://github.com/austinjones/postage-rs/issues/57>.
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
/// Values that can signal EOF
///
/// Implemented for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
/// Generate the EOF value
fn eof() -> Self;
/// Does this value indicate EOF?
///
/// ### Deprecated
///
/// This method is deprecated.
/// It should not be called, or defined, in new programs.
/// It is not required by [`DropNotifyWatchSender`].
/// The provided implementation always returns `false`.
#[deprecated]
fn is_eof(&self) -> bool {
false
}
}
impl<T> DropNotifyEofSignallable for Option<T> {
fn eof() -> Self {
None
}
fn is_eof(&self) -> bool {
self.is_none()
}
}
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
/// Arrange to send `T::Default` when `inner` is dropped
pub fn new(inner: postage::watch::Sender<T>) -> Self {
DropNotifyWatchSender(Some(inner))
}
/// Unwrap the inner sender, defusing the drop notification
pub fn into_inner(mut self) -> postage::watch::Sender<T> {
self.0.take().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
type Target = postage::watch::Sender<T>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
fn drop(&mut self) {
if let Some(mut inner) = self.0.take() {
// None means into_inner() was called
*inner.borrow_mut() = DropNotifyEofSignallable::eof();
}
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use futures::select_biased;
use futures_await_test::async_test;
#[derive(Debug, Eq, PartialEq)]
struct TestError(char);
#[async_test]
async fn postage_sender_ext() {
use futures::stream::StreamExt;
use futures::FutureExt;
let (mut s, mut r) = postage::watch::channel_with(20);
// Receiver of a fresh watch wakes once, but let's not rely on this
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, // tolerate nothing
};
// Now, not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i);
// Still not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i + 1);
// Ready, with 21
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
// Not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
#[async_test]
async fn postage_drop() {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct I(i32);
impl DropNotifyEofSignallable for I {
fn eof() -> I {
I(0)
}
fn is_eof(&self) -> bool {
self.0 == 0
}
}
let (s, r) = postage::watch::channel_with(I(20));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(20));
drop(s);
assert_eq!(*r.borrow(), I(0));
let (s, r) = postage::watch::channel_with(I(44));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(44));
drop(s.into_inner());
assert_eq!(*r.borrow(), I(44));
}
}