bamboo_engine/runtime/execution/event_forwarder.rs
1//! Event forwarding from MPSC to broadcast channels.
2//!
3//! Creates an MPSC channel for agent loop events and spawns a background task
4//! that relays events to the session's broadcast sender while tracking runner
5//! diagnostic state (budget events, tool execution, round progress).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use chrono::Utc;
11use tokio::sync::{broadcast, mpsc, RwLock};
12
13use bamboo_agent_core::AgentEvent;
14
15use super::runner_state::AgentRunner;
16
17/// Inbox to the account-wide change feed: `(session_id, event)` before the
18/// writer assigns a seq. Threaded as `Option` so engine-internal callers that
19/// have no feed (tests, standalone embeddings) can pass `None`. Defined here so
20/// the engine stays free of any `bamboo-server` dependency.
21pub type AccountFeedInbox = mpsc::Sender<(Option<String>, AgentEvent)>;
22
23/// Forward a durable change event onto the account feed, if an inbox is wired.
24///
25/// Ephemeral events (tokens, heartbeats, …) are filtered out before any clone,
26/// so this is near-free on the hot path. `session_id` is supplied explicitly so
27/// terminal events (which carry no id) still route to the right session.
28fn mirror_to_account_feed(inbox: &Option<AccountFeedInbox>, session_id: &str, event: &AgentEvent) {
29 if let Some(inbox) = inbox {
30 if event.is_durable_change() {
31 let _ = inbox.try_send((Some(session_id.to_string()), event.clone()));
32 }
33 }
34}
35
36/// Create an MPSC channel for agent events and spawn a forwarding task
37/// that relays events to the broadcast sender while tracking runner
38/// diagnostic fields for live visibility.
39///
40/// `account_feed_inbox`, when present, also mirrors durable change events onto
41/// the account-wide feed for resumable multi-client sync.
42///
43/// Returns `(mpsc_tx, forwarder_handle)`.
44pub fn create_event_forwarder(
45 session_id: String,
46 broadcast_tx: broadcast::Sender<AgentEvent>,
47 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
48 account_feed_inbox: Option<AccountFeedInbox>,
49) -> (mpsc::Sender<AgentEvent>, tokio::task::JoinHandle<()>) {
50 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<AgentEvent>(100);
51
52 let forwarder = tokio::spawn(async move {
53 // Emit ExecutionStarted as the first event so the frontend can correlate
54 // all subsequent events to this run via run_id.
55 {
56 let runners = runners.read().await;
57 if let Some(runner) = runners.get(&session_id) {
58 let started_event = AgentEvent::ExecutionStarted {
59 run_id: runner.run_id.clone(),
60 session_id: session_id.clone(),
61 started_at: Utc::now().to_rfc3339(),
62 };
63 mirror_to_account_feed(&account_feed_inbox, &session_id, &started_event);
64 let _ = broadcast_tx.send(started_event);
65 }
66 }
67
68 while let Some(event) = mpsc_rx.recv().await {
69 {
70 let mut runners = runners.write().await;
71 if let Some(runner) = runners.get_mut(&session_id) {
72 runner.last_event_at = Some(Utc::now());
73
74 match &event {
75 AgentEvent::TokenBudgetUpdated { .. } => {
76 runner.last_budget_event = Some(event.clone());
77 }
78 AgentEvent::ToolStart { tool_name, .. } => {
79 runner.last_tool_name = Some(tool_name.clone());
80 runner.last_tool_phase = Some("begin".to_string());
81 }
82 AgentEvent::ToolLifecycle {
83 tool_name, phase, ..
84 } => {
85 runner.last_tool_name = Some(tool_name.clone());
86 runner.last_tool_phase = Some(phase.clone());
87 }
88 AgentEvent::RunnerProgress { round_count, .. } => {
89 runner.round_count = *round_count;
90 }
91 _ => {}
92 }
93 }
94 }
95 mirror_to_account_feed(&account_feed_inbox, &session_id, &event);
96 let _ = broadcast_tx.send(event);
97 }
98 });
99
100 (mpsc_tx, forwarder)
101}