use tokio::sync::mpsc;
use tracing::Instrument;
use crate::app_state::AppState;
use bamboo_agent_core::AgentEvent;
/// Spawns a detached task that relays agent events from `mpsc_rx` onto the
/// session's broadcast channel `session_tx`.
///
/// The task runs until `mpsc_rx.recv()` yields `None`, and is instrumented
/// with an `event_forwarder` span carrying `session_id`.
///
/// For every received event:
/// - If it is an [`AgentEvent::TokenBudgetUpdated`], a clone is stored in
///   `last_budget_event` of the runner registered under `session_id` in
///   `state.agent_runners` (when such a runner exists).
/// - It is then sent on `session_tx`. When the send fails the event is
///   discarded and counted; a single debug line is logged when such a streak
///   starts and another, carrying the count, once a send succeeds again.
///
/// The returned `JoinHandle` is intentionally discarded, so the task cannot
/// be awaited or aborted by the caller.
pub(crate) fn spawn_event_forwarder(
    state: actix_web::web::Data<AppState>,
    session_id: String,
    mut mpsc_rx: mpsc::Receiver<AgentEvent>,
    session_tx: tokio::sync::broadcast::Sender<AgentEvent>,
) {
    // `%session_id` only borrows the string while the span is created, so the
    // owned value can still be moved into the task below without cloning.
    let session_span = tracing::info_span!("event_forwarder", session_id = %session_id);

    tokio::spawn(
        async move {
            // Number of events discarded during the current no-subscriber streak.
            let mut dropped_without_subscribers = 0usize;
            // True while inside such a streak; used to log its start only once.
            let mut no_subscriber_mode = false;

            while let Some(event) = mpsc_rx.recv().await {
                if matches!(&event, AgentEvent::TokenBudgetUpdated { .. }) {
                    let mut runners = state.agent_runners.write().await;
                    let stored = if let Some(runner) = runners.get_mut(&session_id) {
                        runner.last_budget_event = Some(event.clone());
                        true
                    } else {
                        false
                    };
                    // Release the write guard before logging to keep the
                    // critical section as short as possible.
                    drop(runners);

                    if stored {
                        tracing::debug!(
                            "[{}] Stored budget event for late subscribers",
                            session_id
                        );
                    }
                }

                // Use the outcome of `send` itself as the "no subscribers" signal.
                // Checking `receiver_count()` first and sending afterwards is racy:
                // the last receiver may disappear in between, and the failed send
                // would then go uncounted.
                // NOTE(review): relies on broadcast `send` failing only when there
                // are no active receivers — confirm against the tokio version in use.
                if session_tx.send(event).is_err() {
                    dropped_without_subscribers = dropped_without_subscribers.saturating_add(1);
                    if !no_subscriber_mode {
                        no_subscriber_mode = true;
                        tracing::debug!(
                            "[{}] No subscribers for event stream; suppressing repeated logs",
                            session_id
                        );
                    }
                    continue;
                }

                if no_subscriber_mode {
                    tracing::debug!(
                        "[{}] Subscribers resumed; dropped {} events while unsubscribed",
                        session_id,
                        dropped_without_subscribers
                    );
                    dropped_without_subscribers = 0;
                    no_subscriber_mode = false;
                }
            }

            // The counter is reset whenever subscribers resume, so a non-zero
            // value here means the stream ended while nobody was listening.
            if dropped_without_subscribers > 0 {
                tracing::debug!(
                    "[{}] Event forwarder finished after dropping {} events with no subscribers",
                    session_id,
                    dropped_without_subscribers
                );
            } else {
                tracing::debug!("[{}] Event forwarder finished", session_id);
            }
        }
        .instrument(session_span),
    );
}