use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace, warn};
use crate::event::AstridEvent;
use crate::subscriber::SubscriberRegistry;
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
#[derive(Debug)]
pub struct EventBus {
sender: broadcast::Sender<Arc<AstridEvent>>,
registry: Arc<SubscriberRegistry>,
capacity: usize,
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
registry: Arc::new(SubscriberRegistry::new()),
capacity,
}
}
pub fn publish(&self, event: AstridEvent) -> usize {
let event = Arc::new(event);
trace!(event_type = %event.event_type(), "Publishing event");
let count = if let Ok(c) = self.sender.send(Arc::clone(&event)) {
debug!(
event_type = %event.event_type(),
receiver_count = c,
"Event published"
);
c
} else {
trace!(event_type = %event.event_type(), "No receivers for event");
0
};
self.registry.notify(&event, self);
count
}
#[must_use]
pub fn subscribe(&self) -> EventReceiver {
EventReceiver::new(self.sender.subscribe(), None)
}
#[must_use]
pub fn subscribe_topic(&self, topic_pattern: impl Into<String>) -> EventReceiver {
EventReceiver::new(self.sender.subscribe(), Some(topic_pattern.into()))
}
#[must_use]
pub fn registry(&self) -> &SubscriberRegistry {
&self.registry
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.sender
.receiver_count()
.saturating_add(self.registry.len())
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl Clone for EventBus {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
registry: Arc::clone(&self.registry),
capacity: self.capacity,
}
}
}
pub struct EventReceiver {
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
}
impl EventReceiver {
pub(crate) fn new(
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
) -> Self {
Self {
receiver,
topic_pattern,
}
}
fn matches(&self, event: &AstridEvent) -> bool {
let Some(pattern) = &self.topic_pattern else {
return true;
};
if let AstridEvent::Ipc { message, .. } = event {
if let Some(prefix) = pattern.strip_suffix('*') {
message.topic.starts_with(prefix)
} else {
message.topic == *pattern
}
} else {
false
}
}
pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
let mut skipped: usize = 0;
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
skipped = skipped.wrapping_add(1);
if skipped.is_multiple_of(100) {
tokio::task::yield_now().await;
}
},
Err(broadcast::error::RecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
},
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
loop {
match self.receiver.try_recv() {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
},
Err(broadcast::error::TryRecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
},
Err(
broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
) => return None,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
#[tokio::test]
async fn test_event_bus_creation() {
let bus = EventBus::new();
assert_eq!(bus.capacity(), DEFAULT_CHANNEL_CAPACITY);
assert_eq!(bus.subscriber_count(), 0);
}
#[tokio::test]
async fn test_event_bus_with_capacity() {
let bus = EventBus::with_capacity(100);
assert_eq!(bus.capacity(), 100);
}
#[tokio::test]
async fn test_publish_and_receive() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 1);
let msg = receiver.recv().await.unwrap();
assert_eq!(msg.event_type(), "runtime_started");
}
#[tokio::test]
async fn test_multiple_subscribers() {
let bus = EventBus::new();
let mut receiver1 = bus.subscribe();
let mut receiver2 = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 2);
let obj1 = receiver1.recv().await.unwrap();
let obj2 = receiver2.recv().await.unwrap();
assert_eq!(obj1.event_type(), "runtime_started");
assert_eq!(obj2.event_type(), "runtime_started");
}
#[tokio::test]
async fn test_no_subscribers() {
let bus = EventBus::new();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_try_recv_empty() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result = receiver.try_recv();
assert!(result.is_none());
}
#[tokio::test]
async fn test_try_recv_with_event() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
let result = receiver.try_recv();
assert!(result.is_some());
}
#[tokio::test]
async fn test_subscriber_count() {
let bus = EventBus::new();
assert_eq!(bus.subscriber_count(), 0);
let receiver1 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
let _receiver2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
drop(receiver1);
}
#[tokio::test]
async fn test_cloned_bus_synchronous_subscriber() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
let bus = EventBus::new();
let cloned_bus = bus.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let subscriber = FilterSubscriber::new("test_sync", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
cloned_bus.registry().register(Arc::new(subscriber));
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_event_bus_drop_cleans_up_registry() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
struct DropNotify(Arc<AtomicUsize>);
impl Drop for DropNotify {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let drop_count = Arc::new(AtomicUsize::new(0));
let drop_count_clone = Arc::clone(&drop_count);
let notifier = DropNotify(drop_count_clone);
let bus = EventBus::new();
let subscriber = FilterSubscriber::new("test_drop", move |_| {
let _ = ¬ifier; });
bus.registry().register(Arc::new(subscriber));
assert_eq!(drop_count.load(Ordering::SeqCst), 0);
drop(bus);
assert_eq!(drop_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_reentrancy_unregister_from_on_event() {
use crate::subscriber::{EventSubscriber, SubscriberId};
use std::sync::Mutex;
struct UnregisteringSubscriber {
my_id: Mutex<Option<SubscriberId>>,
}
impl EventSubscriber for UnregisteringSubscriber {
fn on_event(&self, _event: &AstridEvent, bus: &EventBus) {
let id = self.my_id.lock().unwrap().expect("id not set");
bus.registry().unregister(id);
}
}
let bus = EventBus::new();
let subscriber = Arc::new(UnregisteringSubscriber {
my_id: Mutex::new(None),
});
let id = bus
.registry()
.register(Arc::clone(&subscriber) as Arc<dyn EventSubscriber>);
*subscriber.my_id.lock().unwrap() = Some(id);
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(bus.registry().len(), 0);
}
#[tokio::test]
async fn test_drop_deadlock_publish_from_drop() {
use crate::subscriber::EventSubscriber;
struct DroppingSubscriber {
bus: EventBus,
}
impl EventSubscriber for DroppingSubscriber {
fn on_event(&self, _event: &AstridEvent, _bus: &EventBus) {}
}
impl Drop for DroppingSubscriber {
fn drop(&mut self) {
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
self.bus.publish(event);
}
}
let bus = EventBus::new();
let id = bus
.registry()
.register(Arc::new(DroppingSubscriber { bus: bus.clone() }));
bus.registry().unregister(id);
}
#[tokio::test]
async fn test_topic_subscription_exact() {
let bus = EventBus::new();
let mut all_receiver = bus.subscribe();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let msg = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
};
bus.publish(event);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_some());
let msg2 = crate::ipc::IpcMessage::new(
"astrid.telegram.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event2);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_wildcard() {
let bus = EventBus::new();
let mut wildcard_receiver = bus.subscribe_topic("astrid.*");
let msg1 = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event1 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg1,
};
let msg2 = crate::ipc::IpcMessage::new(
"system.log",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event1);
bus.publish(event2);
let received = wildcard_receiver.try_recv().unwrap();
if let AstridEvent::Ipc { message, .. } = &*received {
assert_eq!(message.topic, "astrid.cli.input");
} else {
panic!("Expected IPC event");
}
assert!(wildcard_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_ignores_non_ipc() {
let bus = EventBus::new();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".into(),
};
bus.publish(event);
assert!(specific_receiver.try_recv().is_none());
}
}