use std::sync::{Arc, Mutex, RwLock};
use crate::{BusEvent, Subscriber};
pub struct EventBusInternal<ContentType, TopicId: std::cmp::PartialEq + Clone> {
next_event_id: Arc<Mutex<usize>>,
subscribers: RwLock<Vec<Arc<Mutex<dyn Subscriber<ContentType, TopicId>>>>>,
publishers: RwLock<Vec<u64>>,
}
impl<ContentType, TopicId: std::cmp::PartialEq + Clone> EventBusInternal<ContentType, TopicId> {
pub fn new() -> Self {
Self {
next_event_id: Arc::new(Mutex::new(0)),
subscribers: RwLock::new(Vec::new()),
publishers: RwLock::new(Vec::new()),
}
}
pub fn add_subscriber_shared(
&self,
subscriber: Arc<Mutex<dyn Subscriber<ContentType, TopicId>>>,
) {
self.subscribers.write().unwrap().push(subscriber);
}
pub fn add_subscriber<S>(&self, subscriber: S)
where
S: Subscriber<ContentType, TopicId> + 'static, {
let subscriber = Arc::new(Mutex::new(subscriber));
self.subscribers.write().unwrap().push(subscriber);
}
pub fn register_publisher(&self, source_id: Option<u64>) -> Result<u64, &'static str> {
let mut publishers = self.publishers.write().unwrap();
let id = match source_id {
Some(id) => {
if publishers.contains(&id) {
return Err("Publisher with the same id already exists");
}
id
}
None => {
let mut id = 0;
while publishers.contains(&id) {
id += 1;
}
id
}
};
publishers.push(id);
Ok(id)
}
pub fn get_next_id(&self) -> usize {
let mut id = self.next_event_id.lock().unwrap();
*id += 1;
*id
}
pub fn publish(&self, event: ContentType, topic_id: Option<TopicId>, source_id: u64) {
let id = self.get_next_id(); let event_internal = BusEvent::new(id, source_id, topic_id.clone(), event);
for s in self.subscribers.read().unwrap().iter() {
if topic_id.is_some() {
let topic_id = topic_id.as_ref().unwrap();
if !s.lock().unwrap().is_subscribed_to(topic_id) {
continue;
}
{
let topics = s.lock().unwrap().get_subscribed_topics();
if let Some(topics) = topics {
if !topics.contains(event_internal.get_topic_id().as_ref().unwrap()) {
continue;
}
}
}
}
s.lock().unwrap().on_event(&event_internal);
}
}
}