use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
use std::sync::Arc;
/// Unsized callable type used to turn a type-erased event into a payload.
///
/// A converter receives the event as `&dyn Any` and returns `Some(T)` with the
/// payload to deliver, or `None` when it has nothing to deliver for that event
/// (`Subscriber::send` then reports `false` without touching the channel).
/// The `Send + Sync` bounds are part of the type so that an `Arc<Converter<T>>`
/// can be shared between threads.
pub type Converter<T> = dyn Fn(&dyn Any) -> Option<T> + Send + Sync;
/// Payload-agnostic view of a subscriber.
///
/// Events are handed over as `&dyn Any`, so subscribers with different payload
/// types can all be driven through the same `dyn AnySubscriber` interface.
/// Implementors must also be `Debug`, `Send` and `Sync`.
pub trait AnySubscriber: Debug + Send + Sync {
    /// Reports whether this subscriber still considers itself alive.
    #[must_use]
    fn is_alive(&self) -> bool;

    /// Offers `event` to the subscriber and reports whether it was accepted.
    #[must_use]
    fn send(&self, event: &dyn Any) -> bool;
}
/// Exposes a typed `Subscriber<T>` through the payload-agnostic `AnySubscriber`
/// interface.
///
/// Both methods forward to the inherent methods of the same name on
/// `Subscriber<T>`; inherent associated functions take precedence over trait
/// methods in path resolution, so the calls below do not recurse.
impl<T: Send + 'static> AnySubscriber for Subscriber<T> {
    fn is_alive(&self) -> bool {
        Self::is_alive(self)
    }

    fn send(&self, event: &dyn Any) -> bool {
        Self::send(self, event)
    }
}
/// Sending half of a subscriber/subscription pair.
///
/// Holds the bounded channel endpoint that payloads of type `T` are pushed
/// into, the liveness flag shared with the matching `Subscription`, and the
/// converter that extracts a `T` from a type-erased event.
pub struct Subscriber<T> {
// Bounded (`sync_channel`) sender; `send` uses the non-blocking `try_send`.
sender: SyncSender<T>,
// Shared with the `Subscription`; set to `false` when the subscription is
// dropped or when a send observes a disconnected channel.
alive: Arc<AtomicBool>,
// Invoked on every `send` to turn the `&dyn Any` event into an `Option<T>`.
convert: Arc<Converter<T>>,
}
impl<T> Subscriber<T> {
    /// Converts `event` and tries to queue the result without blocking.
    ///
    /// Returns `true` only when the converted payload was placed on the channel.
    /// Returns `false` when the converter yields `None`, when the channel is
    /// full, or when the receiving side is gone; in that last case the shared
    /// liveness flag is cleared as well.
    pub fn send(&self, event: &dyn Any) -> bool {
        let payload = match (self.convert)(event) {
            Some(payload) => payload,
            None => return false,
        };

        match self.sender.try_send(payload) {
            Ok(()) => true,
            // A full backlog is only a failed delivery; the subscriber stays alive.
            Err(TrySendError::Full(_)) => false,
            Err(TrySendError::Disconnected(_)) => {
                // The receiver was dropped, so no later send can succeed.
                self.alive.store(false, Ordering::SeqCst);
                false
            }
        }
    }

    /// Returns the current value of the shared liveness flag.
    #[must_use]
    #[inline]
    pub fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }
}
/// Debug output lists the channel sender and the liveness flag.
///
/// The `convert` closure has no `Debug` implementation and is therefore left
/// out; the output ends with `..` so readers can tell a field is not shown.
impl<T> Debug for Subscriber<T> {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        f.debug_struct("Subscriber")
            .field("sender", &self.sender)
            .field("alive", &self.alive)
            // Signals the omitted `convert` field instead of looking exhaustive.
            .finish_non_exhaustive()
    }
}
/// Cloning yields another handle to the same channel, the same liveness flag
/// and the same converter; no `T: Clone` bound is needed because only the
/// sender and the two `Arc`s are duplicated.
impl<T> Clone for Subscriber<T> {
    fn clone(&self) -> Self {
        let sender = self.sender.clone();
        let alive = Arc::clone(&self.alive);
        let convert = Arc::clone(&self.convert);
        Self { sender, alive, convert }
    }
}
/// Receiving half of a subscriber/subscription pair.
///
/// Dereferences to the underlying `Receiver<T>`, so the receiver's methods can
/// be called on it directly. Dropping it sets the shared liveness flag to
/// `false`.
pub struct Subscription<T> {
// Receiving end of the bounded channel created in `pair`.
receiver: Receiver<T>,
// Liveness flag shared with every clone of the matching `Subscriber`.
alive: Arc<AtomicBool>,
}
/// Lets a `Subscription<T>` be used wherever a `&Receiver<T>` is expected, so
/// the receiver's methods can be called directly on the subscription.
impl<T> Deref for Subscription<T> {
    type Target = Receiver<T>;

    fn deref(&self) -> &Receiver<T> {
        let Self { receiver, .. } = self;
        receiver
    }
}
/// Debug output lists both fields: the channel receiver and the liveness flag.
impl<T> Debug for Subscription<T> {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let mut out = f.debug_struct("Subscription");
        out.field("receiver", &self.receiver);
        out.field("alive", &self.alive);
        out.finish()
    }
}
/// Clears the shared liveness flag when the subscription goes away, so that
/// `Subscriber::is_alive` reports `false` from then on.
impl<T> Drop for Subscription<T> {
    fn drop(&mut self) {
        let liveness: &AtomicBool = &self.alive;
        liveness.store(false, Ordering::SeqCst);
    }
}
/// Creates a connected `Subscriber` / `Subscription` pair.
///
/// The two halves are linked by a bounded channel that buffers up to `backlog`
/// payloads and by a shared liveness flag that starts out `true`. `convert` is
/// stored in the subscriber and used to turn each type-erased event into a `T`.
///
/// `T` does not have to be `Clone`: nothing here copies a payload, and cloning
/// a `Subscriber` only duplicates the sender and the shared `Arc`s.
pub fn pair<T>(backlog: usize, convert: Arc<Converter<T>>) -> (Subscriber<T>, Subscription<T>)
where
    T: 'static,
{
    let (sender, receiver) = mpsc::sync_channel(backlog);
    let alive = Arc::new(AtomicBool::new(true));
    let subscriber = Subscriber { sender, alive: Arc::clone(&alive), convert };
    let subscription = Subscription { receiver, alive };
    (subscriber, subscription)
}