Skip to main content

bamboo_engine/runtime/execution/
session_events.rs

1//! Session-scoped event sender management.
2//!
3//! Provides long-lived broadcast senders for session event streams.
4//! Unlike runner-scoped senders (which exist only during agent execution),
5//! these persist for the lifetime of the session.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use tokio::sync::{broadcast, RwLock};
11
12use bamboo_agent_core::AgentEvent;
13
/// Capacity given to a session's broadcast event channel when one is created
/// without an explicit size.
pub const SESSION_EVENT_CHANNEL_CAPACITY: usize = 1_000;
16
17/// Get or create a broadcast sender for the given session.
18///
19/// If a sender already exists in the map, returns a clone of it.
20/// Otherwise creates a new one with [`SESSION_EVENT_CHANNEL_CAPACITY`]
21/// capacity and inserts it.
22pub async fn get_or_create_event_sender(
23    senders: &Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
24    session_id: &str,
25) -> broadcast::Sender<AgentEvent> {
26    {
27        let read = senders.read().await;
28        if let Some(sender) = read.get(session_id) {
29            return sender.clone();
30        }
31    }
32
33    let mut write = senders.write().await;
34    // Double-check after acquiring write lock.
35    if let Some(sender) = write.get(session_id) {
36        return sender.clone();
37    }
38
39    let (sender, _) = broadcast::channel(SESSION_EVENT_CHANNEL_CAPACITY);
40    write.insert(session_id.to_string(), sender.clone());
41    sender
42}