astrid_events/bus.rs
1//! Event bus for broadcasting events to subscribers.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::broadcast;
7use tracing::{debug, trace, warn};
8
9use crate::event::AstridEvent;
10use crate::route::{
11 MAX_SUBSCRIPTION_BUDGET_BYTES, PrincipalKey, RouteEntry, RouteKey, RoutedEventReceiver,
12 SubscriptionRepAllocator, TopicMatcher,
13};
14use crate::subscriber::SubscriberRegistry;
15
/// Number of events the broadcast channel retains when a bus is built with
/// `EventBus::new` / `EventBus::default`.
pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 1_024;
18
/// Budget of back-to-back filtered-out events a topic-filtered subscriber
/// may consume before it yields to the scheduler.
///
/// `broadcast::recv` hands back already-buffered items without suspending,
/// so a subscriber filtering a large backlog during a broadcast storm would
/// otherwise keep its worker busy for the whole backlog. The budget is small
/// to bound that, but deliberately larger than 1: yielding after every
/// skipped event would slow the drain enough to risk the subscriber lagging
/// itself. In steady state the limit is rarely hit, because `recv().await`
/// parks whenever the channel is not backlogged.
const YIELD_AFTER_SKIPPED: usize = 32;
28
/// Counter incremented once per `EventBus::publish` call.
///
/// Its only label is `event_kind`, taken from `AstridEvent::event_type`
/// (a closed set of `&'static str`), so the label cardinality is fixed.
pub(crate) const METRIC_BUS_EVENTS_PUBLISHED_TOTAL: &str = "astrid_bus_events_published_total";
32
/// Counter of events a broadcast receiver lost by falling behind the sender,
/// labelled by `subscriber`.
///
/// A sustained non-zero `rate()` for any subscriber indicates bus
/// backpressure or a feedback storm, the failure mode that burns CPU by
/// waking every broadcast subscriber. `subscriber` values come from a fixed,
/// code-assigned set (see [`EventBus::subscribe_as`]); receivers created
/// without a tag are all reported as `"untagged"`.
pub(crate) const METRIC_BUS_RECEIVER_LAGGED_TOTAL: &str = "astrid_bus_receiver_lagged_total";
40
/// `subscriber` label used for every receiver created without an explicit
/// tag. Collapsing them onto one value keeps the lag metric's cardinality
/// bounded even when topic subscriptions are requested dynamically (for
/// example by capsules).
const SUBSCRIBER_UNTAGGED: &str = "untagged";
45
46/// Event bus for broadcasting events to all subscribers.
47///
48/// The event bus uses a broadcast channel to deliver events to all
49/// connected receivers. Events are delivered asynchronously and in order.
50///
51/// **WARNING:** Synchronous subscribers (`SubscriberRegistry`) are shared
52/// across clones. Storing a cloned `EventBus` inside a synchronous subscriber
53/// will create a memory leak via an `Arc` reference cycle. If a synchronous
54/// subscriber needs to publish events, store a `std::sync::Weak<EventBus>`
55/// or communicate via a separate channel.
56#[derive(Debug)]
57pub struct EventBus {
58 /// Sender for broadcasting events.
59 sender: broadcast::Sender<Arc<AstridEvent>>,
60 /// Registry for synchronous subscribers.
61 registry: Arc<SubscriberRegistry>,
62 /// Channel capacity.
63 capacity: usize,
64 /// Monotonic sequence counter for IPC message ordering.
65 ipc_seq: Arc<AtomicU64>,
66 /// Per-(capsule, topic, principal) routing table for guest
67 /// subscriptions. Demand-allocated entries; an idle principal has
68 /// zero entries even when the bus has 5000 active subscribers (#813).
69 /// `parking_lot::RwLock` keeps `publish` synchronous so the
70 /// reentrant `SubscriberRegistry::notify` path does not need to be
71 /// rewritten as async.
72 routes: Arc<parking_lot::RwLock<HashMap<RouteKey, Arc<parking_lot::Mutex<RouteEntry>>>>>,
73 /// Allocator for new `RouteKey.subscription_rep` ids; monotonic and
74 /// shared across all `EventBus` clones.
75 next_subscription_rep: Arc<SubscriptionRepAllocator>,
76}
77
78impl EventBus {
79 /// Create a new event bus with default capacity.
80 #[must_use]
81 pub fn new() -> Self {
82 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
83 }
84
85 /// Create a new event bus with specified capacity.
86 #[must_use]
87 pub fn with_capacity(capacity: usize) -> Self {
88 let (sender, _) = broadcast::channel(capacity);
89 Self {
90 sender,
91 registry: Arc::new(SubscriberRegistry::new()),
92 capacity,
93 ipc_seq: Arc::new(AtomicU64::new(1)),
94 routes: Arc::new(parking_lot::RwLock::new(HashMap::new())),
95 next_subscription_rep: Arc::new(SubscriptionRepAllocator::default()),
96 }
97 }
98
99 /// Publish an event to all subscribers.
100 ///
101 /// This method broadcasts the event to all async subscribers and
102 /// notifies all synchronous subscribers in the registry.
103 ///
104 /// Returns the number of async receivers that received the event.
105 pub fn publish(&self, mut event: AstridEvent) -> usize {
106 // Stamp IPC messages with a monotonic sequence number for ordered delivery.
107 if let AstridEvent::Ipc {
108 ref mut message, ..
109 } = event
110 {
111 message.seq = self.ipc_seq.fetch_add(1, Ordering::Relaxed);
112 }
113 let event = Arc::new(event);
114
115 // Publish throughput by bounded event kind. `rate()` shows bus
116 // load; paired with the per-subscriber lag counter it localises a
117 // feedback storm. `event_type()` is a closed `&'static str` set,
118 // so cardinality is fixed (IPC traffic collapses to `"ipc"`).
119 metrics::counter!(METRIC_BUS_EVENTS_PUBLISHED_TOTAL, "event_kind" => event.event_type())
120 .increment(1);
121
122 trace!(event_type = %event.event_type(), "Publishing event");
123
124 // Broadcast to async subscribers first so they don't wait for synchronous subscribers
125 let count = if let Ok(c) = self.sender.send(Arc::clone(&event)) {
126 debug!(
127 event_type = %event.event_type(),
128 receiver_count = c,
129 "Event published"
130 );
131 c
132 } else {
133 // No receivers - this is fine
134 trace!(event_type = %event.event_type(), "No receivers for event");
135 0
136 };
137
138 // Notify synchronous subscribers
139 self.registry.notify(&event, self);
140
141 // Fan out to routed subscriptions AFTER broadcast::send so a
142 // slow routed enqueue can never delay untargeted consumers
143 // (kernel_router, admin_router, bus_monitor — all still on
144 // broadcast). Routed receivers attached to the bus get full
145 // per-(capsule, topic, principal) delivery via the demux here.
146 self.dispatch_to_routes(&event);
147
148 count
149 }
150
151 /// Iterate the routes table, fan out matching events into each
152 /// route's per-principal queue. The read-lock is released as soon
153 /// as the matching set is cloned out so a slow per-route push can
154 /// never block a sibling publish or a `subscribe_topic_routed`
155 /// write-lock acquisition.
156 fn dispatch_to_routes(&self, event: &Arc<AstridEvent>) {
157 // Snapshot matching route Arcs under the read lock, then
158 // release the lock before doing any per-route enqueue work.
159 // Without this, a publisher loop would hold the read lock
160 // across every route's lock-and-push, blocking
161 // `subscribe_topic_routed` callers (which need the write lock).
162 let matched: Vec<(RouteKey, Arc<parking_lot::Mutex<RouteEntry>>)> = {
163 let routes = self.routes.read();
164 if routes.is_empty() {
165 return;
166 }
167 routes
168 .iter()
169 .filter_map(|(k, e)| {
170 let entry = e.lock();
171 if entry.matcher.matches(event) {
172 // Hold a shared label snapshot before drop so we
173 // can release the per-entry lock between the
174 // matcher check and the actual push (push needs
175 // its own write lock).
176 drop(entry);
177 Some((k.clone(), Arc::clone(e)))
178 } else {
179 None
180 }
181 })
182 .collect()
183 };
184 if matched.is_empty() {
185 return;
186 }
187
188 let principal: PrincipalKey = match &**event {
189 AstridEvent::Ipc { message, .. } => message.principal.clone(),
190 _ => None,
191 };
192
193 for (_key, entry_arc) in matched {
194 let mut entry = entry_arc.lock();
195 // Self-scope gate: a route scoped to a single principal drops a
196 // foreign-principal event here, skipping BOTH the push and the
197 // wakeup. Without the notify-skip the receiver would be woken to
198 // drain nothing and immediately re-park. Unscoped routes
199 // (`scope == None`) accept every publisher, so this is a pure
200 // no-op for them and the push path is byte-identical to before.
201 if !entry.accepts(&principal) {
202 continue;
203 }
204 entry.push_with_eviction(
205 Arc::clone(event),
206 principal.clone(),
207 MAX_SUBSCRIPTION_BUDGET_BYTES,
208 );
209 // Capture the notify Arc before drop so we can wake the
210 // receiver without holding the entry lock across the wake.
211 let notify = Arc::clone(&entry.notify);
212 drop(entry);
213 notify.notify_one();
214 }
215 }
216
217 /// Subscribe to events.
218 ///
219 /// Returns a receiver that will receive all published events. The
220 /// receiver's lag is attributed to the `"untagged"` subscriber in
221 /// [`METRIC_BUS_RECEIVER_LAGGED_TOTAL`]; use [`subscribe_as`] to give a
222 /// long-lived consumer a stable label.
223 ///
224 /// [`subscribe_as`]: Self::subscribe_as
225 #[must_use]
226 pub fn subscribe(&self) -> EventReceiver {
227 self.subscribe_as(SUBSCRIBER_UNTAGGED)
228 }
229
230 /// Subscribe to all events, attributing this receiver's lag to a
231 /// stable `subscriber` label. Pass a fixed `&'static str` (never
232 /// caller/remote text) so the lag-counter cardinality stays bounded.
233 #[must_use]
234 pub fn subscribe_as(&self, subscriber: &'static str) -> EventReceiver {
235 EventReceiver::new(self.sender.subscribe(), None, subscriber)
236 }
237
238 /// Subscribe to IPC events matching a specific topic pattern.
239 ///
240 /// The pattern can be an exact match (e.g. `astrid.cli.input`)
241 /// or end with a trailing `*` (e.g. `astrid.v1.request.*`) which matches
242 /// one or more remaining dot-separated segments up to a maximum depth of 20.
243 /// Middle wildcards (e.g. `astrid.*.event`) match exactly one segment.
244 ///
245 /// Lag is attributed to `"untagged"`; use [`subscribe_topic_as`] for a
246 /// long-lived consumer.
247 ///
248 /// [`subscribe_topic_as`]: Self::subscribe_topic_as
249 #[must_use]
250 pub fn subscribe_topic(&self, topic_pattern: impl Into<String>) -> EventReceiver {
251 self.subscribe_topic_as(topic_pattern, SUBSCRIBER_UNTAGGED)
252 }
253
254 /// Topic subscription that attributes this receiver's lag to a stable
255 /// `subscriber` label. Pass a fixed `&'static str` (never the topic
256 /// pattern itself, which can be capsule-supplied) so the lag-counter
257 /// cardinality stays bounded.
258 #[must_use]
259 pub fn subscribe_topic_as(
260 &self,
261 topic_pattern: impl Into<String>,
262 subscriber: &'static str,
263 ) -> EventReceiver {
264 EventReceiver::new(
265 self.sender.subscribe(),
266 Some(topic_pattern.into()),
267 subscriber,
268 )
269 }
270
271 /// Subscribe with publish-side per-(capsule, topic, principal)
272 /// routing.
273 ///
274 /// Allocates a [`RouteEntry`] in the bus's `routes` table and
275 /// returns a [`RoutedEventReceiver`] that drains its own queues
276 /// with deficit-round-robin fairness across principals. Two
277 /// receivers of the same `(capsule_uuid, topic_pattern)` get
278 /// distinct routes — each receives its own copy of every matching
279 /// event, unlike the broadcast channel which shares one queue.
280 ///
281 /// Dropping the receiver removes its route from the bus.
282 #[must_use]
283 pub fn subscribe_topic_routed(
284 &self,
285 capsule_uuid: uuid::Uuid,
286 topic_pattern: impl Into<String>,
287 capsule_id_label: impl Into<String>,
288 subscriber: &'static str,
289 ) -> RoutedEventReceiver {
290 self.subscribe_topic_routed_scoped(
291 capsule_uuid,
292 topic_pattern,
293 capsule_id_label,
294 subscriber,
295 None,
296 )
297 }
298
299 /// Routed subscription self-scoped to a single publisher principal.
300 ///
301 /// Identical to [`subscribe_topic_routed`](Self::subscribe_topic_routed)
302 /// except the route only ever admits events whose publisher
303 /// [`PrincipalKey`] equals `scope`; foreign-principal events are dropped
304 /// at enqueue so they never enter this route's byte budget (see
305 /// [`RouteEntry::accepts`](crate::route::RouteEntry::accepts)). Pass
306 /// `scope == None` for the unscoped, all-principals behaviour —
307 /// `subscribe_topic_routed` is exactly that delegation.
308 ///
309 /// The scope is the authorization seam for capability-gated firehose
310 /// topics (e.g. the audit feed): a non-privileged subscriber is scoped
311 /// to its own principal so it can never observe another principal's
312 /// events, while a privileged firehose holder subscribes with
313 /// `scope == None`.
314 ///
315 /// Dropping the receiver removes its route from the bus.
316 #[must_use]
317 pub fn subscribe_topic_routed_scoped(
318 &self,
319 capsule_uuid: uuid::Uuid,
320 topic_pattern: impl Into<String>,
321 capsule_id_label: impl Into<String>,
322 subscriber: &'static str,
323 scope: Option<PrincipalKey>,
324 ) -> RoutedEventReceiver {
325 let topic_pattern = topic_pattern.into();
326 let capsule_label = capsule_id_label.into();
327 let route_key = RouteKey {
328 capsule_uuid,
329 topic_pattern: topic_pattern.clone(),
330 subscription_rep: self.next_subscription_rep.next(),
331 };
332 let matcher = TopicMatcher::new(topic_pattern);
333 let entry = Arc::new(parking_lot::Mutex::new(RouteEntry::new(
334 matcher,
335 capsule_label,
336 scope,
337 )));
338 let notify = Arc::clone(&entry.lock().notify);
339 {
340 let mut routes = self.routes.write();
341 routes.insert(route_key.clone(), Arc::clone(&entry));
342 }
343 RoutedEventReceiver {
344 route_key,
345 route_entry: entry,
346 notify,
347 routes: Arc::clone(&self.routes),
348 lagged_count: 0,
349 subscriber,
350 }
351 }
352
353 /// Number of active routed subscriptions (diagnostic).
354 #[must_use]
355 pub fn routed_subscription_count(&self) -> usize {
356 self.routes.read().len()
357 }
358
359 /// Get the synchronous subscriber registry (test-only).
360 #[cfg(test)]
361 #[must_use]
362 pub(crate) fn registry(&self) -> &SubscriberRegistry {
363 &self.registry
364 }
365
366 /// Get the current number of active subscribers (both async and synchronous).
367 #[must_use]
368 pub fn subscriber_count(&self) -> usize {
369 self.sender
370 .receiver_count()
371 .saturating_add(self.registry.len())
372 }
373
374 /// Get the channel capacity.
375 #[must_use]
376 pub fn capacity(&self) -> usize {
377 self.capacity
378 }
379}
380
381impl Default for EventBus {
382 fn default() -> Self {
383 Self::new()
384 }
385}
386
387impl Clone for EventBus {
388 fn clone(&self) -> Self {
389 // Create a new bus that shares the same sender,
390 // subscriber registry, sequence counter, and routes table so
391 // a routed subscription created via one handle is visible to
392 // every publisher holding any clone of the bus.
393 Self {
394 sender: self.sender.clone(),
395 registry: Arc::clone(&self.registry),
396 capacity: self.capacity,
397 ipc_seq: Arc::clone(&self.ipc_seq),
398 routes: Arc::clone(&self.routes),
399 next_subscription_rep: Arc::clone(&self.next_subscription_rep),
400 }
401 }
402}
403
404/// Receiver for events from the event bus.
405pub struct EventReceiver {
406 receiver: broadcast::Receiver<Arc<AstridEvent>>,
407 /// Optional topic pattern. If specified, only `AstridEvent::Ipc` messages matching
408 /// this pattern will be yielded (non-IPC events will be strictly filtered out).
409 topic_pattern: Option<String>,
410 /// Cumulative count of messages lost due to broadcast channel lag.
411 /// Incremented each time the receiver falls behind the sender.
412 lagged_count: u64,
413 /// Stable label for this receiver in [`METRIC_BUS_RECEIVER_LAGGED_TOTAL`].
414 /// A fixed `&'static str` (code-assigned, never caller text) so the
415 /// lag counter's cardinality is bounded.
416 subscriber: &'static str,
417}
418
419impl EventReceiver {
420 /// Create a new receiver with an optional topic filter and a stable
421 /// subscriber label for lag attribution.
422 pub(crate) fn new(
423 receiver: broadcast::Receiver<Arc<AstridEvent>>,
424 topic_pattern: Option<String>,
425 subscriber: &'static str,
426 ) -> Self {
427 Self {
428 receiver,
429 topic_pattern,
430 lagged_count: 0,
431 subscriber,
432 }
433 }
434
435 /// Check if an event matches our topic pattern.
436 ///
437 /// Uses segment-aware matching. A `*` in a non-trailing position matches
438 /// exactly one segment. A trailing `*` (last segment) matches one or more
439 /// remaining segments, enabling namespace-level subscriptions (e.g.
440 /// `astrid.v1.lifecycle.*` matches all lifecycle events regardless of depth).
441 ///
442 /// Note: this differs from `dispatcher::topic_matches` used for interceptor
443 /// routing, where `*` always matches exactly one segment (equal segment
444 /// count is required). Topics deeper than 20 segments are rejected.
445 fn matches(&self, event: &AstridEvent) -> bool {
446 let Some(pattern) = &self.topic_pattern else {
447 return true;
448 };
449
450 let AstridEvent::Ipc { message, .. } = event else {
451 // If a topic pattern is set, we ONLY care about matching IPC events.
452 return false;
453 };
454
455 crate::topic_pattern_matches(pattern, &message.topic)
456 }
457
458 /// Returns and resets the cumulative count of messages lost due to
459 /// broadcast channel lag since the last call.
460 pub fn drain_lagged(&mut self) -> u64 {
461 std::mem::take(&mut self.lagged_count)
462 }
463
464 /// Receive the next event.
465 ///
466 /// Returns `None` if the channel is closed or if events were dropped
467 /// due to the receiver being too slow.
468 pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
469 let mut skipped: usize = 0;
470 loop {
471 match self.receiver.recv().await {
472 Ok(event) => {
473 if self.matches(&event) {
474 return Some(event);
475 }
476 // Filtered-out event. Yield every `YIELD_AFTER_SKIPPED`
477 // non-matching events so a subscriber draining a backlog
478 // under a broadcast storm can't hold the worker for an
479 // unbounded synchronous run.
480 skipped = skipped.wrapping_add(1);
481 if skipped.is_multiple_of(YIELD_AFTER_SKIPPED) {
482 #[cfg(not(target_os = "wasi"))]
483 tokio::task::yield_now().await;
484 #[cfg(target_os = "wasi")]
485 std::hint::spin_loop();
486 }
487 },
488 Err(broadcast::error::RecvError::Lagged(count)) => {
489 tracing::error!(target: "astrid.bus", security_event = true, skipped = count, subscriber = self.subscriber, "Event receiver lagged, events dropped");
490 self.lagged_count = self.lagged_count.saturating_add(count);
491 metrics::counter!(
492 METRIC_BUS_RECEIVER_LAGGED_TOTAL,
493 "subscriber" => self.subscriber,
494 )
495 .increment(count);
496 // A lag means the broadcast buffer overran this receiver —
497 // i.e. a storm is in progress. Yield before catching up so
498 // the catch-up doesn't monopolize the worker at the worst
499 // possible moment.
500 #[cfg(not(target_os = "wasi"))]
501 tokio::task::yield_now().await;
502 #[cfg(target_os = "wasi")]
503 std::hint::spin_loop();
504 // Just yielded — reset so the next non-matching event can't
505 // trigger an immediate back-to-back yield.
506 skipped = 0;
507 },
508 Err(broadcast::error::RecvError::Closed) => return None,
509 }
510 }
511 }
512
513 /// Try to receive the next event without blocking.
514 ///
515 /// Returns `Some(event)` if an event is available, or `None` if no event
516 /// is available or the channel is closed.
517 pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
518 loop {
519 match self.receiver.try_recv() {
520 Ok(event) => {
521 if self.matches(&event) {
522 return Some(event);
523 }
524 },
525 Err(broadcast::error::TryRecvError::Lagged(count)) => {
526 warn!(skipped = count, "Event receiver lagged, events dropped");
527 self.lagged_count = self.lagged_count.saturating_add(count);
528 metrics::counter!(
529 METRIC_BUS_RECEIVER_LAGGED_TOTAL,
530 "subscriber" => self.subscriber,
531 )
532 .increment(count);
533 // Continue receiving
534 },
535 Err(
536 broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
537 ) => return None,
538 }
539 }
540 }
541}
542
// Unit tests for this module live in the sibling file `bus_tests.rs` and are
// compiled only for test builds.
#[cfg(test)]
#[path = "bus_tests.rs"]
mod tests;