// nosy 0.3.0
//
// Change notification / observation / broadcast channels, with filtering and coalescing. no_std compatible.
// Documentation
use alloc::rc::Rc;
use alloc::sync::Arc;
use core::fmt;

use crate::{Filter, Gate, IntoListener};

#[cfg(doc)]
use crate::{Flag, FlagListener, Notifier, Store, StoreLock, StoreRef};

// -------------------------------------------------------------------------------------------------

// NOTE(review): presumably some intra-doc links below only resolve when the `std-sync`
// feature is enabled, hence this conditional `allow` — confirm.
#[cfg_attr(not(feature = "std-sync"), allow(rustdoc::broken_intra_doc_links))]
/// A receiver of messages (typically from something implementing [`Listen`]) which can
/// indicate when it is no longer interested in them (typically because the associated
/// recipient has been dropped).
///
/// Listeners are typically used in trait object form, which may be created via the
/// [`IntoListener`] trait in addition to the usual coercions;
/// this is done automatically by [`Listen`], but calling `.into_listener()` earlier may be useful
/// to minimize the number of separately allocated clones of the listener when the same listener is
/// to be registered with multiple message sources.
///
/// If you are implementing [`Listener`], note the requirements set out in [`Listener::receive()`].
/// Instead of writing new implementations, consider the following alternatives when you need a
/// listener:
///
/// * Use an existing implementation such as [`FlagListener`] from [`Flag`].
/// * Implement [`Store`] and use [`StoreLock`] instead.
///   [`StoreLock`] provides the weak reference and mutex that are needed in the most common
///   kind of use of [`Listener`].
/// * Implement [`StoreRef`] for an interior mutable data structure, then wrap it in
///   [`Weak`][alloc::sync::Weak] to make it a listener.
///   (This works via the built-in implementation
///   `impl<T: StoreRef<M>, M> Listener<M> for Weak<T>`).
///
/// # Generic parameters
///
/// * `M` is the type of message that can be received.
pub trait Listener<M>: fmt::Debug {
    /// Process and store the given series of messages.
    ///
    /// Returns `true` if the listener is still interested in further messages (“alive”),
    /// and `false` if it should be dropped because these and all future messages would have
    /// no observable effect.
    /// A call of the form `.receive(&[])` may be performed to query aliveness without
    /// delivering any messages.
    ///
    /// # Requirements on implementors
    ///
    // [Docs maintenance note: keep wording consistent with the below, `Store`, and `StoreRef`.]
    ///
    /// * Messages are provided in a batch for efficiency of dispatch.
    ///   Each message in the provided slice should be processed exactly the same as if
    ///   it were the only message provided.
    ///   If the slice is empty, there should be no observable effect.
    ///
    /// * Do not panic under any possible incoming message stream,
    ///   in order to ensure the sender's other work is not interfered with.
    ///   For example, if the listener accesses a poisoned mutex, it should do nothing or clear
    ///   the poison, rather than panicking.
    ///
    /// * Do not acquire any locks except ones which are used only for the state of the
    ///   listener itself.
    ///
    /// * Do not perform any blocking operation except for such locks.
    ///
    /// * Do not access thread-local state, since this may be called from whichever thread(s)
    ///   the sender is using.
    ///
    /// # Advice for implementors
    ///
    /// Note that, since this method takes `&Self`, a `Listener` must use interior
    /// mutability of some variety to store the message. As a `Listener` may be called
    /// from various contexts, and in particular while the sender is still performing
    /// its work, that mutability should in general be limited to setting dirty flags
    /// or inserting into message queues — not attempting to directly perform further
    /// application state changes, and particularly not taking any locks that are not solely
    /// used by the `Listener` and its destination, as that could result in deadlock.
    ///
    /// The typical pattern is for a listener to contain a `Weak<Mutex<...>>` or similar
    /// multiply-owned mutable structure to aggregate incoming messages, which will
    /// then be read and cleared by a later task; see [`StoreLock`] for assistance in
    /// implementing this pattern.
    ///
    /// Note that a [`Notifier`] might call `.receive(&[])` at any time, particularly when
    /// listeners are added. Be careful not to cause a deadlock in this case; it may be
    /// necessary to avoid locking in the case where there are no messages to be delivered.
    fn receive(&self, messages: &[M]) -> bool;

    /// Convert this listener into trait object form, allowing it to be stored in
    /// collections or passed non-generically.
    /// The produced trait object does not implement [`Sync`].
    ///
    /// The purpose of this method over simply calling [`Rc::new()`] is that it will
    /// avoid double-wrapping of a listener that's already in [`Rc`].
    ///
    /// **You should not need to override or call this method;** use [`IntoListener`] instead.
    #[doc(hidden)]
    fn into_dyn_listener_unsync(self) -> crate::unsync::DynListener<M>
    where
        Self: Sized + 'static,
    {
        // Default: allocate a fresh `Rc` around `self`.
        Rc::new(self)
    }

    /// Convert this listener into trait object form, allowing it to be stored in
    /// collections or passed non-generically.
    /// The produced trait object implements [`Sync`].
    ///
    /// The purpose of this method over simply calling [`Arc::new()`] is that it will
    /// avoid double-wrapping of a listener that's already in [`Arc`].
    ///
    /// **You should not need to override or call this method;** use [`IntoListener`] instead.
    #[doc(hidden)]
    fn into_dyn_listener_sync(self) -> crate::sync::DynListener<M>
    where
        Self: Sized + Send + Sync + 'static,
    {
        // Default: allocate a fresh `Arc` around `self`.
        Arc::new(self)
    }

    /// Wraps `self` so as to apply a map/filter function (similar to [`Iterator::filter_map()`])
    /// to incoming messages, to discard uninteresting messages and transform interesting ones.
    ///
    /// Note: By default, this filter breaks up all message batching into batches of 1.
    /// In order to avoid this and have more efficient message delivery, use
    /// [`Filter::with_stack_buffer()`].
    /// This is unnecessary if `size_of::<M>() == 0`; the buffer is automatically unbounded in
    /// that case.
    ///
    /// # Generic parameters
    ///
    /// * `MI` is the type of the messages passed (by reference) to `function`.
    /// * `F` is the type of `function`, which returns `Some` of a message of type `M`
    ///   or `None`.
    ///
    /// # Example
    ///
    /// ```
    /// use nosy::{unsync::Notifier, Flag, Listen as _, Listener as _};
    ///
    /// let notifier = Notifier::new();
    /// let flag = Flag::new(false);
    /// notifier.listen(flag.listener().filter(|&msg: &i32| {
    ///     if msg >= 0 {
    ///         Some(())
    ///     } else {
    ///         None
    ///     }
    /// }));
    ///
    /// // This message is filtered out.
    /// notifier.notify(&-1);
    /// assert_eq!(flag.get_and_clear(), false);
    ///
    /// // This message is passed through:
    /// notifier.notify(&2);
    /// assert_eq!(flag.get_and_clear(), true);
    /// ```
    fn filter<MI, F>(self, function: F) -> Filter<F, Self, 1>
    where
        Self: Sized,
        F: for<'a> Fn(&'a MI) -> Option<M>,
    {
        Filter {
            function,
            target: self,
        }
    }

    /// Wraps `self` to pass messages only until the returned [`Gate`], and any clones
    /// of it, are dropped.
    ///
    /// This may be used to stop forwarding messages when a dependency no longer exists.
    ///
    /// # Example
    ///
    /// ```
    /// use nosy::{Listen as _, Listener as _};
    ///
    /// let log = nosy::Log::new();
    /// let (gate, gated) = log.listener().gate();
    /// gated.receive(&["kept1"]);
    /// assert_eq!(log.drain(), vec!["kept1"]);
    /// gated.receive(&["kept2"]);
    /// drop(gate);
    /// gated.receive(&["discarded"]);
    /// assert_eq!(log.drain(), vec!["kept2"]);
    /// ```
    fn gate(self) -> (Gate, crate::GateListener<Self>)
    where
        Self: Sized,
    {
        Gate::new(self)
    }
}

// -------------------------------------------------------------------------------------------------

/// A source of messages to which a [`Listener`] may be subscribed, after which the
/// listener receives messages for as long as it wishes to.
///
/// # Examples
///
/// A type that owns a [`Notifier`] commonly implements [`Listen`] by delegating to it.
/// That implementation looks like this:
///
/// ```
/// use nosy::{Listen, unsync::Notifier};
///
/// struct MyType {
///     notifier: Notifier<MyMessage>,
/// }
/// struct MyMessage;
///
/// impl Listen for MyType {
///     // Must be the same as the message type of the Notifier.
///     type Msg = MyMessage;
///
///     // Everything below is boilerplate.
///     type Listener = <Notifier<Self::Msg> as Listen>::Listener;
///     fn listen_raw(&self, listener: Self::Listener) {
///         self.notifier.listen(listener)
///     }
/// }
/// ```
pub trait Listen {
    /// The type of the messages that this source produces.
    ///
    /// It is not required, but most message types should satisfy
    /// `Copy + Send + Sync + 'static`.
    type Msg;

    /// The listener type that every added listener must be convertible to.
    type Listener: Listener<Self::Msg>;

    /// Subscribes the given [`Listener`] to this source of messages.
    ///
    /// A listener is removed only when it returns [`false`] from [`Listener::receive()`].
    /// No operation exists to remove a listener explicitly, and subscribing the same
    /// listener redundantly results in redundant messages.
    ///
    /// The default implementation of this method is equivalent to
    ///
    /// ```ignore
    /// self.listen_raw(listener.into_listener())
    /// ```
    fn listen<L>(&self, listener: L)
    where
        Self: Sized,
        L: IntoListener<Self::Listener, Self::Msg>,
    {
        // Convert to the stored listener type first, then hand off to the required method.
        let converted: Self::Listener = listener.into_listener();
        self.listen_raw(converted)
    }

    /// Subscribes the given [`Listener`] to this source of messages.
    ///
    /// This is the method that implementors of `Listen` must provide.
    /// Unlike `listen()`, `listen_raw()` does not wrap the listener automatically via the
    /// [`IntoListener`] trait: the listener given must already be of exactly the type it
    /// will be stored as. In exchange, `listen_raw()` is usable when [`IntoListener`] is not
    /// implemented, and with `dyn Listen`.
    ///
    /// A listener is removed only when it returns [`false`] from [`Listener::receive()`].
    /// No operation exists to remove a listener explicitly, and subscribing the same
    /// listener redundantly results in redundant messages.
    fn listen_raw(&self, listener: Self::Listener);
}

/// A shared reference to a message source is itself a message source:
/// subscriptions are forwarded to the referenced `T`.
impl<T> Listen for &T
where
    T: ?Sized + Listen,
{
    type Msg = <T as Listen>::Msg;
    type Listener = <T as Listen>::Listener;

    fn listen_raw(&self, listener: Self::Listener) {
        // `self` is `&&T`; strip both layers and re-borrow the underlying source.
        <T as Listen>::listen_raw(&**self, listener)
    }
}
/// An exclusive reference to a message source is itself a message source:
/// subscriptions are forwarded to the referenced `T`.
impl<T> Listen for &mut T
where
    T: ?Sized + Listen,
{
    type Msg = <T as Listen>::Msg;
    type Listener = <T as Listen>::Listener;

    fn listen_raw(&self, listener: Self::Listener) {
        // `self` is `&&mut T`; only shared access to the underlying source is needed.
        <T as Listen>::listen_raw(&**self, listener)
    }
}
/// A boxed message source is itself a message source:
/// subscriptions are forwarded to the boxed `T`.
impl<T> Listen for alloc::boxed::Box<T>
where
    T: ?Sized + Listen,
{
    type Msg = <T as Listen>::Msg;
    type Listener = <T as Listen>::Listener;

    fn listen_raw(&self, listener: Self::Listener) {
        // `self` is `&Box<T>`; dereference through the box to reach the source.
        <T as Listen>::listen_raw(&**self, listener)
    }
}
/// A reference-counted ([`Rc`](alloc::rc::Rc)) message source is itself a message source:
/// subscriptions are forwarded to the shared `T`.
impl<T> Listen for alloc::rc::Rc<T>
where
    T: ?Sized + Listen,
{
    type Msg = <T as Listen>::Msg;
    type Listener = <T as Listen>::Listener;

    fn listen_raw(&self, listener: Self::Listener) {
        // `self` is `&Rc<T>`; dereference through the pointer to reach the source.
        <T as Listen>::listen_raw(&**self, listener)
    }
}
/// An atomically reference-counted ([`Arc`](alloc::sync::Arc)) message source is itself a
/// message source: subscriptions are forwarded to the shared `T`.
impl<T> Listen for alloc::sync::Arc<T>
where
    T: ?Sized + Listen,
{
    type Msg = <T as Listen>::Msg;
    type Listener = <T as Listen>::Listener;

    fn listen_raw(&self, listener: Self::Listener) {
        // `self` is `&Arc<T>`; dereference through the pointer to reach the source.
        <T as Listen>::listen_raw(&**self, listener)
    }
}