Skip to main content

hyperlane_broadcast/broadcast_map/
impl.rs

1use crate::*;
2
3/// Implements the `BroadcastMapTrait` for any type that also implements `Clone` and `Debug`.
4/// This blanket implementation allows any clonable and debuggable type to be used as a value in the broadcast map system.
5impl<T: Clone + Debug> BroadcastMapTrait for T {}
6
7/// Provides a default implementation for `BroadcastMap` instances.
8///
9/// The default broadcast map is initialized as an empty `DashMap`.
10impl<T: BroadcastMapTrait> Default for BroadcastMap<T> {
11    /// Creates a new, empty `BroadcastMap` instance.
12    ///
13    /// # Returns
14    ///
15    /// - `BroadcastMap<T>` - An empty broadcast map.
16    #[inline(always)]
17    fn default() -> Self {
18        Self(DashMap::with_hasher(BuildHasherDefault::default()))
19    }
20}
21
22/// Implements core functionalities for the `BroadcastMap` struct.
23impl<T: BroadcastMapTrait> BroadcastMap<T> {
24    /// Creates a new, empty `BroadcastMap` instance.
25    ///
26    /// This is a convenience constructor that simply calls `default()`.
27    ///
28    /// # Returns
29    ///
30    /// - `BroadcastMap<T>` - An empty broadcast map.
31    #[inline(always)]
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    /// Retrieves an immutable reference to the underlying `DashMapStringBroadcast`.
37    ///
38    /// This private helper method provides direct access to the internal map.
39    ///
40    /// # Returns
41    ///
42    /// - `&DashMapStringBroadcast<T>` - Reference to the internal map.
43    #[inline(always)]
44    fn get(&self) -> &DashMapStringBroadcast<T> {
45        &self.0
46    }
47
48    /// Inserts a new broadcast channel into the map with a specified key and capacity.
49    ///
50    /// If a broadcast channel with the given key already exists, it will be replaced.
51    ///
52    /// # Arguments
53    ///
54    /// - `AsRef<str>` - Key convertible to `str`.
55    /// - `capacity` - Maximum number of buffered messages.
56    ///
57    /// # Returns
58    ///
59    /// - `Option<Broadcast<T>>` - Previous broadcast channel if replaced.
60    #[inline(always)]
61    pub fn insert<K>(&self, key: K, capacity: Capacity) -> Option<Broadcast<T>>
62    where
63        K: AsRef<str>,
64    {
65        let broadcast: Broadcast<T> = Broadcast::new(capacity);
66        self.get().insert(key.as_ref().to_owned(), broadcast)
67    }
68
69    /// Retrieves the number of active receivers for the broadcast channel associated with the given key.
70    ///
71    /// # Arguments
72    ///
73    /// - `AsRef<str>` - Key convertible to `str`.
74    ///
75    /// # Returns
76    ///
77    /// - `Option<ReceiverCount>` - Number of receivers if channel exists.
78    #[inline(always)]
79    pub fn receiver_count<K>(&self, key: K) -> Option<ReceiverCount>
80    where
81        K: AsRef<str>,
82    {
83        self.get()
84            .get(key.as_ref())
85            .map(|receiver: Ref<'_, String, Broadcast<T>>| receiver.receiver_count())
86    }
87
88    /// Subscribes a new receiver to the broadcast channel associated with the given key.
89    ///
90    /// # Arguments
91    ///
92    /// - `AsRef<str>` - Key convertible to `str`.
93    ///
94    /// # Returns
95    ///
96    /// - `Option<BroadcastReceiver<T>>` - New receiver if channel exists.
97    #[inline(always)]
98    pub fn subscribe<K>(&self, key: K) -> Option<BroadcastMapReceiver<T>>
99    where
100        K: AsRef<str>,
101    {
102        self.get()
103            .get(key.as_ref())
104            .map(|receiver: Ref<'_, String, Broadcast<T>>| receiver.subscribe())
105    }
106
107    /// Subscribes a new receiver to the broadcast channel associated with the given key.
108    /// If the channel does not exist, it will be created with the specified capacity before subscribing.
109    ///
110    /// # Arguments
111    ///
112    /// - `AsRef<str>` - Key convertible to `str`.
113    /// - `capacity` - Capacity for new channel if needed.
114    ///
115    /// # Returns
116    ///
117    /// - `BroadcastReceiver<T>` - New receiver for the channel.
118    #[inline(always)]
119    pub fn subscribe_or_insert<K>(&self, key: K, capacity: Capacity) -> BroadcastMapReceiver<T>
120    where
121        K: AsRef<str>,
122    {
123        let key_ref: &str = key.as_ref();
124        match self.get().get(key_ref) {
125            Some(sender) => sender.subscribe(),
126            None => {
127                self.insert(key_ref, capacity);
128                self.subscribe_or_insert(key_ref, capacity)
129            }
130        }
131    }
132
133    /// Attempts to send a message to the broadcast channel associated with the given key.
134    ///
135    /// # Arguments
136    ///
137    /// - `AsRef<str>` - Key convertible to `str`.
138    /// - `T` - Message to broadcast.
139    ///
140    /// # Returns
141    ///
142    /// - `Result<Option<ReceiverCount>, SendError<T>>` - Send result with receiver count or error.
143    #[inline(always)]
144    pub fn try_send<K>(&self, key: K, data: T) -> Result<Option<ReceiverCount>, SendError<T>>
145    where
146        K: AsRef<str>,
147    {
148        match self.get().get(key.as_ref()) {
149            Some(sender) => sender.send(data).map(Some),
150            None => Ok(None),
151        }
152    }
153
154    /// Sends a message to the broadcast channel associated with the given key.
155    ///
156    /// # Arguments
157    ///
158    /// - `AsRef<str>` - Key convertible to `str`.
159    /// - `T` - Message to broadcast.
160    ///
161    /// # Returns
162    ///
163    /// - `Option<ReceiverCount>` - The receiver count if the channel exists.
164    ///
165    /// # Panics
166    ///
167    /// Panics if the send operation fails (e.g., if the channel is closed).
168    #[inline(always)]
169    pub fn send<K>(&self, key: K, data: T) -> Option<ReceiverCount>
170    where
171        K: AsRef<str>,
172    {
173        self.try_send(key, data).unwrap()
174    }
175
176    /// Unsubscribes and removes the broadcast channel associated with the given key from the map.
177    ///
178    /// This operation effectively cancels all subscriptions to the channel by removing it from the map.
179    /// Any existing receivers will no longer receive new messages, and the channel will be dropped.
180    ///
181    /// # Arguments
182    ///
183    /// - `AsRef<str>` - Key convertible to `str`.
184    ///
185    /// # Returns
186    ///
187    /// - `Option<Broadcast<T>>` - The removed broadcast channel if it existed, or `None` if no channel was associated with the key.
188    #[inline(always)]
189    pub fn unsubscribe<K>(&self, key: K) -> Option<Broadcast<T>>
190    where
191        K: AsRef<str>,
192    {
193        self.get()
194            .remove(key.as_ref())
195            .map(|(_, broadcast): (String, Broadcast<T>)| broadcast)
196    }
197}