use super::EventHandler;
use crate::events::events::*;
use radiate_core::{Chromosome, Executor};
use std::sync::{Arc, Mutex};
type Subscriber<T> = Arc<Mutex<dyn EventHandler<T>>>;
#[derive(Clone)]
pub struct EventBus<T> {
handlers: Vec<Subscriber<T>>,
executor: Arc<Executor>,
}
impl<T> EventBus<T> {
pub fn new(executor: Arc<Executor>, handlers: Vec<Subscriber<T>>) -> Self {
EventBus { handlers, executor }
}
pub fn subscribe<H>(&mut self, handler: H)
where
H: EventHandler<T> + 'static,
{
self.handlers.push(Arc::new(Mutex::new(handler)));
}
pub fn publish<C>(&self, message: EngineMessage<C, T>)
where
C: Chromosome,
T: Clone + Send + Sync + 'static,
{
if self.handlers.is_empty() {
return;
}
let event = match message {
EngineMessage::Start => EngineEvent::new(EngineEventInner::Start),
EngineMessage::Stop(ctx) => EngineEvent::new(EngineEventInner::Stop(
ctx.index,
ctx.best.clone(),
ctx.metrics.clone(),
ctx.score.clone().unwrap_or_default(),
)),
EngineMessage::EpochStart(ctx) => {
EngineEvent::new(EngineEventInner::EpochStart(ctx.index))
}
EngineMessage::EpochEnd(ctx) => EngineEvent::new(EngineEventInner::EpochComplete(
ctx.index,
ctx.best.clone(),
ctx.metrics.clone(),
ctx.score.clone().unwrap_or_default(),
ctx.objective.clone(),
)),
EngineMessage::Improvement(ctx) => EngineEvent::new(EngineEventInner::Improvement(
ctx.index,
ctx.best.clone(),
ctx.score.clone().unwrap_or_default(),
)),
};
for handler in self.handlers.iter() {
let clone_handler = Arc::clone(handler);
let clone_event = event.clone();
self.executor.submit(move || {
clone_handler.lock().unwrap().handle(clone_event);
});
}
}
}