1use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::ops::Deref;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
8use std::sync::Arc;
9
/// Conversion callback from a type-erased event (`&dyn Any`) to a concrete `T`.
///
/// When the callback returns `None`, `Subscriber::send` returns `false`
/// without queueing anything. The `Send + Sync` bounds let the callback be
/// shared between threads behind an `Arc`.
pub type Converter<T> = dyn Fn(&dyn Any) -> Option<T> + Send + Sync;
12
/// Type-erased subscriber interface: events are handed over as `&dyn Any`, so
/// subscribers with different payload types can be used through this one trait.
///
/// Every implementor must also be [`Debug`], [`Send`] and [`Sync`].
pub trait AnySubscriber: Debug + Send + Sync {
    /// Reports whether this subscriber still considers itself alive.
    #[must_use]
    fn is_alive(&self) -> bool;

    /// Offers `event` to the subscriber and reports the outcome as a `bool`.
    ///
    /// The `Subscriber<T>` implementation returns `true` only when the event
    /// was converted and queued.
    #[must_use]
    fn send(&self, event: &dyn Any) -> bool;
}
27impl<T> AnySubscriber for Subscriber<T>
28where
29 T: Send + 'static,
30{
31 fn is_alive(&self) -> bool {
32 Subscriber::is_alive(self)
33 }
34
35 fn send(&self, event: &dyn Any) -> bool {
36 Subscriber::send(self, event)
37 }
38}
39
/// Sending half of a subscription channel.
///
/// Events arrive type-erased as `&dyn Any`, are converted to `T` by `convert`,
/// and are queued on `sender` without blocking.
pub struct Subscriber<T> {
    /// Bounded channel endpoint used to queue converted events.
    sender: SyncSender<T>,
    /// Liveness flag; shared with the matching `Subscription` when built by `pair`.
    alive: Arc<AtomicBool>,
    /// Turns a type-erased event into a `T`, or returns `None` to skip it.
    convert: Arc<Converter<T>>,
}
49impl<T> Subscriber<T> {
50 pub fn send(&self, event: &dyn Any) -> bool {
56 let Some(event) = (self.convert)(event) else {
58 return false;
60 };
61
62 let result = self.sender.try_send(event);
64 if matches!(result, Err(TrySendError::Disconnected(_))) {
65 self.alive.store(false, Ordering::SeqCst);
67 }
68
69 result.is_ok()
71 }
72
73 #[must_use]
75 #[inline]
76 pub fn is_alive(&self) -> bool {
77 self.alive.load(Ordering::SeqCst)
78 }
79}
80impl<T> Debug for Subscriber<T> {
81 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
82 f.debug_struct("Subscriber").field("sender", &self.sender).field("alive", &self.alive).finish()
83 }
84}
85impl<T> Clone for Subscriber<T> {
86 fn clone(&self) -> Self {
87 Self { sender: self.sender.clone(), alive: self.alive.clone(), convert: self.convert.clone() }
88 }
89}
90
/// Receiving half of a subscription channel.
///
/// Dereferences to the inner `Receiver<T>`, and clears the `alive` flag when
/// it is dropped.
pub struct Subscription<T> {
    /// Channel endpoint from which queued events are read.
    receiver: Receiver<T>,
    /// Liveness flag; shared with the matching `Subscriber` when built by `pair`.
    alive: Arc<AtomicBool>,
}
98impl<T> Deref for Subscription<T> {
99 type Target = Receiver<T>;
100
101 fn deref(&self) -> &Self::Target {
102 &self.receiver
103 }
104}
105impl<T> Debug for Subscription<T> {
106 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
107 f.debug_struct("Subscription").field("receiver", &self.receiver).field("alive", &self.alive).finish()
108 }
109}
110impl<T> Drop for Subscription<T> {
111 fn drop(&mut self) {
112 self.alive.store(false, Ordering::SeqCst);
114 }
115}
116
117pub fn pair<T>(backlog: usize, convert: Arc<Converter<T>>) -> (Subscriber<T>, Subscription<T>)
119where
120 T: Clone + 'static,
121{
122 let (sender, receiver) = mpsc::sync_channel(backlog);
124 let alive = Arc::new(AtomicBool::new(true));
125
126 let subscriber = Subscriber { sender, alive: alive.clone(), convert };
128 let subscription = Subscription { receiver, alive };
129 (subscriber, subscription)
130}