nosy/
simple_listeners.rs

1use alloc::sync::{Arc, Weak};
2use alloc::vec::Vec;
3use core::fmt;
4use core::sync::atomic::{AtomicBool, Ordering};
5
6use crate::{FromListener, Listen, Listener, StoreLock, StoreLockListener};
7
8// -------------------------------------------------------------------------------------------------
9
/// A [`Listener`] that ignores every message sent to it.
///
/// Supply this wherever an API insists on a [`Listener`] but no reaction to the
/// messages is wanted.
#[expect(clippy::exhaustive_structs)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NullListener;
16
17impl<M> Listener<M> for NullListener {
18    fn receive(&self, _messages: &[M]) -> bool {
19        false
20    }
21}
22
23impl<M> crate::FromListener<NullListener, M> for NullListener {
24    /// No-op conversion returning the listener unchanged.
25    fn from_listener(listener: NullListener) -> Self {
26        listener
27    }
28}
29
30// -------------------------------------------------------------------------------------------------
31
32/// Tuples of listeners may be used to distribute messages to multiple listeners with static
33/// dispatch.
34impl<M, L1, L2> Listener<M> for (L1, L2)
35where
36    L1: Listener<M>,
37    L2: Listener<M>,
38{
39    fn receive(&self, messages: &[M]) -> bool {
40        // note non-short-circuiting or
41        self.0.receive(messages) | self.1.receive(messages)
42    }
43}
44
45// -------------------------------------------------------------------------------------------------
46
47/// A [`Listener`] destination which stores all the messages it receives.
48///
49/// This is a slightly more convenient interface for a [`StoreLock<Vec<M>>`](StoreLock).
50///
51/// This is only intended for testing; real listeners should not unboundedly allocate
52/// duplicate messages.
53///
54/// # Generic parameters
55///
56/// * `M` is the type of the messages.
57pub struct Log<M>(StoreLock<Vec<M>>);
58
59/// [`Log::listener()`] implementation.
60///
61/// # Generic parameters
62///
63/// * `M` is the type of the messages.
64pub struct LogListener<M>(StoreLockListener<Vec<M>>);
65
66impl<M> Log<M> {
67    /// Constructs a new empty [`Log`].
68    #[must_use]
69    pub fn new() -> Self {
70        Self(StoreLock::default())
71    }
72
73    /// Returns a [`Listener`] which records the messages it receives in this `Log`.
74    #[must_use]
75    pub fn listener(&self) -> LogListener<M> {
76        LogListener(self.0.listener())
77    }
78
79    /// Remove and return all messages returned so far.
80    ///
81    /// ```
82    /// use nosy::{Listener, Log};
83    ///
84    /// let log = Log::new();
85    /// log.listener().receive(&[1]);
86    /// log.listener().receive(&[2]);
87    /// assert_eq!(log.drain(), vec![1, 2]);
88    /// log.listener().receive(&[3]);
89    /// assert_eq!(log.drain(), vec![3]);
90    /// ```
91    #[must_use]
92    pub fn drain(&self) -> Vec<M> {
93        self.0.lock().drain(..).collect()
94    }
95}
96
97impl<M: fmt::Debug> fmt::Debug for Log<M> {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        let Log(store) = self;
100        f.debug_tuple("Log").field(&*store.lock()).finish()
101    }
102}
103
104impl<M> fmt::Debug for LogListener<M> {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        f.debug_struct("LogListener")
107            .field("alive", &self.0.alive())
108            .finish()
109    }
110}
111
112impl<M> fmt::Pointer for Log<M> {
113    /// Produces an address which is the same for this [`Log`] and its associated [`LogListener`]s.
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        StoreLock::fmt(&self.0, f)
116    }
117}
118impl<M> fmt::Pointer for LogListener<M> {
119    /// Produces an address which is the same for this [`LogListener`] and its associated [`Log`].
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        StoreLockListener::fmt(&self.0, f)
122    }
123}
124
125impl<M: Clone + Send + Sync> Listener<M> for LogListener<M> {
126    fn receive(&self, messages: &[M]) -> bool {
127        self.0.receive(messages)
128    }
129}
130
131impl<M: Clone + Send + Sync> crate::FromListener<LogListener<M>, M> for LogListener<M> {
132    /// No-op conversion returning the listener unchanged.
133    fn from_listener(listener: LogListener<M>) -> Self {
134        listener
135    }
136}
137
138impl<M> Clone for LogListener<M> {
139    fn clone(&self) -> Self {
140        Self(self.0.clone())
141    }
142}
143
144impl<M> Default for Log<M> {
145    // This implementation cannot be derived because we do not want M: Default
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151// -------------------------------------------------------------------------------------------------
152
/// A [`Listener`] destination which remembers only *whether* any message has arrived
/// since it was last cleared, not what the messages were.
///
/// Internally this is a shared [`AtomicBool`].
/// It is [`Send`] and [`Sync`] regardless of whether the `"sync"` crate feature is enabled.
///
/// Setting the flag uses [`Release`](Ordering::Release) ordering, and reading-and-clearing
/// it uses [`Acquire`](Ordering::Acquire) ordering.
/// We do not recommend relying on this as your sole source of synchronization in unsafe
/// code, but it does mean that when the notification crosses threads, the recipient can
/// rely on seeing effects that happened before the flag was set.
///
/// The name comes from the idea of a “dirty flag”, which marks state as unsaved or out of
/// sync. It can equally be read as a metaphorical mailbox flag: it signals that something
/// has arrived, but not what.
///
/// # See also
///
/// * [`future::WakeFlag`](crate::future::WakeFlag) is similar but wakes an async task
///   instead of needing to be polled.
#[cfg_attr(not(feature = "async"), allow(rustdoc::broken_intra_doc_links))]
pub struct Flag {
    // Strong reference; `FlagListener`s hold only weak references to the same allocation.
    shared: Arc<AtomicBool>,
}
177
/// The [`Listener`] type returned by [`Flag::listener()`].
///
/// It holds only a weak reference to the flag's storage, so it does not keep the
/// [`Flag`] alive.
#[derive(Clone)]
pub struct FlagListener {
    weak: Weak<AtomicBool>,
}
183
184impl fmt::Debug for Flag {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        // never multiline
187        write!(f, "Flag({:?})", self.shared.load(Ordering::Relaxed))
188    }
189}
190impl fmt::Debug for FlagListener {
191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192        let strong = self.weak.upgrade();
193
194        let mut ds = f.debug_struct("FlagListener");
195        ds.field("alive", &strong.is_some());
196        if let Some(strong) = strong {
197            ds.field("value", &(strong.load(Ordering::Relaxed)));
198        }
199        ds.finish()
200    }
201}
202
203impl fmt::Pointer for Flag {
204    /// Produces an address which is the same for this [`Flag`] and its associated
205    /// [`FlagListener`]s.
206    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207        Arc::as_ptr(&self.shared).fmt(f)
208    }
209}
210impl fmt::Pointer for FlagListener {
211    /// Produces an address which is the same for this [`FlagListener`] and its associated [`Flag`].
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        self.weak.as_ptr().fmt(f)
214    }
215}
216
217impl Flag {
218    const SET_ORDERING: Ordering = Ordering::Release;
219    const GET_CLEAR_ORDERING: Ordering = Ordering::Acquire;
220
221    /// Constructs a new [`Flag`] with the given initial value.
222    ///
223    /// ```
224    /// # use nosy::Flag;
225    /// assert_eq!(Flag::new(false).get_and_clear(), false);
226    /// assert_eq!(Flag::new(true).get_and_clear(), true);
227    /// ```
228    #[must_use]
229    pub fn new(value: bool) -> Self {
230        Self {
231            shared: Arc::new(AtomicBool::new(value)),
232        }
233    }
234
235    /// Constructs a new [`Flag`] with the given initial value and call
236    /// [`Listen::listen()`] with its listener.
237    ///
238    /// This is a convenience for calling `new()` followed by `listener()`.
239    ///
240    /// ```
241    /// use nosy::{Flag, unsync::Notifier};
242    ///
243    /// let notifier = Notifier::<()>::new();
244    /// let flag = Flag::listening(false, &notifier);
245    ///
246    /// notifier.notify(&());
247    /// assert_eq!(flag.get_and_clear(), true);
248    /// ```
249    #[must_use]
250    pub fn listening<L>(value: bool, source: L) -> Self
251    where
252        L: Listen,
253        L::Listener: crate::FromListener<FlagListener, L::Msg>,
254    {
255        let new_self = Self::new(value);
256        source.listen(new_self.listener());
257        new_self
258    }
259
260    /// Returns a [`Listener`] which will set this flag to [`true`] when it receives any
261    /// message.
262    #[must_use]
263    pub fn listener(&self) -> FlagListener {
264        FlagListener {
265            weak: Arc::downgrade(&self.shared),
266        }
267    }
268
269    /// Returns the flag value, setting it to [`false`] at the same time.
270    #[allow(clippy::must_use_candidate)]
271    #[inline]
272    pub fn get_and_clear(&self) -> bool {
273        self.shared.swap(false, Self::GET_CLEAR_ORDERING)
274    }
275
276    /// Set the flag value to [`true`].
277    ///
278    /// This is equivalent to `self.listener().receive(())`, but more efficient.
279    /// It may be useful in situations where the caller of `get_and_clear()` realizes it cannot
280    /// actually complete its work, but wants to try again later.
281    ///
282    /// ```
283    /// # let flag = nosy::Flag::new(true);
284    /// # fn try_to_do_the_thing() -> bool { false }
285    /// #
286    /// if flag.get_and_clear() {
287    ///     if !try_to_do_the_thing() {
288    ///         flag.set();
289    ///     }
290    /// # } else { unreachable!();
291    /// }
292    /// assert_eq!(flag.get_and_clear(), true);
293    /// ```
294    #[inline]
295    pub fn set(&self) {
296        self.shared.store(true, Self::SET_ORDERING);
297    }
298}
299
300impl<M> Listener<M> for FlagListener {
301    fn receive(&self, messages: &[M]) -> bool {
302        if let Some(cell) = self.weak.upgrade() {
303            if !messages.is_empty() {
304                cell.store(true, Flag::SET_ORDERING);
305            }
306            true
307        } else {
308            false
309        }
310    }
311}
312
313impl<M> FromListener<FlagListener, M> for FlagListener {
314    /// No-op conversion returning the listener unchanged.
315    fn from_listener(listener: FlagListener) -> Self {
316        listener
317    }
318}