use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::Stream;
use nodecraft::{CheapClone, Id};
use crate::proto::NodeState;
/// Private representation behind [`Event`]: one variant per notification kind
/// (join, leave, update), each carrying the `Arc`-shared [`NodeState`] of the
/// node the notification is about.
#[derive(Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
enum EventInner<I, A> {
/// Payload of an event built with `Event::join`.
Join(Arc<NodeState<I, A>>),
/// Payload of an event built with `Event::leave`.
Leave(Arc<NodeState<I, A>>),
/// Payload of an event built with `Event::update`.
Update(Arc<NodeState<I, A>>),
}
impl<I, A> Clone for EventInner<I, A> {
fn clone(&self) -> Self {
match self {
EventInner::Join(node) => EventInner::Join(node.clone()),
EventInner::Leave(node) => EventInner::Leave(node.clone()),
EventInner::Update(node) => EventInner::Update(node.clone()),
}
}
}
/// The kind of an [`Event`], without the node state attached to it.
///
/// Stored as a single byte (`#[repr(u8)]`); the discriminants are spelled out
/// and match the values the compiler would assign implicitly.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
#[repr(u8)]
pub enum EventKind {
    /// Kind reported for events built from a join notification.
    Join = 0,
    /// Kind reported for events built from a leave notification.
    Leave = 1,
    /// Kind reported for events built from an update notification.
    Update = 2,
}
/// A join, leave, or update notification together with the shared state of
/// the node it refers to.
///
/// The variant is kept private; use `kind` to find out which notification
/// this is and `node_state` to reach the node's state.
#[derive(Debug)]
pub struct Event<I, A>(EventInner<I, A>);
impl<I, A> Clone for Event<I, A> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<I, A> Event<I, A> {
#[inline]
pub fn node_state(&self) -> &NodeState<I, A> {
match &self.0 {
EventInner::Join(node) => node,
EventInner::Leave(node) => node,
EventInner::Update(node) => node,
}
}
#[inline]
pub const fn kind(&self) -> EventKind {
match &self.0 {
EventInner::Join(_) => EventKind::Join,
EventInner::Leave(_) => EventKind::Leave,
EventInner::Update(_) => EventKind::Update,
}
}
pub(crate) fn join(node: Arc<NodeState<I, A>>) -> Self {
Event(EventInner::Join(node))
}
pub(crate) fn leave(node: Arc<NodeState<I, A>>) -> Self {
Event(EventInner::Leave(node))
}
pub(crate) fn update(node: Arc<NodeState<I, A>>) -> Self {
Event(EventInner::Update(node))
}
}
/// Receives join, leave, and update notifications for nodes.
///
/// Each method is handed the `Arc`-shared [`NodeState`] of the node concerned
/// and returns a future that must be `Send`. The `auto_impl` attribute also
/// provides this trait for `Box` and `Arc` wrappers of an implementor.
///
/// NOTE(review): the code that invokes these hooks is outside this file, so
/// the exact moment each one fires should be confirmed against the caller.
#[auto_impl::auto_impl(Box, Arc)]
pub trait EventDelegate: Send + Sync + 'static {
/// Identifier type of the nodes reported to this delegate.
type Id: Id;
/// Address type of the nodes reported to this delegate.
type Address: CheapClone + Send + Sync + 'static;
/// Notifies the delegate of a join, passing the state of the node involved.
fn notify_join(
&self,
node: Arc<NodeState<Self::Id, Self::Address>>,
) -> impl Future<Output = ()> + Send;
/// Notifies the delegate of a leave, passing the state of the node involved.
fn notify_leave(
&self,
node: Arc<NodeState<Self::Id, Self::Address>>,
) -> impl Future<Output = ()> + Send;
/// Notifies the delegate of an update, passing the state of the node involved.
fn notify_update(
&self,
node: Arc<NodeState<Self::Id, Self::Address>>,
) -> impl Future<Output = ()> + Send;
}
/// An [`EventDelegate`] backed by the sending half of an `async_channel`:
/// every notification is wrapped in an [`Event`] and sent to the paired
/// [`EventSubscriber`].
pub struct SubscribleEventDelegate<I, A>(async_channel::Sender<Event<I, A>>);
impl<I, A> SubscribleEventDelegate<I, A> {
pub fn unbounded() -> (Self, EventSubscriber<I, A>) {
let (tx, rx) = async_channel::unbounded();
(Self(tx), EventSubscriber(rx))
}
pub fn bounded(capacity: usize) -> (Self, EventSubscriber<I, A>) {
let (tx, rx) = async_channel::bounded(capacity);
(Self(tx), EventSubscriber(rx))
}
}
/// Forwards every notification to the channel as an [`Event`].
///
/// The outcome of each send is discarded on purpose, so a failed delivery is
/// never reported back to the caller.
impl<I, A> EventDelegate for SubscribleEventDelegate<I, A>
where
    I: Id + Send + Sync + 'static,
    A: CheapClone + Send + Sync + 'static,
{
    type Id = I;
    type Address = A;

    /// Sends a join event for `node` to the subscriber.
    async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        let event = Event::join(node);
        let _ = self.0.send(event).await;
    }

    /// Sends a leave event for `node` to the subscriber.
    async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        let event = Event::leave(node);
        let _ = self.0.send(event).await;
    }

    /// Sends an update event for `node` to the subscriber.
    async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
        let event = Event::update(node);
        let _ = self.0.send(event).await;
    }
}
/// Receiving half paired with a [`SubscribleEventDelegate`].
///
/// Events can be taken with `recv` / `try_recv` or by polling the subscriber
/// as a `Stream`; the wrapped receiver is pin-projected for the latter.
#[pin_project::pin_project]
pub struct EventSubscriber<I, A>(#[pin] async_channel::Receiver<Event<I, A>>);
impl<I, A> EventSubscriber<I, A> {
pub async fn recv(&self) -> Result<Event<I, A>, async_channel::RecvError> {
self.0.recv().await
}
pub fn try_recv(&self) -> Result<Event<I, A>, async_channel::TryRecvError> {
self.0.try_recv()
}
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn is_full(&self) -> bool {
self.0.is_full()
}
}
impl<I, A> Stream for EventSubscriber<I, A> {
type Item = Event<I, A>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
<async_channel::Receiver<Event<I, A>> as Stream>::poll_next(self.project().0, cx)
}
}