Skip to main content

bamboo_engine/runtime/execution/
event_forwarder.rs

1//! Event forwarding from MPSC to broadcast channels.
2//!
3//! Creates an MPSC channel for agent loop events and spawns a background task
4//! that relays events to the session's broadcast sender while tracking runner
5//! diagnostic state (budget events, tool execution, round progress).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use chrono::Utc;
11use tokio::sync::{broadcast, mpsc, RwLock};
12
13use bamboo_agent_core::AgentEvent;
14
15use super::runner_state::AgentRunner;
16
17/// Inbox to the account-wide change feed: `(session_id, event)` before the
18/// writer assigns a seq. Threaded as `Option` so engine-internal callers that
19/// have no feed (tests, standalone embeddings) can pass `None`. Defined here so
20/// the engine stays free of any `bamboo-server` dependency.
21pub type AccountFeedInbox = mpsc::Sender<(Option<String>, AgentEvent)>;
22
23/// Forward a durable change event onto the account feed, if an inbox is wired.
24///
25/// Ephemeral events (tokens, heartbeats, …) are filtered out before any clone,
26/// so this is near-free on the hot path. `session_id` is supplied explicitly so
27/// terminal events (which carry no id) still route to the right session.
28fn mirror_to_account_feed(inbox: &Option<AccountFeedInbox>, session_id: &str, event: &AgentEvent) {
29    if let Some(inbox) = inbox {
30        if event.is_durable_change() {
31            let _ = inbox.try_send((Some(session_id.to_string()), event.clone()));
32        }
33    }
34}
35
36/// Create an MPSC channel for agent events and spawn a forwarding task
37/// that relays events to the broadcast sender while tracking runner
38/// diagnostic fields for live visibility.
39///
40/// `account_feed_inbox`, when present, also mirrors durable change events onto
41/// the account-wide feed for resumable multi-client sync.
42///
43/// Returns `(mpsc_tx, forwarder_handle)`.
44pub fn create_event_forwarder(
45    session_id: String,
46    broadcast_tx: broadcast::Sender<AgentEvent>,
47    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
48    account_feed_inbox: Option<AccountFeedInbox>,
49) -> (mpsc::Sender<AgentEvent>, tokio::task::JoinHandle<()>) {
50    let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<AgentEvent>(100);
51
52    let forwarder = tokio::spawn(async move {
53        // Emit ExecutionStarted as the first event so the frontend can correlate
54        // all subsequent events to this run via run_id.
55        {
56            let runners = runners.read().await;
57            if let Some(runner) = runners.get(&session_id) {
58                let started_event = AgentEvent::ExecutionStarted {
59                    run_id: runner.run_id.clone(),
60                    session_id: session_id.clone(),
61                    started_at: Utc::now().to_rfc3339(),
62                };
63                mirror_to_account_feed(&account_feed_inbox, &session_id, &started_event);
64                let _ = broadcast_tx.send(started_event);
65            }
66        }
67
68        while let Some(event) = mpsc_rx.recv().await {
69            {
70                let mut runners = runners.write().await;
71                if let Some(runner) = runners.get_mut(&session_id) {
72                    runner.last_event_at = Some(Utc::now());
73
74                    match &event {
75                        AgentEvent::TokenBudgetUpdated { .. } => {
76                            runner.last_budget_event = Some(event.clone());
77                        }
78                        AgentEvent::ToolStart { tool_name, .. } => {
79                            runner.last_tool_name = Some(tool_name.clone());
80                            runner.last_tool_phase = Some("begin".to_string());
81                        }
82                        AgentEvent::ToolLifecycle {
83                            tool_name, phase, ..
84                        } => {
85                            runner.last_tool_name = Some(tool_name.clone());
86                            runner.last_tool_phase = Some(phase.clone());
87                        }
88                        AgentEvent::RunnerProgress { round_count, .. } => {
89                            runner.round_count = *round_count;
90                        }
91                        _ => {}
92                    }
93                }
94            }
95            mirror_to_account_feed(&account_feed_inbox, &session_id, &event);
96            let _ = broadcast_tx.send(event);
97        }
98    });
99
100    (mpsc_tx, forwarder)
101}