Skip to main content

bamboo_engine/external_agents/
live.rs

1//! Live actor registry: in-band delivery to currently-running actor children.
2//!
3//! While `ActorChildRunner` drives a child over WebSocket, it registers a frame
4//! sender here keyed by `child_session_id`. `send_message` (running, no
5//! interrupt) consults this map: when the child is live, the message rides the
6//! existing WS as a `ParentFrame::Message` and is admitted by the worker's
7//! agent loop at its next round boundary — the same mechanism in-process
8//! children use, extended across the process boundary. When the child is not
9//! live, callers fall back to the durable `pending_injected_messages` queue.
10
11use std::collections::HashMap;
12use std::sync::{Mutex, OnceLock};
13
14use bamboo_subagent::proto::ParentFrame;
15use tokio::sync::mpsc;
16
17fn map() -> &'static Mutex<HashMap<String, mpsc::UnboundedSender<ParentFrame>>> {
18    static MAP: OnceLock<Mutex<HashMap<String, mpsc::UnboundedSender<ParentFrame>>>> =
19        OnceLock::new();
20    MAP.get_or_init(|| Mutex::new(HashMap::new()))
21}
22
23/// Unregisters the child on drop, so a panicking/returning runner can't leak
24/// a stale sender.
25pub struct LiveActorGuard {
26    child_id: String,
27}
28
29impl Drop for LiveActorGuard {
30    fn drop(&mut self) {
31        map().lock().unwrap().remove(&self.child_id);
32    }
33}
34
35/// Register a live child's frame sender for the duration of its run.
36pub fn register(child_id: &str, tx: mpsc::UnboundedSender<ParentFrame>) -> LiveActorGuard {
37    map().lock().unwrap().insert(child_id.to_string(), tx);
38    LiveActorGuard {
39        child_id: child_id.to_string(),
40    }
41}
42
43/// Deliver an in-band steering message to a live child. Returns `false` when
44/// the child is not live (caller should use the durable queue instead).
45pub fn deliver_message(child_id: &str, text: &str) -> bool {
46    let guard = map().lock().unwrap();
47    match guard.get(child_id) {
48        Some(tx) => tx
49            .send(ParentFrame::Message {
50                text: text.to_string(),
51            })
52            .is_ok(),
53        None => false,
54    }
55}
56
57/// Whether a child currently has a live actor connection.
58pub fn is_live(child_id: &str) -> bool {
59    map().lock().unwrap().contains_key(child_id)
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Full lifecycle: a registered child is live and receives frames; once
    /// its guard is dropped it is neither live nor deliverable.
    #[test]
    fn register_deliver_unregister() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let registration = register("c-live", sender);

        assert!(is_live("c-live"));
        assert!(deliver_message("c-live", "hi"));
        match receiver.try_recv() {
            Ok(ParentFrame::Message { text }) => assert_eq!(text, "hi"),
            other => panic!("expected message frame, got {other:?}"),
        }

        drop(registration);
        assert!(!is_live("c-live"));
        assert!(!deliver_message("c-live", "gone"));
    }

    /// A registered child whose receiver has gone away must report delivery
    /// failure, so callers fall back to the durable queue.
    #[test]
    fn deliver_fails_when_receiver_dropped() {
        let (sender, receiver) = mpsc::unbounded_channel();
        let _registration = register("c-dead", sender);

        drop(receiver);
        assert!(!deliver_message("c-dead", "hi"));
    }
}