use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::broadcast;
use tracing::{debug, trace, warn};
use crate::event::AstridEvent;
use crate::route::{
MAX_SUBSCRIPTION_BUDGET_BYTES, PrincipalKey, RouteEntry, RouteKey, RoutedEventReceiver,
SubscriptionRepAllocator, TopicMatcher,
};
use crate::subscriber::SubscriberRegistry;
pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
const YIELD_AFTER_SKIPPED: usize = 32;
pub(crate) const METRIC_BUS_EVENTS_PUBLISHED_TOTAL: &str = "astrid_bus_events_published_total";
pub(crate) const METRIC_BUS_RECEIVER_LAGGED_TOTAL: &str = "astrid_bus_receiver_lagged_total";
const SUBSCRIBER_UNTAGGED: &str = "untagged";
#[derive(Debug)]
pub struct EventBus {
sender: broadcast::Sender<Arc<AstridEvent>>,
registry: Arc<SubscriberRegistry>,
capacity: usize,
ipc_seq: Arc<AtomicU64>,
routes: Arc<parking_lot::RwLock<HashMap<RouteKey, Arc<parking_lot::Mutex<RouteEntry>>>>>,
next_subscription_rep: Arc<SubscriptionRepAllocator>,
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
registry: Arc::new(SubscriberRegistry::new()),
capacity,
ipc_seq: Arc::new(AtomicU64::new(1)),
routes: Arc::new(parking_lot::RwLock::new(HashMap::new())),
next_subscription_rep: Arc::new(SubscriptionRepAllocator::default()),
}
}
pub fn publish(&self, mut event: AstridEvent) -> usize {
if let AstridEvent::Ipc {
ref mut message, ..
} = event
{
message.seq = self.ipc_seq.fetch_add(1, Ordering::Relaxed);
}
let event = Arc::new(event);
metrics::counter!(METRIC_BUS_EVENTS_PUBLISHED_TOTAL, "event_kind" => event.event_type())
.increment(1);
trace!(event_type = %event.event_type(), "Publishing event");
let count = if let Ok(c) = self.sender.send(Arc::clone(&event)) {
debug!(
event_type = %event.event_type(),
receiver_count = c,
"Event published"
);
c
} else {
trace!(event_type = %event.event_type(), "No receivers for event");
0
};
self.registry.notify(&event, self);
self.dispatch_to_routes(&event);
count
}
fn dispatch_to_routes(&self, event: &Arc<AstridEvent>) {
let matched: Vec<(RouteKey, Arc<parking_lot::Mutex<RouteEntry>>)> = {
let routes = self.routes.read();
if routes.is_empty() {
return;
}
routes
.iter()
.filter_map(|(k, e)| {
let entry = e.lock();
if entry.matcher.matches(event) {
drop(entry);
Some((k.clone(), Arc::clone(e)))
} else {
None
}
})
.collect()
};
if matched.is_empty() {
return;
}
let principal: PrincipalKey = match &**event {
AstridEvent::Ipc { message, .. } => message.principal.clone(),
_ => None,
};
for (_key, entry_arc) in matched {
let mut entry = entry_arc.lock();
if !entry.accepts(&principal) {
continue;
}
entry.push_with_eviction(
Arc::clone(event),
principal.clone(),
MAX_SUBSCRIPTION_BUDGET_BYTES,
);
let notify = Arc::clone(&entry.notify);
drop(entry);
notify.notify_one();
}
}
#[must_use]
pub fn subscribe(&self) -> EventReceiver {
self.subscribe_as(SUBSCRIBER_UNTAGGED)
}
#[must_use]
pub fn subscribe_as(&self, subscriber: &'static str) -> EventReceiver {
EventReceiver::new(self.sender.subscribe(), None, subscriber)
}
#[must_use]
pub fn subscribe_topic(&self, topic_pattern: impl Into<String>) -> EventReceiver {
self.subscribe_topic_as(topic_pattern, SUBSCRIBER_UNTAGGED)
}
#[must_use]
pub fn subscribe_topic_as(
&self,
topic_pattern: impl Into<String>,
subscriber: &'static str,
) -> EventReceiver {
EventReceiver::new(
self.sender.subscribe(),
Some(topic_pattern.into()),
subscriber,
)
}
#[must_use]
pub fn subscribe_topic_routed(
&self,
capsule_uuid: uuid::Uuid,
topic_pattern: impl Into<String>,
capsule_id_label: impl Into<String>,
subscriber: &'static str,
) -> RoutedEventReceiver {
self.subscribe_topic_routed_scoped(
capsule_uuid,
topic_pattern,
capsule_id_label,
subscriber,
None,
)
}
#[must_use]
pub fn subscribe_topic_routed_scoped(
&self,
capsule_uuid: uuid::Uuid,
topic_pattern: impl Into<String>,
capsule_id_label: impl Into<String>,
subscriber: &'static str,
scope: Option<PrincipalKey>,
) -> RoutedEventReceiver {
let topic_pattern = topic_pattern.into();
let capsule_label = capsule_id_label.into();
let route_key = RouteKey {
capsule_uuid,
topic_pattern: topic_pattern.clone(),
subscription_rep: self.next_subscription_rep.next(),
};
let matcher = TopicMatcher::new(topic_pattern);
let entry = Arc::new(parking_lot::Mutex::new(RouteEntry::new(
matcher,
capsule_label,
scope,
)));
let notify = Arc::clone(&entry.lock().notify);
{
let mut routes = self.routes.write();
routes.insert(route_key.clone(), Arc::clone(&entry));
}
RoutedEventReceiver {
route_key,
route_entry: entry,
notify,
routes: Arc::clone(&self.routes),
lagged_count: 0,
subscriber,
}
}
#[must_use]
pub fn routed_subscription_count(&self) -> usize {
self.routes.read().len()
}
#[cfg(test)]
#[must_use]
pub(crate) fn registry(&self) -> &SubscriberRegistry {
&self.registry
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.sender
.receiver_count()
.saturating_add(self.registry.len())
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl Clone for EventBus {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
registry: Arc::clone(&self.registry),
capacity: self.capacity,
ipc_seq: Arc::clone(&self.ipc_seq),
routes: Arc::clone(&self.routes),
next_subscription_rep: Arc::clone(&self.next_subscription_rep),
}
}
}
pub struct EventReceiver {
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
lagged_count: u64,
subscriber: &'static str,
}
impl EventReceiver {
pub(crate) fn new(
receiver: broadcast::Receiver<Arc<AstridEvent>>,
topic_pattern: Option<String>,
subscriber: &'static str,
) -> Self {
Self {
receiver,
topic_pattern,
lagged_count: 0,
subscriber,
}
}
fn matches(&self, event: &AstridEvent) -> bool {
let Some(pattern) = &self.topic_pattern else {
return true;
};
let AstridEvent::Ipc { message, .. } = event else {
return false;
};
crate::topic_pattern_matches(pattern, &message.topic)
}
pub fn drain_lagged(&mut self) -> u64 {
std::mem::take(&mut self.lagged_count)
}
pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
let mut skipped: usize = 0;
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
skipped = skipped.wrapping_add(1);
if skipped.is_multiple_of(YIELD_AFTER_SKIPPED) {
#[cfg(not(target_os = "wasi"))]
tokio::task::yield_now().await;
#[cfg(target_os = "wasi")]
std::hint::spin_loop();
}
},
Err(broadcast::error::RecvError::Lagged(count)) => {
tracing::error!(target: "astrid.bus", security_event = true, skipped = count, subscriber = self.subscriber, "Event receiver lagged, events dropped");
self.lagged_count = self.lagged_count.saturating_add(count);
metrics::counter!(
METRIC_BUS_RECEIVER_LAGGED_TOTAL,
"subscriber" => self.subscriber,
)
.increment(count);
#[cfg(not(target_os = "wasi"))]
tokio::task::yield_now().await;
#[cfg(target_os = "wasi")]
std::hint::spin_loop();
skipped = 0;
},
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
loop {
match self.receiver.try_recv() {
Ok(event) => {
if self.matches(&event) {
return Some(event);
}
},
Err(broadcast::error::TryRecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
self.lagged_count = self.lagged_count.saturating_add(count);
metrics::counter!(
METRIC_BUS_RECEIVER_LAGGED_TOTAL,
"subscriber" => self.subscriber,
)
.increment(count);
},
Err(
broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
) => return None,
}
}
}
}
#[cfg(test)]
#[path = "bus_tests.rs"]
mod tests;