hyperlane-broadcast 2.0.3

hyperlane-broadcast is a lightweight and ergonomic wrapper over Tokio’s broadcast channel designed for easy-to-use publish-subscribe messaging in async Rust applications. It simplifies the native Tokio broadcast API by providing a straightforward interface for broadcasting messages to multiple subscribers with minimal boilerplate.
Documentation
use crate::*;

/// Implements the `BroadcastMapTrait` for any type that also implements `Clone` and `Debug`.
/// This blanket implementation allows any clonable and debuggable type to be used as a value in the broadcast map system.
impl<T: Clone + Debug> BroadcastMapTrait for T {}

/// Provides a default implementation for `BroadcastMap` instances.
///
/// The default broadcast map is initialized as an empty `DashMap`.
impl<T: BroadcastMapTrait> Default for BroadcastMap<T> {
    /// Creates a new, empty `BroadcastMap` instance.
    ///
    /// # Returns
    ///
    /// - `BroadcastMap<T>` - An empty broadcast map.
    #[inline(always)]
    fn default() -> Self {
        Self(DashMap::with_hasher(BuildHasherDefault::default()))
    }
}

/// Implements core functionalities for the `BroadcastMap` struct.
impl<T: BroadcastMapTrait> BroadcastMap<T> {
    /// Creates a new, empty `BroadcastMap` instance.
    ///
    /// This is a convenience constructor that simply calls `default()`.
    ///
    /// # Returns
    ///
    /// - `BroadcastMap<T>` - An empty broadcast map.
    #[inline(always)]
    pub fn new() -> Self {
        Self::default()
    }

    /// Retrieves an immutable reference to the underlying `DashMapStringBroadcast`.
    ///
    /// This private helper method provides direct access to the internal map.
    ///
    /// # Returns
    ///
    /// - `&DashMapStringBroadcast<T>` - Reference to the internal map.
    #[inline(always)]
    fn get(&self) -> &DashMapStringBroadcast<T> {
        &self.0
    }

    /// Inserts a new broadcast channel into the map with a specified key and capacity.
    ///
    /// If a broadcast channel with the given key already exists, it will be replaced.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    /// - `capacity` - Maximum number of buffered messages.
    ///
    /// # Returns
    ///
    /// - `Option<Broadcast<T>>` - Previous broadcast channel if replaced.
    #[inline(always)]
    pub fn insert<K>(&self, key: K, capacity: Capacity) -> Option<Broadcast<T>>
    where
        K: AsRef<str>,
    {
        let broadcast: Broadcast<T> = Broadcast::new(capacity);
        self.get().insert(key.as_ref().to_owned(), broadcast)
    }

    /// Retrieves the number of active receivers for the broadcast channel associated with the given key.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    ///
    /// # Returns
    ///
    /// - `Option<ReceiverCount>` - Number of receivers if channel exists.
    #[inline(always)]
    pub fn receiver_count<K>(&self, key: K) -> Option<ReceiverCount>
    where
        K: AsRef<str>,
    {
        self.get()
            .get(key.as_ref())
            .map(|receiver: Ref<'_, String, Broadcast<T>>| receiver.receiver_count())
    }

    /// Subscribes a new receiver to the broadcast channel associated with the given key.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    ///
    /// # Returns
    ///
    /// - `Option<BroadcastReceiver<T>>` - New receiver if channel exists.
    #[inline(always)]
    pub fn subscribe<K>(&self, key: K) -> Option<BroadcastMapReceiver<T>>
    where
        K: AsRef<str>,
    {
        self.get()
            .get(key.as_ref())
            .map(|receiver: Ref<'_, String, Broadcast<T>>| receiver.subscribe())
    }

    /// Subscribes a new receiver to the broadcast channel associated with the given key.
    /// If the channel does not exist, it will be created with the specified capacity before subscribing.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    /// - `capacity` - Capacity for new channel if needed.
    ///
    /// # Returns
    ///
    /// - `BroadcastReceiver<T>` - New receiver for the channel.
    #[inline(always)]
    pub fn subscribe_or_insert<K>(&self, key: K, capacity: Capacity) -> BroadcastMapReceiver<T>
    where
        K: AsRef<str>,
    {
        let key_ref: &str = key.as_ref();
        match self.get().get(key_ref) {
            Some(sender) => sender.subscribe(),
            None => {
                self.insert(key_ref, capacity);
                self.subscribe_or_insert(key_ref, capacity)
            }
        }
    }

    /// Attempts to send a message to the broadcast channel associated with the given key.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    /// - `T` - Message to broadcast.
    ///
    /// # Returns
    ///
    /// - `Result<Option<ReceiverCount>, SendError<T>>` - Send result with receiver count or error.
    #[inline(always)]
    pub fn try_send<K>(&self, key: K, data: T) -> Result<Option<ReceiverCount>, SendError<T>>
    where
        K: AsRef<str>,
    {
        match self.get().get(key.as_ref()) {
            Some(sender) => sender.send(data).map(Some),
            None => Ok(None),
        }
    }

    /// Sends a message to the broadcast channel associated with the given key.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    /// - `T` - Message to broadcast.
    ///
    /// # Returns
    ///
    /// - `Option<ReceiverCount>` - The receiver count if the channel exists.
    ///
    /// # Panics
    ///
    /// Panics if the send operation fails (e.g., if the channel is closed).
    #[inline(always)]
    pub fn send<K>(&self, key: K, data: T) -> Option<ReceiverCount>
    where
        K: AsRef<str>,
    {
        self.try_send(key, data).unwrap()
    }

    /// Unsubscribes and removes the broadcast channel associated with the given key from the map.
    ///
    /// This operation effectively cancels all subscriptions to the channel by removing it from the map.
    /// Any existing receivers will no longer receive new messages, and the channel will be dropped.
    ///
    /// # Arguments
    ///
    /// - `AsRef<str>` - Key convertible to `str`.
    ///
    /// # Returns
    ///
    /// - `Option<Broadcast<T>>` - The removed broadcast channel if it existed, or `None` if no channel was associated with the key.
    #[inline(always)]
    pub fn unsubscribe<K>(&self, key: K) -> Option<Broadcast<T>>
    where
        K: AsRef<str>,
    {
        self.get()
            .remove(key.as_ref())
            .map(|(_, broadcast): (String, Broadcast<T>)| broadcast)
    }
}