use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace, warn};
use crate::event::AstridEvent;
use crate::subscriber::SubscriberRegistry;
pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
#[derive(Debug)]
pub struct EventBus {
sender: broadcast::Sender<Arc<AstridEvent>>,
registry: Arc<SubscriberRegistry>,
capacity: usize,
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
registry: Arc::new(SubscriberRegistry::new()),
capacity,
}
}
pub fn publish(&self, event: AstridEvent) -> usize {
let event = Arc::new(event);
trace!(event_type = %event.event_type(), "Publishing event");
let count = if let Ok(c) = self.sender.send(Arc::clone(&event)) {
debug!(
event_type = %event.event_type(),
receiver_count = c,
"Event published"
);
c
} else {
trace!(event_type = %event.event_type(), "No receivers for event");
0
};
self.registry.notify(&event, self);
count
}
#[must_use]
pub fn subscribe(&self) -> EventReceiver {
EventReceiver::new(self.sender.subscribe(), None)
}
#[must_use]
pub fn subscribe_topic(&self, topic_pattern: impl Into<String>) -> EventReceiver {
EventReceiver::new(self.sender.subscribe(), Some(topic_pattern.into()))
}
#[cfg(test)]
#[must_use]
pub(crate) fn registry(&self) -> &SubscriberRegistry {
&self.registry
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.sender
.receiver_count()
.saturating_add(self.registry.len())
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl Clone for EventBus {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
registry: Arc::clone(&self.registry),
capacity: self.capacity,
}
}
}
pub struct EventReceiver {
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
lagged_count: u64,
}
impl EventReceiver {
pub(crate) fn new(
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
) -> Self {
Self {
receiver,
topic_pattern,
lagged_count: 0,
}
}
const MAX_TOPIC_DEPTH: usize = 20;
fn matches(&self, event: &AstridEvent) -> bool {
let Some(pattern) = &self.topic_pattern else {
return true;
};
let AstridEvent::Ipc { message, .. } = event else {
return false;
};
let topic = &message.topic;
if topic.split('.').count() > Self::MAX_TOPIC_DEPTH {
return false;
}
if let Some(prefix_pat) = pattern.strip_suffix(".*") {
let mut prefix_segs = prefix_pat.split('.');
let mut topic_segs = topic.split('.');
let prefix_matched = prefix_segs
.by_ref()
.zip(topic_segs.by_ref())
.all(|(p, t)| p == "*" || p == t);
prefix_matched && prefix_segs.next().is_none() && topic_segs.next().is_some()
} else {
let mut pat_segs = pattern.split('.');
let mut topic_segs = topic.split('.');
let all_matched = pat_segs
.by_ref()
.zip(topic_segs.by_ref())
.all(|(p, t)| p == "*" || p == t);
all_matched && pat_segs.next().is_none() && topic_segs.next().is_none()
}
}
pub fn drain_lagged(&mut self) -> u64 {
std::mem::take(&mut self.lagged_count)
}
pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
let mut skipped: usize = 0;
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
skipped = skipped.wrapping_add(1);
if skipped.is_multiple_of(100) {
#[cfg(not(target_os = "wasi"))]
tokio::task::yield_now().await;
#[cfg(target_os = "wasi")]
std::hint::spin_loop();
}
},
Err(broadcast::error::RecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
self.lagged_count = self.lagged_count.saturating_add(count);
},
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
loop {
match self.receiver.try_recv() {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
},
Err(broadcast::error::TryRecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
self.lagged_count = self.lagged_count.saturating_add(count);
},
Err(
broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
) => return None,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
#[tokio::test]
async fn test_event_bus_creation() {
let bus = EventBus::new();
assert_eq!(bus.capacity(), DEFAULT_CHANNEL_CAPACITY);
assert_eq!(bus.subscriber_count(), 0);
}
#[tokio::test]
async fn test_event_bus_with_capacity() {
let bus = EventBus::with_capacity(100);
assert_eq!(bus.capacity(), 100);
}
#[tokio::test]
async fn test_publish_and_receive() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 1);
let msg = receiver.recv().await.unwrap();
assert_eq!(msg.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_multiple_subscribers() {
let bus = EventBus::new();
let mut receiver1 = bus.subscribe();
let mut receiver2 = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 2);
let obj1 = receiver1.recv().await.unwrap();
let obj2 = receiver2.recv().await.unwrap();
assert_eq!(obj1.event_type(), "astrid.v1.lifecycle.runtime_started");
assert_eq!(obj2.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_no_subscribers() {
let bus = EventBus::new();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_try_recv_empty() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result = receiver.try_recv();
assert!(result.is_none());
}
#[tokio::test]
async fn test_try_recv_with_event() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
let result = receiver.try_recv();
assert!(result.is_some());
}
#[tokio::test]
async fn test_subscriber_count() {
let bus = EventBus::new();
assert_eq!(bus.subscriber_count(), 0);
let receiver1 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
let _receiver2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
drop(receiver1);
}
#[tokio::test]
async fn test_cloned_bus_synchronous_subscriber() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
let bus = EventBus::new();
let cloned_bus = bus.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let subscriber = FilterSubscriber::new("test_sync", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
cloned_bus.registry().register(Arc::new(subscriber));
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_event_bus_drop_cleans_up_registry() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
struct DropNotify(Arc<AtomicUsize>);
impl Drop for DropNotify {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let drop_count = Arc::new(AtomicUsize::new(0));
let drop_count_clone = Arc::clone(&drop_count);
let notifier = DropNotify(drop_count_clone);
let bus = EventBus::new();
let subscriber = FilterSubscriber::new("test_drop", move |_| {
let _ = ¬ifier; });
bus.registry().register(Arc::new(subscriber));
assert_eq!(drop_count.load(Ordering::SeqCst), 0);
drop(bus);
assert_eq!(drop_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_reentrancy_unregister_from_on_event() {
use crate::subscriber::{EventSubscriber, SubscriberId};
use std::sync::Mutex;
struct UnregisteringSubscriber {
my_id: Mutex<Option<SubscriberId>>,
}
impl EventSubscriber for UnregisteringSubscriber {
fn on_event(&self, _event: &AstridEvent, bus: &EventBus) {
let id = self.my_id.lock().unwrap().expect("id not set");
bus.registry().unregister(id);
}
}
let bus = EventBus::new();
let subscriber = Arc::new(UnregisteringSubscriber {
my_id: Mutex::new(None),
});
let id = bus
.registry()
.register(Arc::clone(&subscriber) as Arc<dyn EventSubscriber>);
*subscriber.my_id.lock().unwrap() = Some(id);
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(bus.registry().len(), 0);
}
#[tokio::test]
async fn test_drop_deadlock_publish_from_drop() {
use crate::subscriber::EventSubscriber;
struct DroppingSubscriber {
bus: EventBus,
}
impl EventSubscriber for DroppingSubscriber {
fn on_event(&self, _event: &AstridEvent, _bus: &EventBus) {}
}
impl Drop for DroppingSubscriber {
fn drop(&mut self) {
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
self.bus.publish(event);
}
}
let bus = EventBus::new();
let id = bus
.registry()
.register(Arc::new(DroppingSubscriber { bus: bus.clone() }));
bus.registry().unregister(id);
}
#[tokio::test]
async fn test_topic_subscription_exact() {
let bus = EventBus::new();
let mut all_receiver = bus.subscribe();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let msg = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
};
bus.publish(event);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_some());
let msg2 = crate::ipc::IpcMessage::new(
"astrid.telegram.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event2);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_wildcard() {
let bus = EventBus::new();
let mut wildcard_receiver = bus.subscribe_topic("astrid.*");
let msg1 = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event1 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg1,
};
let msg2 = crate::ipc::IpcMessage::new(
"system.log",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event1);
bus.publish(event2);
let received = wildcard_receiver.try_recv().unwrap();
if let AstridEvent::Ipc { message, .. } = &*received {
assert_eq!(message.topic, "astrid.cli.input");
} else {
panic!("Expected IPC event");
}
assert!(wildcard_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_ignores_non_ipc() {
let bus = EventBus::new();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".into(),
};
bus.publish(event);
assert!(specific_receiver.try_recv().is_none());
}
fn ipc_event(topic: &str) -> AstridEvent {
AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: crate::ipc::IpcMessage::new(
topic,
crate::ipc::IpcPayload::UserInput {
text: "x".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
),
}
}
#[tokio::test]
async fn test_wildcard_matches_multiple_depths() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("astrid.v1.request.*");
bus.publish(ipc_event("astrid.v1.request.list_capsules"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.v1.request.foo.bar"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.v1.request"));
assert!(receiver.try_recv().is_none());
bus.publish(ipc_event("system.v1.request.foo"));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_wildcard_rejects_deep_topics() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("a.*");
let deep = (0..21)
.map(|i| format!("s{i}"))
.collect::<Vec<_>>()
.join(".");
let topic = format!("a.{deep}");
bus.publish(ipc_event(&topic));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_middle_wildcard_matches_one_segment() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("astrid.*.input");
bus.publish(ipc_event("astrid.cli.input"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.telegram.input"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.cli.output"));
assert!(receiver.try_recv().is_none());
bus.publish(ipc_event("astrid.cli.sub.input"));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_drain_lagged_initially_zero() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
assert_eq!(receiver.drain_lagged(), 0);
}
#[tokio::test]
async fn test_drain_lagged_resets_after_read() {
let bus = EventBus::with_capacity(2);
let mut receiver = bus.subscribe();
for i in 0..5 {
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: format!("{i}"),
};
bus.publish(event);
}
let _ = receiver.try_recv();
let lagged = receiver.drain_lagged();
assert!(lagged > 0, "expected lag count > 0, got {lagged}");
assert_eq!(receiver.drain_lagged(), 0);
}
#[tokio::test]
async fn test_drain_lagged_accumulates_across_calls() {
let bus = EventBus::with_capacity(2);
let mut receiver = bus.subscribe();
for _ in 0..4 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "v1".into(),
});
}
while receiver.try_recv().is_some() {}
let lag1 = receiver.drain_lagged();
for _ in 0..4 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "v2".into(),
});
}
while receiver.try_recv().is_some() {}
let lag2 = receiver.drain_lagged();
assert!(lag1 > 0, "first burst should lag");
assert!(lag2 > 0, "second burst should lag");
}
#[tokio::test]
async fn test_recv_blocking_with_timeout() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result =
tokio::time::timeout(std::time::Duration::from_millis(50), receiver.recv()).await;
assert!(result.is_err(), "expected timeout, got a message");
}
#[tokio::test]
async fn test_recv_blocking_wakes_on_message() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let bus_clone = bus.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
bus_clone.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "wake".into(),
});
});
let result = tokio::time::timeout(std::time::Duration::from_secs(5), receiver.recv()).await;
assert!(result.is_ok(), "recv should have woken up");
let event = result.unwrap().unwrap();
assert_eq!(event.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_try_recv_drains_burst() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
for i in 0..10 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: format!("{i}"),
});
}
let mut count = 0;
while receiver.try_recv().is_some() {
count += 1;
}
assert_eq!(count, 10);
assert!(receiver.try_recv().is_none());
}
}