eventbus_tiny/
sub.rs

//! Event subscription types
2
3use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::ops::Deref;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
8use std::sync::Arc;
9
/// Type-erased downcasting callback: it receives an event as `&dyn Any` and yields `Some(T)` if the event can be
/// turned into a `T`, or `None` if it cannot
pub type Converter<T> = dyn for<'a> Fn(&'a dyn Any) -> Option<T> + Send + Sync;
12
/// A type-erased view onto event [`Subscriber`]s, so that subscribers for different event types can be handled
/// uniformly
///
/// Every implementor has to be [`Debug`], [`Send`] and [`Sync`].
pub trait AnySubscriber: Debug + Send + Sync {
    /// Reports whether the subscriber is still alive and could receive messages (see also [`Subscriber::send`] and
    /// the notes there)
    #[must_use]
    fn is_alive(&self) -> bool;

    /// Sends a type-erased event to the subscriber; the returned flag is the send-state reported by the implementor
    #[must_use]
    fn send(&self, event: &dyn Any) -> bool;
}
27impl<T> AnySubscriber for Subscriber<T>
28where
29    T: Send + 'static,
30{
31    fn is_alive(&self) -> bool {
32        Subscriber::is_alive(self)
33    }
34
35    fn send(&self, event: &dyn Any) -> bool {
36        Subscriber::send(self, event)
37    }
38}
39
40/// An event subscriber handle for an event type `T`
41pub struct Subscriber<T> {
42    /// The sender queue
43    sender: SyncSender<T>,
44    /// An is-alive reference counter
45    alive: Arc<AtomicBool>,
46    /// A conversion to convert
47    convert: Arc<Converter<T>>,
48}
49impl<T> Subscriber<T> {
50    /// Tries to send a non-blocking event to the subscriber and returns if the event was sent successfully
51    ///
52    /// # Important
53    /// Please note that if an event has been sent, this only means that the event is now in a state that it can be
54    /// received by the subscriber, but it has not been received or processed yet.
55    pub fn send(&self, event: &dyn Any) -> bool {
56        // Try to convert the event
57        let Some(event) = (self.convert)(event) else {
58            // Event is incompatible
59            return false;
60        };
61
62        // Try to send the event
63        let result = self.sender.try_send(event);
64        if matches!(result, Err(TrySendError::Disconnected(_))) {
65            // Mark subscriber as dead
66            self.alive.store(false, Ordering::SeqCst);
67        }
68
69        // Return send-state
70        result.is_ok()
71    }
72
73    /// Whether the subscriber is still alive and could receive messages or not (see also [`Self::send`] plus notes)
74    #[must_use]
75    #[inline]
76    pub fn is_alive(&self) -> bool {
77        self.alive.load(Ordering::SeqCst)
78    }
79}
80impl<T> Debug for Subscriber<T> {
81    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
82        f.debug_struct("Subscriber").field("sender", &self.sender).field("alive", &self.alive).finish()
83    }
84}
85impl<T> Clone for Subscriber<T> {
86    fn clone(&self) -> Self {
87        Self { sender: self.sender.clone(), alive: self.alive.clone(), convert: self.convert.clone() }
88    }
89}
90
/// The receiving side of an event subscription
pub struct Subscription<T> {
    /// The receiving end of the event queue
    receiver: Receiver<T>,
    /// A shared is-alive flag; it is set to `false` when the subscription is dropped
    alive: Arc<AtomicBool>,
}
98impl<T> Deref for Subscription<T> {
99    type Target = Receiver<T>;
100
101    fn deref(&self) -> &Self::Target {
102        &self.receiver
103    }
104}
105impl<T> Debug for Subscription<T> {
106    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
107        f.debug_struct("Subscription").field("receiver", &self.receiver).field("alive", &self.alive).finish()
108    }
109}
110impl<T> Drop for Subscription<T> {
111    fn drop(&mut self) {
112        // Mark as dead
113        self.alive.store(false, Ordering::SeqCst);
114    }
115}
116
117/// Creates a new `(subscriber, subscribption)`-pair with the given backlog as capacity limit
118pub fn pair<T>(backlog: usize, convert: Arc<Converter<T>>) -> (Subscriber<T>, Subscription<T>)
119where
120    T: Clone + 'static,
121{
122    // Create underlying communication types
123    let (sender, receiver) = mpsc::sync_channel(backlog);
124    let alive = Arc::new(AtomicBool::new(true));
125
126    // Create connected subscriber/subscription pair
127    let subscriber = Subscriber { sender, alive: alive.clone(), convert };
128    let subscription = Subscription { receiver, alive };
129    (subscriber, subscription)
130}