harmont_cli/orchestrator/output_subscriber.rs
1//! Build-event subscriber that dispatches every `BuildEvent` into the
2//! selected output-formatter plugin's `hm_output_on_event` capability.
3//!
4//! Replaces the plan-2 stop-gap `stderr_sink`. The subscriber acquires
5//! an `Arc<LoadedPlugin>` from the registry per event; the actual
6//! `call_capability` await happens AFTER the registry lock is dropped
7//! so concurrent step-executor invocations do not contend with it.
8//! Output plugins live in their own pool slot (default size 1) — only
9//! this one subscriber task drains the bus, so a pool of 1 suffices.
10
11// Pedantic-bucket nags accepted at module scope:
12// - `needless_pass_by_value` on `bus`: the owned `Arc<EventBus>` makes
13// the bus->subscriber handoff explicit at the call site, mirrors the
14// plan-2 `stderr_sink::spawn_stderr_sink` shape.
// - `significant_drop_tightening`: the registry `MutexGuard` is held
//   only across the synchronous lookup. It is released when the inner
//   block is left — either by an early `return`/`continue` or at the
//   block's end once the `Arc` has been obtained — and always before
//   the plugin call is awaited. The lint would have us sprinkle
//   `drop(reg)` calls which add no clarity.
20// - `print_stderr`: the Lagged arm intentionally bypasses the event
21// bus (which is the source of the lag) to surface a user-visible
22// drop signal, so an `eprintln!` direct to stderr is correct.
23#![allow(
24 clippy::needless_pass_by_value,
25 clippy::significant_drop_tightening,
26 clippy::print_stderr
27)]
28
29use std::sync::Arc;
30
31use anyhow::Result;
32use hm_plugin_protocol::BuildEvent;
33use tokio::sync::Mutex;
34use tokio::sync::broadcast::error::RecvError;
35
36use super::events::EventBus;
37use crate::plugin::PluginRegistry;
38
39/// Spawn the subscriber task. Returns a join handle the orchestrator
40/// awaits at shutdown so the `BuildEnd` event is fully drained.
41///
42/// `format_name` must already exist in `registry.output_formatter_index`
43/// — `scheduler::run` validates this before emitting `BuildStart`, so
44/// a missing entry here means we lost a race against a concurrent
45/// registry mutation (impossible in single-run orchestration). We drop
46/// events silently in that case and exit on `BuildEnd`.
47#[must_use]
48pub fn spawn(
49 bus: Arc<EventBus>,
50 registry: Arc<Mutex<PluginRegistry>>,
51 format_name: String,
52) -> tokio::task::JoinHandle<Result<()>> {
53 let mut rx = bus.subscribe();
54 tokio::spawn(async move {
55 loop {
56 match rx.recv().await {
57 Ok(event) => {
58 // Resolve the plugin under the registry lock, then
59 // drop the lock before awaiting `call_capability`
60 // so concurrent step-executor calls keep flowing.
61 let plugin = {
62 let reg = registry.lock().await;
63 let Some(&idx) = reg.output_formatter_index.get(&format_name) else {
64 // No plugin for this format; CLI parser
65 // should have caught this. Drain silently.
66 if matches!(event, BuildEvent::BuildEnd { .. }) {
67 return Ok(());
68 }
69 continue;
70 };
71 let Some(p) = reg.get(idx) else {
72 if matches!(event, BuildEvent::BuildEnd { .. }) {
73 return Ok(());
74 }
75 continue;
76 };
77 p
78 };
79 let is_end = matches!(event, BuildEvent::BuildEnd { .. });
80 // Log-and-continue on formatter failures: a broken
81 // output plugin shouldn't fail the build.
82 let _: Result<()> = plugin.call_capability("hm_output_on_event", &event).await;
83 if is_end {
84 // Finalise if the plugin exports it. Tolerate
85 // missing/erroring export — most streaming
86 // formatters don't implement it.
87 let _: Result<Vec<u8>> =
88 plugin.call_capability("hm_output_finalize", &()).await;
89 return Ok(());
90 }
91 }
92 Err(RecvError::Closed) => return Ok(()),
93 Err(RecvError::Lagged(n)) => {
94 tracing::warn!(
95 target: "orchestrator",
96 "output_subscriber: dropped {n} build events (subscriber fell behind)"
97 );
98 // Also surface to the user: send a synthetic stderr line via
99 // the host's write_stderr fn directly. This bypasses the
100 // event bus (which is the source of the lag), so it can't
101 // contribute to the lag we're reporting.
102 eprintln!("[output] dropped {n} build events (subscriber fell behind)");
103 }
104 }
105 }
106 })
107}