Skip to main content

bamboo_engine/runtime/execution/
event_forwarder.rs

1//! Event forwarding from MPSC to broadcast channels.
2//!
3//! Creates an MPSC channel for agent loop events and spawns a background task
4//! that relays events to the session's broadcast sender while tracking runner
5//! diagnostic state (budget events, tool execution, round progress).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use chrono::Utc;
11use tokio::sync::{broadcast, mpsc, RwLock};
12
13use bamboo_agent_core::AgentEvent;
14
15use super::runner_state::AgentRunner;
16
17/// Create an MPSC channel for agent events and spawn a forwarding task
18/// that relays events to the broadcast sender while tracking runner
19/// diagnostic fields for live visibility.
20///
21/// Returns `(mpsc_tx, forwarder_handle)`.
22pub fn create_event_forwarder(
23    session_id: String,
24    broadcast_tx: broadcast::Sender<AgentEvent>,
25    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
26) -> (mpsc::Sender<AgentEvent>, tokio::task::JoinHandle<()>) {
27    let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<AgentEvent>(100);
28
29    let forwarder = tokio::spawn(async move {
30        // Emit ExecutionStarted as the first event so the frontend can correlate
31        // all subsequent events to this run via run_id.
32        {
33            let runners = runners.read().await;
34            if let Some(runner) = runners.get(&session_id) {
35                let started_event = AgentEvent::ExecutionStarted {
36                    run_id: runner.run_id.clone(),
37                    session_id: session_id.clone(),
38                    started_at: Utc::now().to_rfc3339(),
39                };
40                let _ = broadcast_tx.send(started_event);
41            }
42        }
43
44        while let Some(event) = mpsc_rx.recv().await {
45            {
46                let mut runners = runners.write().await;
47                if let Some(runner) = runners.get_mut(&session_id) {
48                    runner.last_event_at = Some(Utc::now());
49
50                    match &event {
51                        AgentEvent::TokenBudgetUpdated { .. } => {
52                            runner.last_budget_event = Some(event.clone());
53                        }
54                        AgentEvent::ToolStart { tool_name, .. } => {
55                            runner.last_tool_name = Some(tool_name.clone());
56                            runner.last_tool_phase = Some("begin".to_string());
57                        }
58                        AgentEvent::ToolLifecycle {
59                            tool_name, phase, ..
60                        } => {
61                            runner.last_tool_name = Some(tool_name.clone());
62                            runner.last_tool_phase = Some(phase.clone());
63                        }
64                        AgentEvent::RunnerProgress { round_count, .. } => {
65                            runner.round_count = *round_count;
66                        }
67                        _ => {}
68                    }
69                }
70            }
71            let _ = broadcast_tx.send(event);
72        }
73    });
74
75    (mpsc_tx, forwarder)
76}