serviceless 0.4.4

An simple actor model in rust, like actix
Documentation
use service_channel::{mpsc, oneshot};
use std::collections::BTreeMap;

/// A typed pub/sub topic.
pub trait Topic: Ord + Clone + Send + 'static {
    type Item: Clone + Send + 'static;
}

/// Bind a topic to a concrete endpoint field on a service.
///
/// This is the key piece that replaces Any/TypeId routing:
/// each topic knows where its endpoint lives on service S.
pub trait RoutedTopic<S>: Topic
where
    S: crate::Service,
{
    /// Returns this topic's [`TopicEndpoint`] on `service`.
    ///
    /// Implementations should consistently point at the same logical field on `S` so
    /// routing matches how the service stores topic state.
    fn endpoint(service: &mut S) -> &mut TopicEndpoint<Self>
    where
        Self: Sized;
}

/// A single-shot broadcast endpoint.
///
/// - each subscribe registers one waiter
/// - each publish wakes all current waiters once
/// - future publishes require future subscribe calls again
pub struct TopicEndpoint<T>
where
    T: Topic,
{
    once_waiters: BTreeMap<T, Vec<oneshot::Sender<T::Item>>>,
    all_waiters: BTreeMap<T, Vec<mpsc::UnboundedSender<T::Item>>>,
}

impl<T> Default for TopicEndpoint<T>
where
    T: Topic,
{
    /// Empty endpoint with no waiters.
    fn default() -> Self {
        Self {
            once_waiters: BTreeMap::new(),
            all_waiters: BTreeMap::new(),
        }
    }
}

impl<T> TopicEndpoint<T>
where
    T: Topic,
{
    /// Register one subscriber waiting for the next publication.
    pub fn subscribe(&mut self, topic: T, tx: oneshot::Sender<T::Item>) {
        self.once_waiters.entry(topic).or_default().push(tx);
    }

    /// Register a subscriber waiting for all future publications.
    pub fn subscribe_all(&mut self, topic: T, tx: mpsc::UnboundedSender<T::Item>) {
        self.all_waiters.entry(topic).or_default().push(tx);
    }

    /// Publish once to all current subscribers, then clear them.
    ///
    /// Subscribers that already dropped are silently skipped.
    pub fn publish(&mut self, topic: &T, item: T::Item) {
        let waiters = self.once_waiters.remove(topic).unwrap_or_default();

        for tx in waiters {
            let _ = tx.send(item.clone());
        }

        let waiters = self.all_waiters.get(topic);

        if let Some(waiters) = waiters {
            for tx in waiters {
                let _ = tx.unbounded_send(item.clone());
            }
        }
    }

    // /// Remove all pending subscribers without publishing.
    // ///
    // /// Dropped [`oneshot::Sender`]s cause the corresponding receivers to see a closed channel.
    // pub fn clear(&mut self) {
    //     self.once_waiters.clear();
    // }

    // /// Remove all pending subscribers for a specific topic value.
    // pub fn clear_topic(&mut self, topic: &T) {
    //     self.once_waiters.remove(topic);
    // }
}