xtor 0.9.10

Async Actor framework for Rust which is blazing fast and rock solid.
Documentation
use std::{marker::PhantomData, sync::atomic::AtomicU64};

use dashmap::DashMap;
use futures::{Stream, StreamExt};
use tokio::task::JoinHandle;

use crate::{
    broker::{Broker, Publish, Subscribe, SubscriptionID, Unsubscribe},
    Actor, Addr, Context, Handler, Message,
};

pub struct DefaultBroker<T: Message + Sync + Clone> {
    counter: AtomicU64,
    subscriptions: DashMap<SubscriptionID, Subscribe<T>>,
    _marker: PhantomData<T>,
}

impl<T: Message + Sync + Clone> DefaultBroker<T> {
    pub fn new() -> Self {
        Self {
            counter: AtomicU64::new(0),
            subscriptions: DashMap::new(),
            _marker: PhantomData,
        }
    }
}

impl<T: Message + Sync + Clone> Default for DefaultBroker<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Message + Sync + Clone> Actor for DefaultBroker<T> {}

#[async_trait::async_trait]
impl<T: Message + Sync + Clone> Handler<Subscribe<T>> for DefaultBroker<T> {
    async fn handle(&self, _ctx: &Context, msg: Subscribe<T>) -> anyhow::Result<SubscriptionID> {
        let id = self
            .counter
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        self.subscriptions.insert(id, msg);
        Ok(id)
    }
}
#[async_trait::async_trait]

impl<T: Message + Sync + Clone> Handler<Unsubscribe> for DefaultBroker<T> {
    async fn handle(&self, _ctx: &Context, msg: Unsubscribe) -> anyhow::Result<()> {
        self.subscriptions.remove(&msg.0);
        Ok(())
    }
}

#[async_trait::async_trait]
impl<T: Message + Sync + Clone> Handler<Publish<T>> for DefaultBroker<T> {
    async fn handle(&self, _ctx: &Context, msg: Publish<T>) -> anyhow::Result<()> {
        for kv in self.subscriptions.iter() {
            kv.proxy.call(msg.0.clone()).await?;
        }
        Ok(())
    }
}

impl<T: Message + Sync + Clone> Broker<T> for DefaultBroker<T> {}

pub struct StreamBroker<S: Stream<Item = I> + Sync + Send + 'static, I: Message + Clone>(pub S);
impl<S: Stream<Item = I> + Sync + Send + 'static, I: Message + Clone> Actor for StreamBroker<S, I> {}
impl<S: Stream<Item = I> + Sync + Send + 'static + Unpin, I: Message + Sync + Clone>
    StreamBroker<S, I>
{
    pub async fn spawn(mut self) -> anyhow::Result<(Addr, JoinHandle<anyhow::Result<()>>)> {
        let broker = DefaultBroker::<I>::new().spawn().await?;
        let broker_a = broker.clone();
        Ok((
            broker,
            tokio::spawn(async move {
                while let Some(msg) = self.0.next().await {
                    broker_a
                        .call::<DefaultBroker<I>, Publish<I>>(Publish(msg))
                        .await?;
                }
                Ok(())
            }),
        ))
    }
}