use std::{
any::{Any, TypeId},
collections::HashMap,
sync,
sync::Arc,
};
use reifydb_runtime::actor::{
context::Context,
mailbox::ActorRef,
system::ActorSystem,
traits::{Actor, Directive},
};
use sync::mpsc::Sender;
pub mod flow;
pub mod lifecycle;
#[macro_use]
pub mod r#macro;
pub mod metric;
pub mod procedure;
pub mod row;
pub mod store;
pub mod transaction;
type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
pub trait Event: Any + Send + Sync + Clone + 'static {
fn as_any(&self) -> &dyn Any;
fn into_any(self) -> Box<dyn Any + Send>;
}
pub trait EventListener<E>: Send + Sync + 'static
where
E: Event,
{
fn on(&self, event: &E);
}
trait EventListenerList: Any + Send + Sync {
fn on_any(&self, event: Box<dyn Any + Send>);
fn as_any_mut(&mut self) -> &mut dyn Any;
}
struct EventListenerListImpl<E> {
listeners: Vec<Arc<dyn EventListener<E>>>,
}
impl<E> EventListenerListImpl<E>
where
E: Event,
{
fn new() -> Self {
Self {
listeners: Vec::new(),
}
}
fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
self.listeners.push(listener);
}
}
impl<E> EventListenerList for EventListenerListImpl<E>
where
E: Event,
{
fn on_any(&self, event: Box<dyn Any + Send>) {
if let Ok(event) = event.downcast::<E>() {
for listener in &self.listeners {
listener.on(&*event);
}
}
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
struct EventEnvelope {
type_id: TypeId,
event: Box<dyn Any + Send>,
}
enum EventBusMessage {
Emit(EventEnvelope),
Register {
installer: EventListenerInstaller,
},
WaitForCompletion(Sender<()>),
}
struct EventBusActor;
impl Actor for EventBusActor {
type State = HashMap<TypeId, Box<dyn EventListenerList>>;
type Message = EventBusMessage;
fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
HashMap::new()
}
fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
match msg {
EventBusMessage::Emit(envelope) => {
if let Some(list) = state.get(&envelope.type_id) {
list.on_any(envelope.event);
}
}
EventBusMessage::Register {
installer,
} => {
installer(state);
}
EventBusMessage::WaitForCompletion(tx) => {
let _ = tx.send(());
}
}
Directive::Continue
}
}
#[derive(Clone)]
pub struct EventBus {
actor_ref: ActorRef<EventBusMessage>,
_actor_system: ActorSystem,
}
impl EventBus {
pub fn new(actor_system: &ActorSystem) -> Self {
let handle = actor_system.spawn("event-bus", EventBusActor);
Self {
actor_ref: handle.actor_ref().clone(),
_actor_system: actor_system.clone(),
}
}
pub fn register<E, L>(&self, listener: L)
where
E: Event,
L: EventListener<E>,
{
let type_id = TypeId::of::<E>();
let listener = Arc::new(listener);
let installer: EventListenerInstaller = Box::new(move |map| {
let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
});
let _ = self.actor_ref.send(EventBusMessage::Register {
installer,
});
}
pub fn emit<E>(&self, event: E)
where
E: Event,
{
let type_id = TypeId::of::<E>();
let _ = self.actor_ref.send(EventBusMessage::Emit(EventEnvelope {
type_id,
event: event.into_any(),
}));
}
pub fn wait_for_completion(&self) {
let (tx, rx) = sync::mpsc::channel();
let _ = self.actor_ref.send(EventBusMessage::WaitForCompletion(tx));
let _ = rx.recv();
}
}
#[cfg(test)]
pub mod tests {
use std::{
sync::{Arc, Mutex},
thread,
};
use reifydb_runtime::{
actor::system::ActorSystem,
context::clock::Clock,
pool::{PoolConfig, Pools},
};
use crate::event::{Event, EventBus, EventListener};
fn test_actor_system() -> ActorSystem {
let pools = Pools::new(PoolConfig::default());
ActorSystem::new(pools, Clock::Real)
}
define_event! {
pub struct TestEvent{}
}
define_event! {
pub struct AnotherEvent{}
}
#[derive(Default, Debug, Clone)]
pub struct TestEventListener(Arc<TestHandlerInner>);
#[derive(Default, Debug)]
pub struct TestHandlerInner {
pub counter: Arc<Mutex<i32>>,
}
impl EventListener<TestEvent> for TestEventListener {
fn on(&self, _event: &TestEvent) {
let mut x = self.0.counter.lock().unwrap();
*x += 1;
}
}
impl EventListener<AnotherEvent> for TestEventListener {
fn on(&self, _event: &AnotherEvent) {
let mut x = self.0.counter.lock().unwrap();
*x *= 2;
}
}
#[test]
fn test_event_bus_new() {
let actor_system = test_actor_system();
let event_bus = EventBus::new(&actor_system);
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
}
#[test]
fn test_register_single_listener() {
let actor_system = test_actor_system();
let event_bus = EventBus::new(&actor_system);
let listener = TestEventListener::default();
event_bus.register::<TestEvent, TestEventListener>(listener.clone());
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 1);
}
#[test]
fn test_emit_unregistered_event() {
let actor_system = test_actor_system();
let event_bus = EventBus::new(&actor_system);
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
}
#[test]
fn test_multiple_listeners_same_event() {
let actor_system = test_actor_system();
let event_bus = EventBus::new(&actor_system);
let listener1 = TestEventListener::default();
let listener2 = TestEventListener::default();
event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
}
#[test]
fn test_event_bus_clone() {
let actor_system = test_actor_system();
let event_bus1 = EventBus::new(&actor_system);
let listener = TestEventListener::default();
event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
let event_bus2 = event_bus1.clone();
event_bus2.emit(TestEvent::new());
event_bus2.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 1);
}
#[test]
fn test_concurrent_registration() {
let actor_system = test_actor_system();
let event_bus = Arc::new(EventBus::new(&actor_system));
let mut handles = Vec::new();
for _ in 0..10 {
let event_bus = event_bus.clone();
handles.push(thread::spawn(move || {
let listener = TestEventListener::default();
event_bus.register::<TestEvent, TestEventListener>(listener);
}));
}
for handle in handles {
handle.join().unwrap();
}
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
}
#[test]
fn test_concurrent_emitting() {
let actor_system = test_actor_system();
let event_bus = Arc::new(EventBus::new(&actor_system));
let listener = TestEventListener::default();
event_bus.register::<TestEvent, TestEventListener>(listener.clone());
event_bus.wait_for_completion();
let mut handles = Vec::new();
for _ in 0..10 {
let event_bus = event_bus.clone();
handles.push(thread::spawn(move || {
event_bus.emit(TestEvent::new());
}));
}
for handle in handles {
handle.join().unwrap();
}
event_bus.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 10);
}
define_event! {
pub struct MacroTestEvent {
pub value: i32,
}
}
#[test]
fn testine_event_macro() {
let event = MacroTestEvent::new(42);
let any_ref = event.as_any();
assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
}
#[test]
fn test_multi_event_listener() {
let actor_system = test_actor_system();
let event_bus = EventBus::new(&actor_system);
let listener = TestEventListener::default();
event_bus.register::<TestEvent, TestEventListener>(listener.clone());
event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 1);
event_bus.emit(TestEvent::new());
event_bus.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 2);
event_bus.emit(AnotherEvent::new());
event_bus.wait_for_completion();
assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
}