bamboo_engine/runtime/execution/
event_forwarder.rs1use std::collections::HashMap;
8use std::sync::Arc;
9
10use chrono::Utc;
11use tokio::sync::{broadcast, mpsc, RwLock};
12
13use bamboo_agent_core::AgentEvent;
14
15use super::runner_state::AgentRunner;
16
17pub fn create_event_forwarder(
23 session_id: String,
24 broadcast_tx: broadcast::Sender<AgentEvent>,
25 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
26) -> (mpsc::Sender<AgentEvent>, tokio::task::JoinHandle<()>) {
27 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<AgentEvent>(100);
28
29 let forwarder = tokio::spawn(async move {
30 {
33 let runners = runners.read().await;
34 if let Some(runner) = runners.get(&session_id) {
35 let started_event = AgentEvent::ExecutionStarted {
36 run_id: runner.run_id.clone(),
37 session_id: session_id.clone(),
38 started_at: Utc::now().to_rfc3339(),
39 };
40 let _ = broadcast_tx.send(started_event);
41 }
42 }
43
44 while let Some(event) = mpsc_rx.recv().await {
45 {
46 let mut runners = runners.write().await;
47 if let Some(runner) = runners.get_mut(&session_id) {
48 runner.last_event_at = Some(Utc::now());
49
50 match &event {
51 AgentEvent::TokenBudgetUpdated { .. } => {
52 runner.last_budget_event = Some(event.clone());
53 }
54 AgentEvent::ToolStart { tool_name, .. } => {
55 runner.last_tool_name = Some(tool_name.clone());
56 runner.last_tool_phase = Some("begin".to_string());
57 }
58 AgentEvent::ToolLifecycle {
59 tool_name, phase, ..
60 } => {
61 runner.last_tool_name = Some(tool_name.clone());
62 runner.last_tool_phase = Some(phase.clone());
63 }
64 AgentEvent::RunnerProgress { round_count, .. } => {
65 runner.round_count = *round_count;
66 }
67 _ => {}
68 }
69 }
70 }
71 let _ = broadcast_tx.send(event);
72 }
73 });
74
75 (mpsc_tx, forwarder)
76}