pub struct InMemoryEventBus<T> {
queue: std::cell::RefCell<std::vec::Vec<crate::ports::events::CloudEventsEnvelope<T>>>,
handlers: std::cell::RefCell<
std::collections::HashMap<
std::string::String,
std::boxed::Box<dyn Fn(crate::ports::events::CloudEventsEnvelope<T>) -> crate::HexResult<()>>,
>,
>,
topic: std::string::String,
}
impl<T> InMemoryEventBus<T> {
pub fn new() -> Self {
Self {
queue: std::cell::RefCell::new(std::vec::Vec::new()),
handlers: std::cell::RefCell::new(std::collections::HashMap::new()),
topic: std::string::String::from("default.events"),
}
}
pub fn with_topic(topic: std::string::String) -> Self {
Self {
queue: std::cell::RefCell::new(std::vec::Vec::new()),
handlers: std::cell::RefCell::new(std::collections::HashMap::new()),
topic,
}
}
pub fn queue_size(&self) -> usize {
self.queue.borrow().len()
}
pub fn clear(&mut self) {
self.queue.borrow_mut().clear();
}
}
impl<T> Default for InMemoryEventBus<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> crate::adapters::Adapter for InMemoryEventBus<T> {}
impl<T> crate::ports::events::EventPublisher<T> for InMemoryEventBus<T>
where
T: Clone,
{
fn publish(
&self,
envelope: &crate::ports::events::CloudEventsEnvelope<T>,
) -> crate::HexResult<()> {
envelope.validate()?;
envelope.validate_time_format()?;
self.queue.borrow_mut().push(envelope.clone());
let handlers = self.handlers.borrow();
if let std::option::Option::Some(handler) = handlers.get(&self.topic) {
handler(envelope.clone())?;
}
std::result::Result::Ok(())
}
fn publish_batch(
&self,
envelopes: &[crate::ports::events::CloudEventsEnvelope<T>],
) -> crate::HexResult<()> {
for envelope in envelopes {
self.publish(envelope)?;
}
std::result::Result::Ok(())
}
}
impl<T> crate::ports::events::EventSubscriber<T> for InMemoryEventBus<T>
where
T: Clone,
{
fn subscribe(
&mut self,
topic: &str,
handler: std::boxed::Box<
dyn Fn(crate::ports::events::CloudEventsEnvelope<T>) -> crate::HexResult<()>,
>,
) -> crate::HexResult<()> {
self
.handlers
.borrow_mut()
.insert(std::string::String::from(topic), handler);
self.topic = std::string::String::from(topic);
std::result::Result::Ok(())
}
fn poll(
&mut self,
) -> crate::HexResult<std::option::Option<crate::ports::events::CloudEventsEnvelope<T>>> {
let mut queue = self.queue.borrow_mut();
if queue.is_empty() {
std::result::Result::Ok(std::option::Option::None)
} else {
std::result::Result::Ok(std::option::Option::Some(queue.remove(0)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ports::events::{EventPublisher, EventSubscriber};
#[derive(Clone)]
struct TestEvent {
id: std::string::String,
value: std::string::String,
}
impl crate::domain::DomainEvent for TestEvent {
fn event_type(&self) -> &str {
"com.test.event.created"
}
fn aggregate_id(&self) -> std::string::String {
self.id.clone()
}
}
#[test]
fn test_new_bus_is_empty() {
let bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
std::assert_eq!(bus.queue_size(), 0);
}
#[test]
fn test_with_topic_sets_topic() {
let bus: InMemoryEventBus<TestEvent> =
InMemoryEventBus::with_topic(std::string::String::from("test.events"));
std::assert_eq!(bus.topic, "test.events");
}
#[test]
fn test_publish_adds_to_queue() {
let bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let event = TestEvent {
id: std::string::String::from("test-123"),
value: std::string::String::from("test value"),
};
let envelope = crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-001"),
std::string::String::from("/test/source"),
event,
);
bus.publish(&envelope).unwrap();
std::assert_eq!(bus.queue_size(), 1);
}
#[test]
fn test_publish_batch_adds_multiple() {
let bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let envelopes = vec![
crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-001"),
std::string::String::from("/test/source"),
TestEvent {
id: std::string::String::from("test-1"),
value: std::string::String::from("value-1"),
},
),
crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-002"),
std::string::String::from("/test/source"),
TestEvent {
id: std::string::String::from("test-2"),
value: std::string::String::from("value-2"),
},
),
];
bus.publish_batch(&envelopes).unwrap();
std::assert_eq!(bus.queue_size(), 2);
}
#[test]
fn test_poll_returns_none_when_empty() {
let mut bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let result = bus.poll().unwrap();
std::assert!(result.is_none());
}
#[test]
fn test_poll_returns_event_and_removes_from_queue() {
let mut bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let event = TestEvent {
id: std::string::String::from("test-123"),
value: std::string::String::from("test value"),
};
let envelope = crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-001"),
std::string::String::from("/test/source"),
event,
);
bus.publish(&envelope).unwrap();
std::assert_eq!(bus.queue_size(), 1);
let polled = bus.poll().unwrap();
std::assert!(polled.is_some());
std::assert_eq!(bus.queue_size(), 0);
}
#[test]
fn test_subscribe_and_handler_invoked() {
let mut bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let invoked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let invoked_clone = invoked.clone();
bus
.subscribe(
"default.events",
std::boxed::Box::new(move |_envelope| {
invoked_clone.store(true, std::sync::atomic::Ordering::SeqCst);
std::result::Result::Ok(())
}),
)
.unwrap();
let event = TestEvent {
id: std::string::String::from("test-123"),
value: std::string::String::from("test value"),
};
let envelope = crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-001"),
std::string::String::from("/test/source"),
event,
);
bus.publish(&envelope).unwrap();
std::assert!(invoked.load(std::sync::atomic::Ordering::SeqCst));
}
#[test]
fn test_clear_empties_queue() {
let mut bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let event = TestEvent {
id: std::string::String::from("test-123"),
value: std::string::String::from("test value"),
};
let envelope = crate::ports::events::CloudEventsEnvelope::from_domain_event(
std::string::String::from("evt-001"),
std::string::String::from("/test/source"),
event,
);
bus.publish(&envelope).unwrap();
std::assert_eq!(bus.queue_size(), 1);
bus.clear();
std::assert_eq!(bus.queue_size(), 0);
}
#[test]
fn test_publish_validates_envelope() {
let bus: InMemoryEventBus<TestEvent> = InMemoryEventBus::new();
let envelope = crate::ports::events::CloudEventsEnvelope::<TestEvent>::new(
std::string::String::from(""),
std::string::String::from("/test/source"),
std::string::String::from("com.test.event"),
);
let result = bus.publish(&envelope);
std::assert!(result.is_err());
std::assert_eq!(bus.queue_size(), 0);
}
}