nosy/simple_listeners.rs
1use alloc::sync::{Arc, Weak};
2use alloc::vec::Vec;
3use core::fmt;
4use core::sync::atomic::{AtomicBool, Ordering};
5
6use crate::{FromListener, Listen, Listener, StoreLock, StoreLockListener};
7
8// -------------------------------------------------------------------------------------------------
9
10/// A [`Listener`] which discards all messages.
11///
12/// Use this when a [`Listener`] is demanded, but there is nothing it should do.
13#[expect(clippy::exhaustive_structs)]
14#[derive(Clone, Copy, Debug, Eq, PartialEq)]
15pub struct NullListener;
16
17impl<M> Listener<M> for NullListener {
18 fn receive(&self, _messages: &[M]) -> bool {
19 false
20 }
21}
22
23impl<M> crate::FromListener<NullListener, M> for NullListener {
24 /// No-op conversion returning the listener unchanged.
25 fn from_listener(listener: NullListener) -> Self {
26 listener
27 }
28}
29
30// -------------------------------------------------------------------------------------------------
31
32/// Tuples of listeners may be used to distribute messages to multiple listeners with static
33/// dispatch.
34impl<M, L1, L2> Listener<M> for (L1, L2)
35where
36 L1: Listener<M>,
37 L2: Listener<M>,
38{
39 fn receive(&self, messages: &[M]) -> bool {
40 // note non-short-circuiting or
41 self.0.receive(messages) | self.1.receive(messages)
42 }
43}
44
45// -------------------------------------------------------------------------------------------------
46
47/// A [`Listener`] destination which stores all the messages it receives.
48///
49/// This is a slightly more convenient interface for a [`StoreLock<Vec<M>>`](StoreLock).
50///
51/// This is only intended for testing; real listeners should not unboundedly allocate
52/// duplicate messages.
53///
54/// # Generic parameters
55///
56/// * `M` is the type of the messages.
57pub struct Log<M>(StoreLock<Vec<M>>);
58
59/// [`Log::listener()`] implementation.
60///
61/// # Generic parameters
62///
63/// * `M` is the type of the messages.
64pub struct LogListener<M>(StoreLockListener<Vec<M>>);
65
66impl<M> Log<M> {
67 /// Constructs a new empty [`Log`].
68 #[must_use]
69 pub fn new() -> Self {
70 Self(StoreLock::default())
71 }
72
73 /// Returns a [`Listener`] which records the messages it receives in this `Log`.
74 #[must_use]
75 pub fn listener(&self) -> LogListener<M> {
76 LogListener(self.0.listener())
77 }
78
79 /// Remove and return all messages returned so far.
80 ///
81 /// ```
82 /// use nosy::{Listener, Log};
83 ///
84 /// let log = Log::new();
85 /// log.listener().receive(&[1]);
86 /// log.listener().receive(&[2]);
87 /// assert_eq!(log.drain(), vec![1, 2]);
88 /// log.listener().receive(&[3]);
89 /// assert_eq!(log.drain(), vec![3]);
90 /// ```
91 #[must_use]
92 pub fn drain(&self) -> Vec<M> {
93 self.0.lock().drain(..).collect()
94 }
95}
96
97impl<M: fmt::Debug> fmt::Debug for Log<M> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 let Log(store) = self;
100 f.debug_tuple("Log").field(&*store.lock()).finish()
101 }
102}
103
104impl<M> fmt::Debug for LogListener<M> {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 f.debug_struct("LogListener")
107 .field("alive", &self.0.alive())
108 .finish()
109 }
110}
111
112impl<M> fmt::Pointer for Log<M> {
113 /// Produces an address which is the same for this [`Log`] and its associated [`LogListener`]s.
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 StoreLock::fmt(&self.0, f)
116 }
117}
118impl<M> fmt::Pointer for LogListener<M> {
119 /// Produces an address which is the same for this [`LogListener`] and its associated [`Log`].
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 StoreLockListener::fmt(&self.0, f)
122 }
123}
124
125impl<M: Clone + Send + Sync> Listener<M> for LogListener<M> {
126 fn receive(&self, messages: &[M]) -> bool {
127 self.0.receive(messages)
128 }
129}
130
131impl<M: Clone + Send + Sync> crate::FromListener<LogListener<M>, M> for LogListener<M> {
132 /// No-op conversion returning the listener unchanged.
133 fn from_listener(listener: LogListener<M>) -> Self {
134 listener
135 }
136}
137
138impl<M> Clone for LogListener<M> {
139 fn clone(&self) -> Self {
140 Self(self.0.clone())
141 }
142}
143
144impl<M> Default for Log<M> {
145 // This implementation cannot be derived because we do not want M: Default
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151// -------------------------------------------------------------------------------------------------
152
153#[cfg_attr(not(feature = "async"), allow(rustdoc::broken_intra_doc_links))]
154/// A [`Listener`] destination which records only whether any messages have been received,
155/// until cleared.
156///
157/// It is implemented as a shared [`AtomicBool`].
158/// It is [`Send`] and [`Sync`] regardless of whether the `"sync"` crate feature is enabled.
159///
160/// The atomic orderings used are [`Release`](Ordering::Release) for setting the flag, and
161/// [`Acquire`](Ordering::Acquire) for reading and clearing it.
162/// We do not recommend relying on this as your sole source of synchronization in unsafe code,
163/// but this does mean that if the notification is carried across threads then the recipient
164/// can rely on seeing effects that happened before the flag was set.
165///
166/// The name of this type comes from the concept of a “dirty flag”, marking that state is
167/// unsaved or out of sync, but it can also be understood as a metaphorical mailbox flag —
168/// it signals that something has arrived, but not what.
169///
170/// # See also
171///
172/// * [`future::WakeFlag`](crate::future::WakeFlag) is similar but wakes an async task
173/// instead of needing to be polled.
174pub struct Flag {
175 shared: Arc<AtomicBool>,
176}
177
178/// [`Flag::listener()`] implementation.
179#[derive(Clone)]
180pub struct FlagListener {
181 weak: Weak<AtomicBool>,
182}
183
184impl fmt::Debug for Flag {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 // never multiline
187 write!(f, "Flag({:?})", self.shared.load(Ordering::Relaxed))
188 }
189}
190impl fmt::Debug for FlagListener {
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 let strong = self.weak.upgrade();
193
194 let mut ds = f.debug_struct("FlagListener");
195 ds.field("alive", &strong.is_some());
196 if let Some(strong) = strong {
197 ds.field("value", &(strong.load(Ordering::Relaxed)));
198 }
199 ds.finish()
200 }
201}
202
203impl fmt::Pointer for Flag {
204 /// Produces an address which is the same for this [`Flag`] and its associated
205 /// [`FlagListener`]s.
206 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207 Arc::as_ptr(&self.shared).fmt(f)
208 }
209}
210impl fmt::Pointer for FlagListener {
211 /// Produces an address which is the same for this [`FlagListener`] and its associated [`Flag`].
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 self.weak.as_ptr().fmt(f)
214 }
215}
216
217impl Flag {
218 const SET_ORDERING: Ordering = Ordering::Release;
219 const GET_CLEAR_ORDERING: Ordering = Ordering::Acquire;
220
221 /// Constructs a new [`Flag`] with the given initial value.
222 ///
223 /// ```
224 /// # use nosy::Flag;
225 /// assert_eq!(Flag::new(false).get_and_clear(), false);
226 /// assert_eq!(Flag::new(true).get_and_clear(), true);
227 /// ```
228 #[must_use]
229 pub fn new(value: bool) -> Self {
230 Self {
231 shared: Arc::new(AtomicBool::new(value)),
232 }
233 }
234
235 /// Constructs a new [`Flag`] with the given initial value and call
236 /// [`Listen::listen()`] with its listener.
237 ///
238 /// This is a convenience for calling `new()` followed by `listener()`.
239 ///
240 /// ```
241 /// use nosy::{Flag, unsync::Notifier};
242 ///
243 /// let notifier = Notifier::<()>::new();
244 /// let flag = Flag::listening(false, ¬ifier);
245 ///
246 /// notifier.notify(&());
247 /// assert_eq!(flag.get_and_clear(), true);
248 /// ```
249 #[must_use]
250 pub fn listening<L>(value: bool, source: L) -> Self
251 where
252 L: Listen,
253 L::Listener: crate::FromListener<FlagListener, L::Msg>,
254 {
255 let new_self = Self::new(value);
256 source.listen(new_self.listener());
257 new_self
258 }
259
260 /// Returns a [`Listener`] which will set this flag to [`true`] when it receives any
261 /// message.
262 #[must_use]
263 pub fn listener(&self) -> FlagListener {
264 FlagListener {
265 weak: Arc::downgrade(&self.shared),
266 }
267 }
268
269 /// Returns the flag value, setting it to [`false`] at the same time.
270 #[allow(clippy::must_use_candidate)]
271 #[inline]
272 pub fn get_and_clear(&self) -> bool {
273 self.shared.swap(false, Self::GET_CLEAR_ORDERING)
274 }
275
276 /// Set the flag value to [`true`].
277 ///
278 /// This is equivalent to `self.listener().receive(())`, but more efficient.
279 /// It may be useful in situations where the caller of `get_and_clear()` realizes it cannot
280 /// actually complete its work, but wants to try again later.
281 ///
282 /// ```
283 /// # let flag = nosy::Flag::new(true);
284 /// # fn try_to_do_the_thing() -> bool { false }
285 /// #
286 /// if flag.get_and_clear() {
287 /// if !try_to_do_the_thing() {
288 /// flag.set();
289 /// }
290 /// # } else { unreachable!();
291 /// }
292 /// assert_eq!(flag.get_and_clear(), true);
293 /// ```
294 #[inline]
295 pub fn set(&self) {
296 self.shared.store(true, Self::SET_ORDERING);
297 }
298}
299
300impl<M> Listener<M> for FlagListener {
301 fn receive(&self, messages: &[M]) -> bool {
302 if let Some(cell) = self.weak.upgrade() {
303 if !messages.is_empty() {
304 cell.store(true, Flag::SET_ORDERING);
305 }
306 true
307 } else {
308 false
309 }
310 }
311}
312
313impl<M> FromListener<FlagListener, M> for FlagListener {
314 /// No-op conversion returning the listener unchanged.
315 fn from_listener(listener: FlagListener) -> Self {
316 listener
317 }
318}