Skip to main content

bamboo_engine/runtime/execution/
event_forwarder.rs

//! Event forwarding from an MPSC channel to a broadcast channel.
//!
//! Creates an MPSC channel for agent loop events and spawns a background task
//! that relays each event to the session's broadcast sender while recording
//! token-budget events on the session's runner.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::{broadcast, mpsc, RwLock};
10
11use bamboo_agent_core::AgentEvent;
12
13use super::runner_state::AgentRunner;
14
15/// Create an MPSC channel for agent events and spawn a forwarding task
16/// that relays events to the broadcast sender while tracking budget events.
17///
18/// Returns `(mpsc_tx, forwarder_handle)`.
19pub fn create_event_forwarder(
20    session_id: String,
21    broadcast_tx: broadcast::Sender<AgentEvent>,
22    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
23) -> (mpsc::Sender<AgentEvent>, tokio::task::JoinHandle<()>) {
24    let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<AgentEvent>(100);
25
26    let forwarder = tokio::spawn(async move {
27        while let Some(event) = mpsc_rx.recv().await {
28            if matches!(&event, AgentEvent::TokenBudgetUpdated { .. }) {
29                let mut runners = runners.write().await;
30                if let Some(runner) = runners.get_mut(&session_id) {
31                    runner.last_budget_event = Some(event.clone());
32                }
33            }
34            let _ = broadcast_tx.send(event);
35        }
36    });
37
38    (mpsc_tx, forwarder)
39}