1use std::ops::{Deref, DerefMut};
3use void::{ResultVoidExt as _, Void};
4
5pub trait PostageWatchSenderExt<T> {
13 fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
18 where
19 T: PartialEq,
20 F: FnOnce(&T) -> Result<T, E>;
21
22 fn maybe_send<F>(&mut self, update: F)
27 where
28 T: PartialEq,
29 F: FnOnce(&T) -> T,
30 {
31 self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
32 .void_unwrap();
33 }
34}
35
36impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
37 fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
38 where
39 T: PartialEq,
40 F: FnOnce(&T) -> Result<T, E>,
41 {
42 let lock = self.borrow();
43 let new = update(&*lock)?;
44 if new != *lock {
45 drop(lock);
50 *self.borrow_mut() = new;
51 }
52 Ok(())
53 }
54}
55
56#[derive(Debug)]
57pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
64
65pub trait DropNotifyEofSignallable {
69 fn eof() -> Self;
71
72 #[deprecated]
81 fn is_eof(&self) -> bool {
82 false
83 }
84}
85
86impl<T> DropNotifyEofSignallable for Option<T> {
87 fn eof() -> Self {
88 None
89 }
90
91 fn is_eof(&self) -> bool {
92 self.is_none()
93 }
94}
95
96impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
97 pub fn new(inner: postage::watch::Sender<T>) -> Self {
99 DropNotifyWatchSender(Some(inner))
100 }
101
102 pub fn into_inner(mut self) -> postage::watch::Sender<T> {
104 self.0.take().expect("inner was None")
105 }
106}
107
108impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
109 type Target = postage::watch::Sender<T>;
110 fn deref(&self) -> &Self::Target {
111 self.0.as_ref().expect("inner was None")
112 }
113}
114
115impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
116 fn deref_mut(&mut self) -> &mut Self::Target {
117 self.0.as_mut().expect("inner was None")
118 }
119}
120
121impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
122 fn drop(&mut self) {
123 if let Some(mut inner) = self.0.take() {
124 *inner.borrow_mut() = DropNotifyEofSignallable::eof();
126 }
127 }
128}
129
130#[cfg(test)]
131mod test {
132 #![allow(clippy::bool_assert_comparison)]
134 #![allow(clippy::clone_on_copy)]
135 #![allow(clippy::dbg_macro)]
136 #![allow(clippy::mixed_attributes_style)]
137 #![allow(clippy::print_stderr)]
138 #![allow(clippy::print_stdout)]
139 #![allow(clippy::single_char_pattern)]
140 #![allow(clippy::unwrap_used)]
141 #![allow(clippy::unchecked_duration_subtraction)]
142 #![allow(clippy::useless_vec)]
143 #![allow(clippy::needless_pass_by_value)]
144 use super::*;
147 use futures::select_biased;
148 use futures_await_test::async_test;
149
150 #[async_test]
151 async fn postage_sender_ext() {
152 use futures::stream::StreamExt;
153 use futures::FutureExt;
154
155 let (mut s, mut r) = postage::watch::channel_with(20);
156 select_biased! {
158 i = r.next().fuse() => assert_eq!(i, Some(20)),
159 _ = futures::future::ready(()) => { }, };
161 select_biased! {
163 _ = r.next().fuse() => panic!(),
164 _ = futures::future::ready(()) => { },
165 };
166
167 s.maybe_send(|i| *i);
168 select_biased! {
170 _ = r.next().fuse() => panic!(),
171 _ = futures::future::ready(()) => { },
172 };
173
174 s.maybe_send(|i| *i + 1);
175 select_biased! {
177 i = r.next().fuse() => assert_eq!(i, Some(21)),
178 _ = futures::future::ready(()) => panic!(),
179 };
180
181 let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
182 select_biased! {
184 _ = r.next().fuse() => panic!(),
185 _ = futures::future::ready(()) => { },
186 };
187 }
188
189 #[async_test]
190 async fn postage_drop() {
191 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
192 struct I(i32);
193
194 impl DropNotifyEofSignallable for I {
195 fn eof() -> I {
196 I(0)
197 }
198 fn is_eof(&self) -> bool {
199 self.0 == 0
200 }
201 }
202
203 let (s, r) = postage::watch::channel_with(I(20));
204 let s = DropNotifyWatchSender::new(s);
205
206 assert_eq!(*r.borrow(), I(20));
207 drop(s);
208 assert_eq!(*r.borrow(), I(0));
209
210 let (s, r) = postage::watch::channel_with(I(44));
211 let s = DropNotifyWatchSender::new(s);
212
213 assert_eq!(*r.borrow(), I(44));
214 drop(s.into_inner());
215 assert_eq!(*r.borrow(), I(44));
216 }
217}