Skip to main content

astrid_events/
bus.rs

1//! Event bus for broadcasting events to subscribers.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::broadcast;
7use tracing::{debug, trace, warn};
8
9use crate::event::AstridEvent;
10use crate::route::{
11    MAX_SUBSCRIPTION_BUDGET_BYTES, PrincipalKey, RouteEntry, RouteKey, RoutedEventReceiver,
12    SubscriptionRepAllocator, TopicMatcher,
13};
14use crate::subscriber::SubscriberRegistry;
15
/// Capacity of the broadcast channel created by [`EventBus::new`].
pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 1024;

/// Number of consecutive filtered-out events a topic-filtered subscriber
/// drains before it yields to the scheduler.
///
/// `broadcast::recv` hands back buffered items without awaiting, so a
/// subscriber working through a backlog during a broadcast storm would
/// otherwise keep its worker busy for the whole run. The value is small to
/// bound that monopolization, yet larger than 1: yielding on every event
/// would slow the drain enough to risk self-induced lag. In normal operation
/// the limit is rarely hit because `recv().await` parks between events when
/// the channel is not backlogged.
const YIELD_AFTER_SKIPPED: usize = 32;

/// Counter name: events published to the bus. Labelled with the bounded
/// `event_kind` (`AstridEvent::event_type`, a closed `&'static str` set).
pub(crate) const METRIC_BUS_EVENTS_PUBLISHED_TOTAL: &str = "astrid_bus_events_published_total";

/// Counter name: events a receiver lost by falling behind the sender.
/// Labelled with `subscriber`.
///
/// A non-zero `rate()` for any subscriber is the signature of bus
/// backpressure / a feedback storm — the failure mode that pegs CPU by
/// waking every broadcast subscriber. Subscriber labels are a fixed,
/// code-assigned set (see [`EventBus::subscribe_as`]); subscriptions made
/// without a label collapse to `"untagged"`.
pub(crate) const METRIC_BUS_RECEIVER_LAGGED_TOTAL: &str = "astrid_bus_receiver_lagged_total";

/// Subscriber label given to receivers created without an explicit tag, so
/// the `subscriber` label cardinality stays bounded even for dynamic
/// (capsule-supplied) topic subscriptions.
const SUBSCRIBER_UNTAGGED: &str = "untagged";
45
46/// Event bus for broadcasting events to all subscribers.
47///
48/// The event bus uses a broadcast channel to deliver events to all
49/// connected receivers. Events are delivered asynchronously and in order.
50///
51/// **WARNING:** Synchronous subscribers (`SubscriberRegistry`) are shared
52/// across clones. Storing a cloned `EventBus` inside a synchronous subscriber
53/// will create a memory leak via an `Arc` reference cycle. If a synchronous
54/// subscriber needs to publish events, store a `std::sync::Weak<EventBus>`
55/// or communicate via a separate channel.
56#[derive(Debug)]
57pub struct EventBus {
58    /// Sender for broadcasting events.
59    sender: broadcast::Sender<Arc<AstridEvent>>,
60    /// Registry for synchronous subscribers.
61    registry: Arc<SubscriberRegistry>,
62    /// Channel capacity.
63    capacity: usize,
64    /// Monotonic sequence counter for IPC message ordering.
65    ipc_seq: Arc<AtomicU64>,
66    /// Per-(capsule, topic, principal) routing table for guest
67    /// subscriptions. Demand-allocated entries; an idle principal has
68    /// zero entries even when the bus has 5000 active subscribers (#813).
69    /// `parking_lot::RwLock` keeps `publish` synchronous so the
70    /// reentrant `SubscriberRegistry::notify` path does not need to be
71    /// rewritten as async.
72    routes: Arc<parking_lot::RwLock<HashMap<RouteKey, Arc<parking_lot::Mutex<RouteEntry>>>>>,
73    /// Allocator for new `RouteKey.subscription_rep` ids; monotonic and
74    /// shared across all `EventBus` clones.
75    next_subscription_rep: Arc<SubscriptionRepAllocator>,
76}
77
78impl EventBus {
79    /// Create a new event bus with default capacity.
80    #[must_use]
81    pub fn new() -> Self {
82        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
83    }
84
85    /// Create a new event bus with specified capacity.
86    #[must_use]
87    pub fn with_capacity(capacity: usize) -> Self {
88        let (sender, _) = broadcast::channel(capacity);
89        Self {
90            sender,
91            registry: Arc::new(SubscriberRegistry::new()),
92            capacity,
93            ipc_seq: Arc::new(AtomicU64::new(1)),
94            routes: Arc::new(parking_lot::RwLock::new(HashMap::new())),
95            next_subscription_rep: Arc::new(SubscriptionRepAllocator::default()),
96        }
97    }
98
99    /// Publish an event to all subscribers.
100    ///
101    /// This method broadcasts the event to all async subscribers and
102    /// notifies all synchronous subscribers in the registry.
103    ///
104    /// Returns the number of async receivers that received the event.
105    pub fn publish(&self, mut event: AstridEvent) -> usize {
106        // Stamp IPC messages with a monotonic sequence number for ordered delivery.
107        if let AstridEvent::Ipc {
108            ref mut message, ..
109        } = event
110        {
111            message.seq = self.ipc_seq.fetch_add(1, Ordering::Relaxed);
112        }
113        let event = Arc::new(event);
114
115        // Publish throughput by bounded event kind. `rate()` shows bus
116        // load; paired with the per-subscriber lag counter it localises a
117        // feedback storm. `event_type()` is a closed `&'static str` set,
118        // so cardinality is fixed (IPC traffic collapses to `"ipc"`).
119        metrics::counter!(METRIC_BUS_EVENTS_PUBLISHED_TOTAL, "event_kind" => event.event_type())
120            .increment(1);
121
122        trace!(event_type = %event.event_type(), "Publishing event");
123
124        // Broadcast to async subscribers first so they don't wait for synchronous subscribers
125        let count = if let Ok(c) = self.sender.send(Arc::clone(&event)) {
126            debug!(
127                event_type = %event.event_type(),
128                receiver_count = c,
129                "Event published"
130            );
131            c
132        } else {
133            // No receivers - this is fine
134            trace!(event_type = %event.event_type(), "No receivers for event");
135            0
136        };
137
138        // Notify synchronous subscribers
139        self.registry.notify(&event, self);
140
141        // Fan out to routed subscriptions AFTER broadcast::send so a
142        // slow routed enqueue can never delay untargeted consumers
143        // (kernel_router, admin_router, bus_monitor — all still on
144        // broadcast). Routed receivers attached to the bus get full
145        // per-(capsule, topic, principal) delivery via the demux here.
146        self.dispatch_to_routes(&event);
147
148        count
149    }
150
151    /// Iterate the routes table, fan out matching events into each
152    /// route's per-principal queue. The read-lock is released as soon
153    /// as the matching set is cloned out so a slow per-route push can
154    /// never block a sibling publish or a `subscribe_topic_routed`
155    /// write-lock acquisition.
156    fn dispatch_to_routes(&self, event: &Arc<AstridEvent>) {
157        // Snapshot matching route Arcs under the read lock, then
158        // release the lock before doing any per-route enqueue work.
159        // Without this, a publisher loop would hold the read lock
160        // across every route's lock-and-push, blocking
161        // `subscribe_topic_routed` callers (which need the write lock).
162        let matched: Vec<(RouteKey, Arc<parking_lot::Mutex<RouteEntry>>)> = {
163            let routes = self.routes.read();
164            if routes.is_empty() {
165                return;
166            }
167            routes
168                .iter()
169                .filter_map(|(k, e)| {
170                    let entry = e.lock();
171                    if entry.matcher.matches(event) {
172                        // Hold a shared label snapshot before drop so we
173                        // can release the per-entry lock between the
174                        // matcher check and the actual push (push needs
175                        // its own write lock).
176                        drop(entry);
177                        Some((k.clone(), Arc::clone(e)))
178                    } else {
179                        None
180                    }
181                })
182                .collect()
183        };
184        if matched.is_empty() {
185            return;
186        }
187
188        let principal: PrincipalKey = match &**event {
189            AstridEvent::Ipc { message, .. } => message.principal.clone(),
190            _ => None,
191        };
192
193        for (_key, entry_arc) in matched {
194            let mut entry = entry_arc.lock();
195            // Self-scope gate: a route scoped to a single principal drops a
196            // foreign-principal event here, skipping BOTH the push and the
197            // wakeup. Without the notify-skip the receiver would be woken to
198            // drain nothing and immediately re-park. Unscoped routes
199            // (`scope == None`) accept every publisher, so this is a pure
200            // no-op for them and the push path is byte-identical to before.
201            if !entry.accepts(&principal) {
202                continue;
203            }
204            entry.push_with_eviction(
205                Arc::clone(event),
206                principal.clone(),
207                MAX_SUBSCRIPTION_BUDGET_BYTES,
208            );
209            // Capture the notify Arc before drop so we can wake the
210            // receiver without holding the entry lock across the wake.
211            let notify = Arc::clone(&entry.notify);
212            drop(entry);
213            notify.notify_one();
214        }
215    }
216
217    /// Subscribe to events.
218    ///
219    /// Returns a receiver that will receive all published events. The
220    /// receiver's lag is attributed to the `"untagged"` subscriber in
221    /// [`METRIC_BUS_RECEIVER_LAGGED_TOTAL`]; use [`subscribe_as`] to give a
222    /// long-lived consumer a stable label.
223    ///
224    /// [`subscribe_as`]: Self::subscribe_as
225    #[must_use]
226    pub fn subscribe(&self) -> EventReceiver {
227        self.subscribe_as(SUBSCRIBER_UNTAGGED)
228    }
229
230    /// Subscribe to all events, attributing this receiver's lag to a
231    /// stable `subscriber` label. Pass a fixed `&'static str` (never
232    /// caller/remote text) so the lag-counter cardinality stays bounded.
233    #[must_use]
234    pub fn subscribe_as(&self, subscriber: &'static str) -> EventReceiver {
235        EventReceiver::new(self.sender.subscribe(), None, subscriber)
236    }
237
238    /// Subscribe to IPC events matching a specific topic pattern.
239    ///
240    /// The pattern can be an exact match (e.g. `astrid.cli.input`)
241    /// or end with a trailing `*` (e.g. `astrid.v1.request.*`) which matches
242    /// one or more remaining dot-separated segments up to a maximum depth of 20.
243    /// Middle wildcards (e.g. `astrid.*.event`) match exactly one segment.
244    ///
245    /// Lag is attributed to `"untagged"`; use [`subscribe_topic_as`] for a
246    /// long-lived consumer.
247    ///
248    /// [`subscribe_topic_as`]: Self::subscribe_topic_as
249    #[must_use]
250    pub fn subscribe_topic(&self, topic_pattern: impl Into<String>) -> EventReceiver {
251        self.subscribe_topic_as(topic_pattern, SUBSCRIBER_UNTAGGED)
252    }
253
254    /// Topic subscription that attributes this receiver's lag to a stable
255    /// `subscriber` label. Pass a fixed `&'static str` (never the topic
256    /// pattern itself, which can be capsule-supplied) so the lag-counter
257    /// cardinality stays bounded.
258    #[must_use]
259    pub fn subscribe_topic_as(
260        &self,
261        topic_pattern: impl Into<String>,
262        subscriber: &'static str,
263    ) -> EventReceiver {
264        EventReceiver::new(
265            self.sender.subscribe(),
266            Some(topic_pattern.into()),
267            subscriber,
268        )
269    }
270
271    /// Subscribe with publish-side per-(capsule, topic, principal)
272    /// routing.
273    ///
274    /// Allocates a [`RouteEntry`] in the bus's `routes` table and
275    /// returns a [`RoutedEventReceiver`] that drains its own queues
276    /// with deficit-round-robin fairness across principals. Two
277    /// receivers of the same `(capsule_uuid, topic_pattern)` get
278    /// distinct routes — each receives its own copy of every matching
279    /// event, unlike the broadcast channel which shares one queue.
280    ///
281    /// Dropping the receiver removes its route from the bus.
282    #[must_use]
283    pub fn subscribe_topic_routed(
284        &self,
285        capsule_uuid: uuid::Uuid,
286        topic_pattern: impl Into<String>,
287        capsule_id_label: impl Into<String>,
288        subscriber: &'static str,
289    ) -> RoutedEventReceiver {
290        self.subscribe_topic_routed_scoped(
291            capsule_uuid,
292            topic_pattern,
293            capsule_id_label,
294            subscriber,
295            None,
296        )
297    }
298
299    /// Routed subscription self-scoped to a single publisher principal.
300    ///
301    /// Identical to [`subscribe_topic_routed`](Self::subscribe_topic_routed)
302    /// except the route only ever admits events whose publisher
303    /// [`PrincipalKey`] equals `scope`; foreign-principal events are dropped
304    /// at enqueue so they never enter this route's byte budget (see
305    /// [`RouteEntry::accepts`](crate::route::RouteEntry::accepts)). Pass
306    /// `scope == None` for the unscoped, all-principals behaviour —
307    /// `subscribe_topic_routed` is exactly that delegation.
308    ///
309    /// The scope is the authorization seam for capability-gated firehose
310    /// topics (e.g. the audit feed): a non-privileged subscriber is scoped
311    /// to its own principal so it can never observe another principal's
312    /// events, while a privileged firehose holder subscribes with
313    /// `scope == None`.
314    ///
315    /// Dropping the receiver removes its route from the bus.
316    #[must_use]
317    pub fn subscribe_topic_routed_scoped(
318        &self,
319        capsule_uuid: uuid::Uuid,
320        topic_pattern: impl Into<String>,
321        capsule_id_label: impl Into<String>,
322        subscriber: &'static str,
323        scope: Option<PrincipalKey>,
324    ) -> RoutedEventReceiver {
325        let topic_pattern = topic_pattern.into();
326        let capsule_label = capsule_id_label.into();
327        let route_key = RouteKey {
328            capsule_uuid,
329            topic_pattern: topic_pattern.clone(),
330            subscription_rep: self.next_subscription_rep.next(),
331        };
332        let matcher = TopicMatcher::new(topic_pattern);
333        let entry = Arc::new(parking_lot::Mutex::new(RouteEntry::new(
334            matcher,
335            capsule_label,
336            scope,
337        )));
338        let notify = Arc::clone(&entry.lock().notify);
339        {
340            let mut routes = self.routes.write();
341            routes.insert(route_key.clone(), Arc::clone(&entry));
342        }
343        RoutedEventReceiver {
344            route_key,
345            route_entry: entry,
346            notify,
347            routes: Arc::clone(&self.routes),
348            lagged_count: 0,
349            subscriber,
350        }
351    }
352
353    /// Number of active routed subscriptions (diagnostic).
354    #[must_use]
355    pub fn routed_subscription_count(&self) -> usize {
356        self.routes.read().len()
357    }
358
359    /// Get the synchronous subscriber registry (test-only).
360    #[cfg(test)]
361    #[must_use]
362    pub(crate) fn registry(&self) -> &SubscriberRegistry {
363        &self.registry
364    }
365
366    /// Get the current number of active subscribers (both async and synchronous).
367    #[must_use]
368    pub fn subscriber_count(&self) -> usize {
369        self.sender
370            .receiver_count()
371            .saturating_add(self.registry.len())
372    }
373
374    /// Get the channel capacity.
375    #[must_use]
376    pub fn capacity(&self) -> usize {
377        self.capacity
378    }
379}
380
381impl Default for EventBus {
382    fn default() -> Self {
383        Self::new()
384    }
385}
386
387impl Clone for EventBus {
388    fn clone(&self) -> Self {
389        // Create a new bus that shares the same sender,
390        // subscriber registry, sequence counter, and routes table so
391        // a routed subscription created via one handle is visible to
392        // every publisher holding any clone of the bus.
393        Self {
394            sender: self.sender.clone(),
395            registry: Arc::clone(&self.registry),
396            capacity: self.capacity,
397            ipc_seq: Arc::clone(&self.ipc_seq),
398            routes: Arc::clone(&self.routes),
399            next_subscription_rep: Arc::clone(&self.next_subscription_rep),
400        }
401    }
402}
403
404/// Receiver for events from the event bus.
405pub struct EventReceiver {
406    receiver: broadcast::Receiver<Arc<AstridEvent>>,
407    /// Optional topic pattern. If specified, only `AstridEvent::Ipc` messages matching
408    /// this pattern will be yielded (non-IPC events will be strictly filtered out).
409    topic_pattern: Option<String>,
410    /// Cumulative count of messages lost due to broadcast channel lag.
411    /// Incremented each time the receiver falls behind the sender.
412    lagged_count: u64,
413    /// Stable label for this receiver in [`METRIC_BUS_RECEIVER_LAGGED_TOTAL`].
414    /// A fixed `&'static str` (code-assigned, never caller text) so the
415    /// lag counter's cardinality is bounded.
416    subscriber: &'static str,
417}
418
419impl EventReceiver {
420    /// Create a new receiver with an optional topic filter and a stable
421    /// subscriber label for lag attribution.
422    pub(crate) fn new(
423        receiver: broadcast::Receiver<Arc<AstridEvent>>,
424        topic_pattern: Option<String>,
425        subscriber: &'static str,
426    ) -> Self {
427        Self {
428            receiver,
429            topic_pattern,
430            lagged_count: 0,
431            subscriber,
432        }
433    }
434
435    /// Check if an event matches our topic pattern.
436    ///
437    /// Uses segment-aware matching. A `*` in a non-trailing position matches
438    /// exactly one segment. A trailing `*` (last segment) matches one or more
439    /// remaining segments, enabling namespace-level subscriptions (e.g.
440    /// `astrid.v1.lifecycle.*` matches all lifecycle events regardless of depth).
441    ///
442    /// Note: this differs from `dispatcher::topic_matches` used for interceptor
443    /// routing, where `*` always matches exactly one segment (equal segment
444    /// count is required). Topics deeper than 20 segments are rejected.
445    fn matches(&self, event: &AstridEvent) -> bool {
446        let Some(pattern) = &self.topic_pattern else {
447            return true;
448        };
449
450        let AstridEvent::Ipc { message, .. } = event else {
451            // If a topic pattern is set, we ONLY care about matching IPC events.
452            return false;
453        };
454
455        crate::topic_pattern_matches(pattern, &message.topic)
456    }
457
458    /// Returns and resets the cumulative count of messages lost due to
459    /// broadcast channel lag since the last call.
460    pub fn drain_lagged(&mut self) -> u64 {
461        std::mem::take(&mut self.lagged_count)
462    }
463
464    /// Receive the next event.
465    ///
466    /// Returns `None` if the channel is closed or if events were dropped
467    /// due to the receiver being too slow.
468    pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
469        let mut skipped: usize = 0;
470        loop {
471            match self.receiver.recv().await {
472                Ok(event) => {
473                    if self.matches(&event) {
474                        return Some(event);
475                    }
476                    // Filtered-out event. Yield every `YIELD_AFTER_SKIPPED`
477                    // non-matching events so a subscriber draining a backlog
478                    // under a broadcast storm can't hold the worker for an
479                    // unbounded synchronous run.
480                    skipped = skipped.wrapping_add(1);
481                    if skipped.is_multiple_of(YIELD_AFTER_SKIPPED) {
482                        #[cfg(not(target_os = "wasi"))]
483                        tokio::task::yield_now().await;
484                        #[cfg(target_os = "wasi")]
485                        std::hint::spin_loop();
486                    }
487                },
488                Err(broadcast::error::RecvError::Lagged(count)) => {
489                    tracing::error!(target: "astrid.bus", security_event = true, skipped = count, subscriber = self.subscriber, "Event receiver lagged, events dropped");
490                    self.lagged_count = self.lagged_count.saturating_add(count);
491                    metrics::counter!(
492                        METRIC_BUS_RECEIVER_LAGGED_TOTAL,
493                        "subscriber" => self.subscriber,
494                    )
495                    .increment(count);
496                    // A lag means the broadcast buffer overran this receiver —
497                    // i.e. a storm is in progress. Yield before catching up so
498                    // the catch-up doesn't monopolize the worker at the worst
499                    // possible moment.
500                    #[cfg(not(target_os = "wasi"))]
501                    tokio::task::yield_now().await;
502                    #[cfg(target_os = "wasi")]
503                    std::hint::spin_loop();
504                    // Just yielded — reset so the next non-matching event can't
505                    // trigger an immediate back-to-back yield.
506                    skipped = 0;
507                },
508                Err(broadcast::error::RecvError::Closed) => return None,
509            }
510        }
511    }
512
513    /// Try to receive the next event without blocking.
514    ///
515    /// Returns `Some(event)` if an event is available, or `None` if no event
516    /// is available or the channel is closed.
517    pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
518        loop {
519            match self.receiver.try_recv() {
520                Ok(event) => {
521                    if self.matches(&event) {
522                        return Some(event);
523                    }
524                },
525                Err(broadcast::error::TryRecvError::Lagged(count)) => {
526                    warn!(skipped = count, "Event receiver lagged, events dropped");
527                    self.lagged_count = self.lagged_count.saturating_add(count);
528                    metrics::counter!(
529                        METRIC_BUS_RECEIVER_LAGGED_TOTAL,
530                        "subscriber" => self.subscriber,
531                    )
532                    .increment(count);
533                    // Continue receiving
534                },
535                Err(
536                    broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
537                ) => return None,
538            }
539        }
540    }
541}
542
543#[cfg(test)]
544#[path = "bus_tests.rs"]
545mod tests;