Skip to main content

bamboo_engine/external_agents/
live.rs

1//! Live actor registry: in-band delivery to currently-running actor children.
2//!
3//! While `ActorChildRunner` drives a child over WebSocket, it registers a frame
4//! sender here keyed by `child_session_id`. `send_message` (running, no
5//! interrupt) consults this map: when the child is live, the message rides the
6//! existing WS as a `ParentFrame::Message` and is admitted by the worker's
7//! agent loop at its next round boundary — the same mechanism in-process
8//! children use, extended across the process boundary. When the child is not
9//! live, callers fall back to the durable `pending_injected_messages` queue.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::{Mutex, OnceLock};
13
14use bamboo_subagent::proto::ParentFrame;
15use tokio::sync::mpsc;
16
17fn map() -> &'static Mutex<HashMap<String, mpsc::UnboundedSender<ParentFrame>>> {
18    static MAP: OnceLock<Mutex<HashMap<String, mpsc::UnboundedSender<ParentFrame>>>> =
19        OnceLock::new();
20    MAP.get_or_init(|| Mutex::new(HashMap::new()))
21}
22
/// Process-global table of human-loop approval requests that are still
/// awaiting a decision: `child_id` → the set of outstanding `request_id`s.
///
/// The table is created empty on first access and shared by every caller.
/// Callers on the human-in-the-loop path (top orchestrator) are expected to
/// register here; trusted internal paths (model-review, escalation-bridge) are
/// expected NOT to, which is what lets [`deliver_approval_checked`] reject a
/// stray external POST aimed at a request that is not genuinely pending.
fn pending() -> &'static Mutex<HashMap<String, HashSet<String>>> {
    type PendingApprovals = HashMap<String, HashSet<String>>;
    static AWAITING: OnceLock<Mutex<PendingApprovals>> = OnceLock::new();
    AWAITING.get_or_init(Mutex::default)
}
33
34/// Record a `(child_id, request_id)` as a pending human-loop approval. Called
35/// just before surfacing `ChildApprovalRequested` so an external POST can be
36/// correlated against a genuinely-pending request.
37pub fn register_pending_approval(child_id: &str, request_id: &str) {
38    pending()
39        .lock()
40        .unwrap()
41        .entry(child_id.to_string())
42        .or_default()
43        .insert(request_id.to_string());
44}
45
46/// One-shot consume of a `(child_id, request_id)` pending pair: remove it and
47/// return whether it WAS present. A second call for the same pair returns
48/// `false`, so a request can't be answered (or replayed) twice.
49pub fn take_pending_approval(child_id: &str, request_id: &str) -> bool {
50    let mut guard = pending().lock().unwrap();
51    let Some(set) = guard.get_mut(child_id) else {
52        return false;
53    };
54    let took = set.remove(request_id);
55    if set.is_empty() {
56        guard.remove(child_id);
57    }
58    took
59}
60
61/// Drop all pending approvals for a child (e.g. when its live connection ends).
62pub fn clear_pending_approvals_for(child_id: &str) {
63    pending().lock().unwrap().remove(child_id);
64}
65
66/// Validated external entry point: deliver an approval decision ONLY if the
67/// `(child_id, request_id)` pair is currently pending. Consumes the pending
68/// entry (one-shot) before delivering, so the same request can't be replayed,
69/// and rejects (returns `false`) any `request_id` that isn't currently pending
70/// — unknown, already-answered/timed-out, or a non-human-loop path
71/// (model-review / escalation) that never registered. This is the entry the
72/// external HTTP handler must use.
73pub fn deliver_approval_checked(child_id: &str, request_id: &str, approved: bool) -> bool {
74    if take_pending_approval(child_id, request_id) {
75        deliver_approval(child_id, request_id, approved)
76    } else {
77        false
78    }
79}
80
/// Handle representing one live-child registration.
///
/// Dropping the guard unregisters the child, so a runner that returns early or
/// panics cannot leave a stale sender behind. The guard must therefore be kept
/// alive for as long as the child should be considered live; `#[must_use]`
/// flags call sites that discard it (which would unregister immediately).
#[derive(Debug)]
#[must_use = "dropping the guard immediately unregisters the live child"]
pub struct LiveActorGuard {
    /// Registry key of the child this guard unregisters on drop.
    child_id: String,
}
86
87impl Drop for LiveActorGuard {
88    fn drop(&mut self) {
89        map().lock().unwrap().remove(&self.child_id);
90        // A disconnecting child can't answer any still-pending approval — drop
91        // them so a late external POST finds nothing pending and is rejected.
92        clear_pending_approvals_for(&self.child_id);
93    }
94}
95
96/// Register a live child's frame sender for the duration of its run.
97pub fn register(child_id: &str, tx: mpsc::UnboundedSender<ParentFrame>) -> LiveActorGuard {
98    map().lock().unwrap().insert(child_id.to_string(), tx);
99    LiveActorGuard {
100        child_id: child_id.to_string(),
101    }
102}
103
104/// Deliver an in-band steering message to a live child. Returns `false` when
105/// the child is not live (caller should use the durable queue instead).
106pub fn deliver_message(child_id: &str, text: &str) -> bool {
107    let guard = map().lock().unwrap();
108    match guard.get(child_id) {
109        Some(tx) => tx
110            .send(ParentFrame::Message {
111                text: text.to_string(),
112            })
113            .is_ok(),
114        None => false,
115    }
116}
117
118/// Deliver a host/human approval decision to a live child's pending gated-tool
119/// request (Phase 2: child → parent approval delegation). Sends
120/// `ParentFrame::ApprovalReply{id, approved}` over the child's live WS
121/// connection; `drive()` forwards it to the worker, whose pending map resolves
122/// the `host.approval_call` the child's gated tool is blocked on (approve ⇒ the
123/// tool proceeds, deny ⇒ it fails closed). This is the decision-DOWN half of the
124/// human-in-the-loop route: a parent-side responder (e.g. a `/respond`-style
125/// handler) calls this with the `request_id` it surfaced to the human. Returns
126/// `false` when the child is not live (no connection to answer on — the caller
127/// should treat that as a denied/expired request).
128pub fn deliver_approval(child_id: &str, request_id: &str, approved: bool) -> bool {
129    let guard = map().lock().unwrap();
130    match guard.get(child_id) {
131        Some(tx) => tx
132            .send(ParentFrame::ApprovalReply {
133                id: request_id.to_string(),
134                approved,
135            })
136            .is_ok(),
137        None => false,
138    }
139}
140
141/// Whether a child currently has a live actor connection.
142pub fn is_live(child_id: &str) -> bool {
143    map().lock().unwrap().contains_key(child_id)
144}
145
// The registries are process-global and tests run in parallel, so every test
// uses its own unique child id to stay independent of the others.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn register_deliver_unregister() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let guard = register("c-live", tx);
        assert!(is_live("c-live"));
        assert!(deliver_message("c-live", "hi"));
        match rx.try_recv() {
            Ok(ParentFrame::Message { text }) => assert_eq!(text, "hi"),
            other => panic!("expected message frame, got {other:?}"),
        }

        drop(guard);
        assert!(!is_live("c-live"));
        assert!(!deliver_message("c-live", "gone"));
    }

    #[test]
    fn deliver_fails_when_receiver_dropped() {
        let (tx, rx) = mpsc::unbounded_channel();
        let _guard = register("c-dead", tx);
        drop(rx);
        assert!(!deliver_message("c-dead", "hi"));
    }

    #[test]
    fn deliver_approval_routes_reply_frame() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let guard = register("c-appr", tx);
        assert!(deliver_approval("c-appr", "req-7", true));
        match rx.try_recv() {
            Ok(ParentFrame::ApprovalReply { id, approved }) => {
                assert_eq!(id, "req-7");
                assert!(approved);
            }
            other => panic!("expected approval reply, got {other:?}"),
        }
        drop(guard);
        // Not-live child ⇒ false (no connection to answer on).
        assert!(!deliver_approval("c-appr", "req-8", false));
    }

    #[test]
    fn pending_approval_is_one_shot() {
        register_pending_approval("c-pend", "req-1");
        // First take consumes it; the second finds nothing.
        assert!(take_pending_approval("c-pend", "req-1"));
        assert!(!take_pending_approval("c-pend", "req-1"));
    }

    #[test]
    fn take_of_unregistered_pair_is_false() {
        // Unknown child entirely.
        assert!(!take_pending_approval("c-unknown", "req-x"));
        // Known child, but an unregistered request_id.
        register_pending_approval("c-known", "req-real");
        assert!(!take_pending_approval("c-known", "req-bogus"));
        // The real one is still pending (a bogus take didn't disturb it).
        assert!(take_pending_approval("c-known", "req-real"));
    }

    #[test]
    fn deliver_approval_checked_only_delivers_for_registered_pair() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let _guard = register("c-checked", tx);

        // Not registered ⇒ rejected, nothing on the wire.
        assert!(!deliver_approval_checked("c-checked", "req-stray", true));
        assert!(rx.try_recv().is_err());

        // Registered ⇒ delivered, frame rides the wire, and consumed.
        register_pending_approval("c-checked", "req-ok");
        assert!(deliver_approval_checked("c-checked", "req-ok", true));
        match rx.try_recv() {
            Ok(ParentFrame::ApprovalReply { id, approved }) => {
                assert_eq!(id, "req-ok");
                assert!(approved);
            }
            other => panic!("expected approval reply, got {other:?}"),
        }
        // One-shot: a replay is rejected (and nothing further on the wire).
        assert!(!deliver_approval_checked("c-checked", "req-ok", true));
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn clear_pending_approvals_for_drops_them() {
        register_pending_approval("c-clear", "req-a");
        register_pending_approval("c-clear", "req-b");
        clear_pending_approvals_for("c-clear");
        assert!(!take_pending_approval("c-clear", "req-a"));
        assert!(!take_pending_approval("c-clear", "req-b"));
    }

    #[test]
    fn guard_drop_clears_pending_approvals() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let guard = register("c-drop-pend", tx);
        register_pending_approval("c-drop-pend", "req-late");

        // The connection ends while the approval is still outstanding.
        drop(guard);

        // A late decision finds nothing pending and nothing reaches the wire.
        assert!(!deliver_approval_checked("c-drop-pend", "req-late", true));
        assert!(!take_pending_approval("c-drop-pend", "req-late"));
        assert!(rx.try_recv().is_err());
    }
}