use crate::{
closable_trait::ClosableMessage,
error::{NotifierError, UnexpectedErrorKind},
unexpected,
writing_handler::WritingHandler,
};
use smart_channel::channel;
pub use smart_channel::{Receiver, Sender};
use std::{collections::HashMap, hash::Hash, sync::Arc};
pub(crate) const NOTIFIER_CHANNEL_SIZE: usize = 10;
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChannelState {
Uninitialised,
Running,
Over,
}
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct SmartChannelId {
pub(crate) channel_counter: usize,
pub(crate) notifier_address: usize,
}
pub type DeadSender<M> = MessageSender<M>;
type Waiter<T> = Receiver<T, SmartChannelId>;
type NotificationSender<T> = Sender<T, SmartChannelId>;
pub type MessageSender<M> = Sender<M, SmartChannelId>;
pub type MessageReceiver<M> = Receiver<M, SmartChannelId>;
pub type DestructionWaiter<M> = Receiver<DeadSender<M>, SmartChannelId>;
type DestructionSender<M> = Sender<DeadSender<M>, SmartChannelId>;
pub type CreationWaiter = Receiver<(), SmartChannelId>;
type CreationSender = Sender<(), SmartChannelId>;
pub struct NotifierHub<M, ChannelId: Eq + Hash> {
connection_id: usize,
senders: HashMap<ChannelId, Vec<MessageSender<M>>>,
creation_senders: HashMap<ChannelId, Vec<CreationSender>>,
destruction_senders: HashMap<ChannelId, Vec<DestructionSender<M>>>,
}
macro_rules! get_senders {
($center:expr, $id:expr) => {
$center.senders.get(&$id).unwrap_or(&Vec::new())
};
}
impl<M, ChannelId: Eq + Hash> Default for NotifierHub<M, ChannelId> {
fn default() -> Self {
Self::new()
}
}
impl<M, ChannelId: Eq + Hash> NotifierHub<M, ChannelId> {
pub fn new() -> Self {
NotifierHub {
connection_id: 0,
senders: HashMap::new(),
creation_senders: HashMap::new(),
destruction_senders: HashMap::new(),
}
}
fn get_new_id(&mut self) -> SmartChannelId {
let channel_counter = self.connection_id;
self.connection_id += 1;
SmartChannelId {
notifier_address: (self as *const NotifierHub<M, ChannelId>) as usize,
channel_counter,
}
}
fn notify<T: Send + Clone>(
id: &ChannelId,
m: T,
map: &HashMap<ChannelId, Vec<NotificationSender<T>>>,
) -> WritingHandler<T> {
if let Some(waiters) = map.get(id) {
WritingHandler::new_cloning_broadcast(m, waiters)
} else {
WritingHandler::empty()
}
}
fn notify_creation(&mut self, id: &ChannelId) -> WritingHandler<()> {
Self::notify(id, (), &self.creation_senders)
}
pub fn is_subscribed(&self, channel: &ChannelId, receiver: &MessageReceiver<M>) -> bool {
match self.channel_state(channel) {
ChannelState::Running => get_senders!(self, channel)
.iter()
.any(|s| s.is_bound_to(receiver)),
_ => false,
}
}
pub fn number_of_waiter<T>(id: &ChannelId, map: &HashMap<ChannelId, Vec<T>>) -> usize {
match map.get(id) {
Some(w) => w.len(),
None => 0,
}
}
pub fn number_of_creation_waiter(&self, id: &ChannelId) -> usize {
Self::number_of_waiter(id, &self.creation_senders)
}
pub fn number_of_destruction_waiter(&self, id: &ChannelId) -> usize {
Self::number_of_waiter(id, &self.destruction_senders)
}
pub fn channel_state(&self, id: &ChannelId) -> ChannelState {
match self.senders.get(id) {
Some(s) if !s.is_empty() => ChannelState::Running,
Some(_) => ChannelState::Over,
None => ChannelState::Uninitialised,
}
}
pub fn channel_number_subscriber(&self, id: &ChannelId) -> usize {
match self.channel_state(id) {
ChannelState::Over | ChannelState::Uninitialised => 0,
ChannelState::Running => get_senders!(self, id).len(),
}
}
pub fn clean_channel(&mut self, channel: &ChannelId) -> ChannelState {
let senders = match self.senders.get_mut(channel) {
Some(s) => s,
None => return ChannelState::Uninitialised,
};
senders.retain(|s| !s.is_closed());
if senders.is_empty() {
ChannelState::Over
} else {
ChannelState::Running
}
}
}
impl<M, ChannelId> NotifierHub<Arc<M>, ChannelId>
where
M: Send + Sync + 'static,
ChannelId: Eq + Hash + Clone,
{
pub fn broadcast_arc(&self, msg: M) -> WritingHandler<Arc<M>> {
let senders: Vec<_> = self
.senders
.values()
.flat_map(|s| s.iter().cloned())
.collect();
WritingHandler::new_arc_broadcast(msg, &senders)
}
pub fn arc_send(
&self,
msg: M,
id: &ChannelId,
) -> Result<WritingHandler<Arc<M>>, NotifierError<Arc<M>, ChannelId>> {
match self.channel_state(id) {
ChannelState::Running => Ok(WritingHandler::new_arc_broadcast(
msg,
get_senders!(self, id),
)),
ChannelState::Over => Ok(WritingHandler::empty()),
ChannelState::Uninitialised => Err(NotifierError::ChannelUninitialized(id.clone())),
}
}
}
impl<M, ChannelId> NotifierHub<M, ChannelId>
where
M: Send + Clone + 'static,
ChannelId: Eq + Hash + Clone,
{
fn notify_destruction(
&mut self,
id: &ChannelId,
dead_sender: DeadSender<M>,
) -> WritingHandler<DeadSender<M>> {
Self::notify(id, dead_sender, &self.destruction_senders)
}
pub fn unsubscribe_all(&mut self, receiver: &MessageReceiver<M>) -> Vec<ChannelId> {
let sub_list = self.subscribed_list(receiver);
if !sub_list.is_empty() {
let _ = self.unsubscribe_multiple(&sub_list, receiver); }
sub_list
}
pub fn unsubscribe(
&mut self,
id: &ChannelId,
receiver: &MessageReceiver<M>,
) -> Result<ChannelState, NotifierError<M, ChannelId>> {
match self.channel_state(id) {
ChannelState::Running => {
if !self.is_subscribed(id, receiver) {
return Err(NotifierError::NotSubscribed(id.clone()));
}
match self.senders.get_mut(id) {
Some(senders) => {
let sender = match senders.iter().find(|s| s.is_bound_to(receiver)).cloned()
{
Some(s) => s,
None => unexpected!(SenderIsMissing),
};
senders.retain(|sender| !sender.is_bound_to(receiver));
self.notify_destruction(id, sender);
Ok(self.channel_state(id))
}
None => unexpected!(InvalidChannelStateUnsubscribe), }
}
_ => Err(NotifierError::NotSubscribed(id.clone())),
}
}
pub fn unsubscribe_multiple(
&mut self,
ids: &[ChannelId],
receiver: &MessageReceiver<M>,
) -> Result<(), NotifierError<M, ChannelId>> {
let mut errors = Vec::new();
for id in ids {
if let Err(e) = self.unsubscribe(id, receiver) {
errors.push(e)
}
}
if errors.is_empty() {
Ok(())
} else {
Err(NotifierError::NotSubscribedMultiple(errors))
}
}
pub fn broadcast_clone(&self, msg: M) -> WritingHandler<M> {
let senders: Vec<_> = self
.senders
.values()
.flat_map(|s| s.iter().cloned())
.collect();
WritingHandler::new_cloning_broadcast(msg, &senders)
}
pub fn clone_send(
&self,
msg: M,
id: &ChannelId,
) -> Result<WritingHandler<M>, NotifierError<M, ChannelId>> {
match self.channel_state(id) {
ChannelState::Running => Ok(WritingHandler::new_cloning_broadcast(
msg,
get_senders!(self, id),
)),
ChannelState::Over => Ok(WritingHandler::empty()),
ChannelState::Uninitialised => Err(NotifierError::ChannelUninitialized(id.clone())),
}
}
}
impl<M, ChannelId: Eq + Hash + Clone> NotifierHub<M, ChannelId> {
pub fn get_channels(&self) -> Vec<ChannelId> {
self.senders.keys().cloned().collect()
}
pub fn clean_all(&mut self) -> HashMap<ChannelId, ChannelState> {
let mut map = HashMap::with_capacity(self.senders.len());
for id in self.senders.keys().cloned().collect::<Vec<_>>() {
map.insert(id.clone(), self.clean_channel(&id));
}
map
}
pub fn subscribe(&mut self, id: &ChannelId, channel_size: usize) -> MessageReceiver<M> {
let (sender, receiver) = channel(channel_size, self.get_new_id());
self.insert_sender(sender, id);
receiver
}
fn insert_sender(&mut self, sender: MessageSender<M>, id: &ChannelId) {
match self.senders.get_mut(id) {
Some(senders) => senders.push(sender),
None => {
self.senders.insert(id.clone(), vec![sender]);
}
}
let _ = self.notify_creation(id);
}
pub fn subscribed_list(&self, receiver: &MessageReceiver<M>) -> Vec<ChannelId> {
self.senders
.keys()
.filter(|id| self.is_subscribed(id, receiver))
.cloned()
.collect()
}
pub fn get_waiter<T>(
channel_id: SmartChannelId,
id: &ChannelId,
map: &mut HashMap<ChannelId, Vec<NotificationSender<T>>>,
) -> Waiter<T> {
let (sender, receiver) = channel(NOTIFIER_CHANNEL_SIZE, channel_id);
match map.get_mut(id) {
Some(s) => s.push(sender),
None => {
map.insert(id.clone(), vec![sender]);
}
}
receiver
}
pub fn get_creation_waiter(&mut self, id: &ChannelId) -> CreationWaiter {
Self::get_waiter(self.get_new_id(), id, &mut self.creation_senders)
}
pub fn get_destruction_waiter(&mut self, id: &ChannelId) -> DestructionWaiter<M> {
Self::get_waiter(self.get_new_id(), id, &mut self.destruction_senders)
}
}
impl<M: Clone, ChannelId: Eq + Hash + Clone> NotifierHub<M, ChannelId> {
pub fn subscribe_multiple(
&mut self,
ids: &[ChannelId],
channel_size: usize,
) -> MessageReceiver<M> {
let (sender, receiver) = channel(channel_size, self.get_new_id());
for id in ids {
self.insert_sender(sender.clone(), id);
}
receiver
}
pub fn get_sender(
&self,
channel: &ChannelId,
receiver: &MessageReceiver<M>,
) -> Option<MessageSender<M>> {
self.senders
.get(channel)
.and_then(|senders| senders.iter().find(|s| s.is_bound_to(receiver)).cloned())
}
pub fn get_senders(
&self,
receiver: &MessageReceiver<M>,
channel: &[ChannelId],
) -> HashMap<ChannelId, MessageSender<M>> {
channel
.iter()
.filter_map(|id| {
self.get_sender(id, receiver)
.map(|sender| (id.clone(), sender))
})
.collect()
}
}
impl<M, ChannelId> NotifierHub<M, ChannelId>
where
M: Send + 'static + Clone + ClosableMessage,
ChannelId: Eq + Hash + Clone + Clone,
{
pub fn shutdown_clone(
&mut self,
channel: &ChannelId,
) -> Result<WritingHandler<M>, NotifierError<M, ChannelId>> {
match self.senders.remove(&channel) {
Some(dead_senders) => {
for dead_sender in dead_senders.iter() {
self.notify_destruction(channel, dead_sender.clone());
}
let h =
WritingHandler::new_cloning_broadcast(M::get_close_message(), &dead_senders);
Ok(h)
}
None => Err(NotifierError::ChannelNotExist(channel.clone())),
}
}
pub fn shutdown_all_clone(&mut self) {
let channels = self.get_channels();
for channel in channels {
let _ = self.shutdown_clone(&channel); }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::notifier::ChannelState;
use smart_channel::channel;
#[tokio::test]
async fn test_empty_notifier_hub() {
let hub: NotifierHub<String, &'static str> = NotifierHub::new();
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Uninitialised);
assert_eq!(hub.channel_number_subscriber(&"channel1"), 0);
assert_eq!(hub.number_of_creation_waiter(&"channel1"), 0);
}
#[tokio::test]
async fn test_unique_channel_ids() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let id1 = hub.get_new_id();
let id2 = hub.get_new_id();
let id3 = hub.get_new_id();
assert_ne!(id1, id2);
assert_ne!(id1, id3);
assert_ne!(id2, id3);
}
#[tokio::test]
async fn test_is_subscribed() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (sender, receiver) = channel(10, hub.get_new_id());
hub.senders.insert("channel1", vec![sender.clone()]);
assert!(hub.is_subscribed(&"channel1", &receiver));
}
#[tokio::test]
async fn test_channel_number_subscriber() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (sender1, _receiver1) = channel(10, hub.get_new_id());
let (sender2, _receiver2) = channel(10, hub.get_new_id());
hub.senders.insert("channel1", vec![sender1, sender2]);
assert_eq!(hub.channel_number_subscriber(&"channel1"), 2);
}
#[tokio::test]
async fn test_notify_creation() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (waiter_sender, mut waiter_receiver) = channel(10, hub.get_new_id());
hub.creation_senders.insert("channel1", vec![waiter_sender]);
let handler = hub.notify_creation(&"channel1");
let result = handler.wait(None).await;
assert!(result.is_ok());
assert!(waiter_receiver.recv().await.is_some()); }
#[tokio::test]
async fn test_number_of_waiter() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (waiter1, _) = channel(10, hub.get_new_id());
let (waiter2, _) = channel(10, hub.get_new_id());
hub.creation_senders
.insert("channel1", vec![waiter1, waiter2]);
assert_eq!(hub.number_of_creation_waiter(&"channel1"), 2);
}
#[tokio::test]
async fn test_channel_state_transitions() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Uninitialised);
let (sender, _receiver) = channel(10, hub.get_new_id());
hub.senders.insert("channel1", vec![sender]);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
hub.clean_channel(&"channel1"); assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
hub.senders.get_mut("channel1").unwrap().clear(); assert_eq!(hub.channel_state(&"channel1"), ChannelState::Over);
}
#[tokio::test]
async fn test_clean_channel() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (sender, _) = channel(10, hub.get_new_id());
hub.senders.insert("channel1", vec![sender]);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
hub.clean_channel(&"channel1"); assert_eq!(hub.channel_state(&"channel1"), ChannelState::Over); }
#[tokio::test]
async fn test_clean_all() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (sender1, _) = channel(10, hub.get_new_id());
let (sender2, _receiver2) = channel(10, hub.get_new_id());
hub.senders.insert("channel1", vec![sender1.clone()]);
hub.senders.insert("channel2", vec![sender2.clone()]);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Running);
let cleaned_states = hub.clean_all();
assert_eq!(cleaned_states.get(&"channel1"), Some(&ChannelState::Over));
assert_eq!(
cleaned_states.get(&"channel2"),
Some(&ChannelState::Running)
);
}
#[tokio::test]
async fn test_subscribe() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let (waiter, mut wait_receiver) = channel(10, hub.get_new_id());
hub.creation_senders.insert("channel1", vec![waiter]);
let receiver = hub.subscribe(&"channel1", 100);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
assert!(hub.is_subscribed(&"channel1", &receiver));
assert!(hub.channel_number_subscriber(&"channel1") == 1);
assert!(wait_receiver.recv().await == Some(()))
}
#[tokio::test]
async fn test_subscribed_list() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
hub.subscribe(&"channel2", 100);
let subscribed_channels = hub.subscribed_list(&receiver);
assert!(subscribed_channels == vec!("channel1"));
}
#[tokio::test]
async fn test_unsubscribe() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
let result = hub.unsubscribe(&"channel1", &receiver);
assert!(result.is_ok());
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Over);
let invalid_result = hub.unsubscribe(&"channel1", &receiver);
assert!(matches!(
invalid_result,
Err(NotifierError::NotSubscribed("channel1"))
));
}
#[tokio::test]
async fn test_unsubscribe_multiple() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
hub.subscribe(&"channel2", 100);
let result = hub.unsubscribe_multiple(&["channel1", "channel2"], &receiver);
match result {
Ok(()) => panic!(),
Err(NotifierError::NotSubscribedMultiple(errors)) => assert!(
errors.len() == 1 && matches!(errors[0], NotifierError::NotSubscribed("channel2"))
),
_ => panic!("Unexpected error"),
}
assert!(!hub.is_subscribed(&"channel1", &receiver));
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Over);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Running);
}
#[tokio::test]
async fn test_get_creation_waiter() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let mut waiter = hub.get_creation_waiter(&"channel1");
let _ = hub.subscribe(&"channel1", 100);
assert!(waiter.recv().await.is_some());
}
#[tokio::test]
async fn test_subscribe_multiple() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe_multiple(&["channel1", "channel2"], 100);
assert!(hub.is_subscribed(&"channel1", &receiver));
assert!(hub.is_subscribed(&"channel2", &receiver));
}
#[tokio::test]
async fn test_get_sender() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
let sender = hub.get_sender(&"channel1", &receiver);
assert!(sender.is_some());
let nonexistent_sender = hub.get_sender(&"channel2", &receiver);
assert!(nonexistent_sender.is_none());
}
#[tokio::test]
async fn test_get_senders() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let receiver = hub.subscribe_multiple(&["channel1", "channel2"], 100);
let senders = hub.get_senders(&receiver, &["channel1", "channel2"]);
assert_eq!(senders.len(), 2);
assert!(senders.contains_key(&"channel1"));
assert!(senders.contains_key(&"channel2"));
let empty_senders = hub.get_senders(&receiver, &["channel3", "channel1"]);
assert!(empty_senders.len() == 1);
}
#[tokio::test]
async fn test_unsubscribe_all_multiple_channels() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let _receiver1 = hub.subscribe(&"channel1", 100);
let _receiver2 = hub.subscribe(&"channel2", 100);
let _receiver3 = hub.subscribe(&"channel3", 100);
let receiver = hub.subscribe_multiple(&["channel1", "channel2", "channel3"], 100);
let unsubscribed_channels = hub.unsubscribe_all(&receiver);
assert_eq!(unsubscribed_channels.len(), 3);
assert!(!hub.is_subscribed(&"channel1", &receiver));
assert!(!hub.is_subscribed(&"channel2", &receiver));
assert!(!hub.is_subscribed(&"channel3", &receiver));
}
#[tokio::test]
async fn test_broadcast_arc() {
let mut hub: NotifierHub<Arc<String>, &'static str> = NotifierHub::new();
let receiver1 = hub.subscribe_multiple(&["channel1", &"channel2"], 100);
let _receiver2 = hub.subscribe(&"channel3", 100);
let msg = "Hello ARC broadcast!".to_string();
let handler = hub.broadcast_arc(msg.clone());
assert_eq!(handler.len(), 3);
hub.unsubscribe_all(&receiver1);
let handler_after_drop = hub.broadcast_arc(msg.clone());
assert_eq!(handler_after_drop.len(), 1);
}
#[tokio::test]
async fn test_arc_send() {
let mut hub: NotifierHub<Arc<String>, &'static str> = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
let msg = "Hello ARC send!".to_string();
let handlers = hub.arc_send(msg, &"channel1").unwrap();
assert_eq!(handlers.len(), 1);
let msg = "Message to no channel".to_string();
let uninitialised_result = hub.arc_send(msg, &"channel2");
assert!(matches!(
uninitialised_result,
Err(NotifierError::ChannelUninitialized("channel2"))
));
hub.unsubscribe(&"channel1", &receiver).unwrap();
hub.clean_channel(&"channel1");
let msg = "Message to nobody".to_string();
let closed_result = hub.arc_send(msg, &"channel1");
assert_eq!(closed_result.unwrap().len(), 0);
}
#[tokio::test]
async fn test_clone_send() {
let mut hub = NotifierHub::new();
let receiver = hub.subscribe(&"channel1", 100);
let msg = "Message !".to_string();
let handler = hub.clone_send(msg.clone(), &"channel1").unwrap();
handler.wait(None).await.unwrap();
let uninitialised_result = hub.clone_send("No such channel".to_string(), &"channel2");
assert!(matches!(
uninitialised_result,
Err(NotifierError::ChannelUninitialized("channel2"))
));
hub.unsubscribe(&"channel1", &receiver).unwrap();
hub.clean_channel(&"channel1");
let closed_result = hub.clone_send(msg, &"channel1");
assert_eq!(closed_result.unwrap().len(), 0);
}
#[tokio::test]
async fn test_broadcast_clone() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let mut receiver1 = hub.subscribe(&"channel1", 100);
let mut receiver2 = hub.subscribe(&"channel2", 100);
let msg = "Clone broadcast message".to_string();
let handler = hub.broadcast_clone(msg.clone());
assert_eq!(handler.len(), 2);
assert_eq!(
receiver1.recv().await.unwrap(),
"Clone broadcast message".to_string()
);
assert_eq!(
receiver2.recv().await.unwrap(),
"Clone broadcast message".to_string()
);
}
#[tokio::test]
async fn test_get_destruction_sender() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let mut destruction_waiter = hub.get_destruction_waiter(&"channel1");
let mut receiver = hub.subscribe(&"channel1", 100);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
let result = hub.unsubscribe(&"channel1", &receiver);
assert!(result.is_ok());
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Over);
let dead_sender = destruction_waiter.recv().await.unwrap();
let msg = "Dead Message".to_string();
dead_sender.send(msg.clone()).await.unwrap();
assert_eq!(receiver.recv().await.unwrap(), msg);
let invalid_result = hub.unsubscribe(&"channel1", &receiver);
assert!(matches!(
invalid_result,
Err(NotifierError::NotSubscribed("channel1"))
));
}
#[tokio::test]
async fn test_get_channels() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
hub.subscribe(&"channel1", 100);
hub.subscribe(&"channel2", 100);
hub.subscribe(&"channel3", 100);
let channels = hub.get_channels();
assert_eq!(channels.len(), 3);
assert!(channels.contains(&"channel1"));
assert!(channels.contains(&"channel2"));
assert!(channels.contains(&"channel3"));
}
}
#[cfg(test)]
mod shutdown_tests {
use super::*;
impl ClosableMessage for String {
fn get_close_message() -> Self {
"CLOSE_MESSAGE".to_string()
}
}
#[tokio::test]
async fn test_shutdown_clone() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let mut receiver1 = hub.subscribe(&"channel1", 100);
let mut receiver2 = hub.subscribe(&"channel1", 100);
let _ = hub.subscribe(&"channel2", 100);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Running);
let handler = hub.shutdown_clone(&"channel1").unwrap();
assert_eq!(handler.len(), 2);
assert_eq!(receiver1.recv().await.unwrap(), "CLOSE_MESSAGE");
assert_eq!(receiver2.recv().await.unwrap(), "CLOSE_MESSAGE");
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Uninitialised);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Running);
let nonexistent_result = hub.shutdown_clone(&"channel3");
assert!(matches!(
nonexistent_result,
Err(NotifierError::ChannelNotExist("channel3"))
));
}
#[tokio::test]
async fn test_shutdown_all_clone() {
let mut hub: NotifierHub<String, &'static str> = NotifierHub::new();
let mut receiver1 = hub.subscribe(&"channel1", 100);
let mut receiver2 = hub.subscribe(&"channel2", 100);
let mut receiver3 = hub.subscribe(&"channel3", 100);
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Running);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Running);
assert_eq!(hub.channel_state(&"channel3"), ChannelState::Running);
hub.shutdown_all_clone();
assert_eq!(hub.channel_state(&"channel1"), ChannelState::Uninitialised);
assert_eq!(hub.channel_state(&"channel2"), ChannelState::Uninitialised);
assert_eq!(hub.channel_state(&"channel3"), ChannelState::Uninitialised);
assert_eq!(receiver1.recv().await.unwrap(), "CLOSE_MESSAGE");
assert_eq!(receiver2.recv().await.unwrap(), "CLOSE_MESSAGE");
assert_eq!(receiver3.recv().await.unwrap(), "CLOSE_MESSAGE");
}
}