Skip to main content

harmont_cli/orchestrator/
events.rs

1//! Build-event broadcast channel.
2//!
3//! Subscribers (output formatter plugin, lifecycle hook plugins,
4//! the human-readable progress sink) all subscribe to the same
5//! channel; the host's `emit_event` / `emit_step_log` host fns
6//! publish into it.
7
8// `new()` returning `Arc<Self>` is intentional (the bus is always
9// shared); `subscribe()` returns a tokio receiver that callers must
10// own.  Both look like must-use candidates to clippy.
11#![allow(clippy::must_use_candidate)]
12
13use std::sync::Arc;
14
15use hm_plugin_protocol::BuildEvent;
16use tokio::sync::broadcast;
17
/// Number of events the broadcast channel buffers.
///
/// Sized to exceed the backlog any single subscriber is expected to
/// build up. A subscriber that falls further behind than this loses
/// the oldest events and observes a `Lagged` error on its next receive.
const BUS_CAPACITY: usize = 1024;
22
23#[derive(Debug, Clone)]
24pub struct EventBus {
25    tx: broadcast::Sender<BuildEvent>,
26}
27
28impl EventBus {
29    pub fn new() -> Arc<Self> {
30        let (tx, _rx) = broadcast::channel(BUS_CAPACITY);
31        Arc::new(Self { tx })
32    }
33
34    pub fn subscribe(&self) -> broadcast::Receiver<BuildEvent> {
35        self.tx.subscribe()
36    }
37
38    /// Publish an event. Returns the number of subscribers that
39    /// received it. A return of 0 is normal (no subscribers yet).
40    pub fn emit(&self, event: BuildEvent) {
41        // We intentionally drop the error: zero-subscriber sends are
42        // not interesting and we don't want host_fn impls to fail
43        // because nobody is listening.
44        let _ = self.tx.send(event);
45    }
46}
47
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// A subscriber registered before `emit` receives the emitted event.
    #[tokio::test]
    async fn emit_and_receive() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        bus.emit(BuildEvent::BuildEnd {
            exit_code: 0,
            duration_ms: 1,
        });
        let ev = rx.recv().await.unwrap();
        // `matches!` only evaluates to a bool; without `assert!` the
        // comparison result is thrown away and the test checks nothing.
        assert!(matches!(
            ev,
            BuildEvent::BuildEnd {
                exit_code: 0,
                duration_ms: 1,
            }
        ));
    }

    /// Emitting on a bus with zero subscribers must be a silent no-op.
    #[tokio::test]
    async fn no_subscribers_is_not_an_error() {
        let bus = EventBus::new();
        // `new()` drops its initial receiver, so nobody is listening here.
        bus.emit(BuildEvent::BuildEnd {
            exit_code: 0,
            duration_ms: 0,
        });
        // Reaching this point without a panic is the assertion.
    }
}