Skip to main content

bamboo_subagent/
executor.rs

1//! The `ChildExecutor` seam: how an actor actually runs a task.
2//!
3//! This crate never depends on the agent runtime. The worker process implements
4//! [`ChildExecutor`] backed by the real `agent.execute()`; the transport layer drives it.
5//! [`EchoExecutor`] is a dependency-free stand-in used by the demo worker and tests.
6
7use async_trait::async_trait;
8use tokio::sync::{mpsc, oneshot};
9use tokio_util::sync::CancellationToken;
10
11use crate::proto::{RunSpec, TerminalStatus};
12
/// Discriminates the host callbacks an executor can issue.
///
/// The transport inspects this to decide which wire frame carries a
/// [`HostRequest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostRequestKind {
    /// A gated-tool approval proxied to the host (→ `ChildFrame::ApprovalRequest`).
    Approval,
}
19
20/// A single host-callback request: an executor proxies a gated-tool approval
21/// back to the host over the run's WS and awaits the reply.
22pub struct HostRequest {
23    pub kind: HostRequestKind,
24    pub body: serde_json::Value,
25    pub reply: oneshot::Sender<serde_json::Value>,
26}
27
28/// Host-callback bridge handed to an executor (via [`EventSink`]) so a nested
29/// sub-agent can proxy a gated-tool approval to the host — over the same
30/// per-child WebSocket, no broker needed. Absent for tests/[`EchoExecutor`].
31#[derive(Clone)]
32pub struct HostBridge {
33    req_tx: mpsc::UnboundedSender<HostRequest>,
34}
35
36impl HostBridge {
37    /// Create a bridge + the receiver the transport pumps to the wire.
38    pub fn channel() -> (Self, mpsc::UnboundedReceiver<HostRequest>) {
39        let (req_tx, req_rx) = mpsc::unbounded_channel();
40        (HostBridge { req_tx }, req_rx)
41    }
42
43    /// Proxy one gated-tool approval to the host and await the decision JSON
44    /// (`{"approved": bool}`). The worker's permission flow blocks on this so the
45    /// human decides on the parent (Phase 2).
46    pub async fn approval_call(
47        &self,
48        body: serde_json::Value,
49    ) -> Result<serde_json::Value, String> {
50        self.call(HostRequestKind::Approval, body).await
51    }
52
53    async fn call(
54        &self,
55        kind: HostRequestKind,
56        body: serde_json::Value,
57    ) -> Result<serde_json::Value, String> {
58        let (reply, rx) = oneshot::channel();
59        self.req_tx
60            .send(HostRequest { kind, body, reply })
61            .map_err(|_| "host bridge closed".to_string())?;
62        rx.await
63            .map_err(|_| "host bridge dropped reply".to_string())
64    }
65}
66
67/// Sink an executor emits events into; the transport forwards each as a `ChildFrame::Event`.
68#[derive(Clone)]
69pub struct EventSink {
70    tx: mpsc::UnboundedSender<serde_json::Value>,
71    host: Option<HostBridge>,
72}
73
74impl EventSink {
75    /// Create a sink + the receiver the transport pumps to the wire.
76    pub fn channel() -> (Self, mpsc::UnboundedReceiver<serde_json::Value>) {
77        let (tx, rx) = mpsc::unbounded_channel();
78        (EventSink { tx, host: None }, rx)
79    }
80    /// Attach a host-callback bridge (the transport wires this for real runs).
81    pub fn with_host_bridge(mut self, bridge: HostBridge) -> Self {
82        self.host = Some(bridge);
83        self
84    }
85    /// The host-callback bridge, if this run was wired with one.
86    pub fn host(&self) -> Option<&HostBridge> {
87        self.host.as_ref()
88    }
89    /// Emit one event (serialized agent event). Dropped silently if the peer is gone.
90    pub fn emit(&self, event: serde_json::Value) {
91        let _ = self.tx.send(event);
92    }
93}
94
95/// Result of running a task to completion (or suspension).
96#[derive(Debug, Clone, PartialEq)]
97pub struct ChildOutcome {
98    pub status: TerminalStatus,
99    pub result: Option<String>,
100    pub error: Option<String>,
101    /// Full worker transcript, shipped only on suspend so the host can persist
102    /// it onto the child session and rehydrate the worker on resume.
103    pub transcript: Vec<serde_json::Value>,
104}
105
106impl ChildOutcome {
107    pub fn completed(result: impl Into<String>) -> Self {
108        Self {
109            status: TerminalStatus::Completed,
110            result: Some(result.into()),
111            error: None,
112            transcript: Vec::new(),
113        }
114    }
115    pub fn error(msg: impl Into<String>) -> Self {
116        Self {
117            status: TerminalStatus::Error,
118            result: None,
119            error: Some(msg.into()),
120            transcript: Vec::new(),
121        }
122    }
123    pub fn cancelled() -> Self {
124        Self {
125            status: TerminalStatus::Cancelled,
126            result: None,
127            error: None,
128            transcript: Vec::new(),
129        }
130    }
131    /// The worker suspended to wait on its own sub-agents; ship the full
132    /// transcript so the host can resume it later.
133    pub fn suspended(transcript: Vec<serde_json::Value>) -> Self {
134        Self {
135            status: TerminalStatus::Suspended,
136            result: None,
137            error: None,
138            transcript,
139        }
140    }
141}
142
143/// Mid-run steering inbox: `ParentFrame::Message` texts arriving while a run is
144/// active. Executors that support in-band steering admit them at a safe point
145/// (the engine's round boundary); others may simply ignore the inbox.
146pub struct SteerInbox {
147    rx: mpsc::UnboundedReceiver<String>,
148}
149
150impl SteerInbox {
151    /// Create a sender + inbox pair (the transport holds the sender).
152    pub fn channel() -> (mpsc::UnboundedSender<String>, Self) {
153        let (tx, rx) = mpsc::unbounded_channel();
154        (tx, SteerInbox { rx })
155    }
156    /// An already-closed inbox (for tests / executors that don't steer).
157    pub fn disconnected() -> Self {
158        let (_tx, rx) = mpsc::unbounded_channel();
159        SteerInbox { rx }
160    }
161    /// Next steering message, or `None` once the run's sender is gone.
162    pub async fn recv(&mut self) -> Option<String> {
163        self.rx.recv().await
164    }
165}
166
167/// What runs inside an actor. Implemented by the worker with the real runtime.
168#[async_trait]
169pub trait ChildExecutor: Send + Sync + 'static {
170    async fn run(
171        &self,
172        spec: RunSpec,
173        events: EventSink,
174        steer: SteerInbox,
175        cancel: CancellationToken,
176    ) -> ChildOutcome;
177}
178
/// Dependency-free executor: streams one `token` event per word, then completes with an echo.
/// Used by the demo worker and tests to exercise the full transport without a real LLM.
///
/// Test hook: the first whitespace-separated token of the form
/// `__sleep_ms:<n>` found anywhere in the assignment makes the executor sleep
/// (cancellably) for `n` milliseconds before echoing the remaining words — this
/// is what lets cancel / concurrency e2e tests hold a run open
/// deterministically without an LLM. The marker does not have to be the first
/// word, and that token is left out of the echo.
#[derive(Debug, Clone, Copy, Default)]
pub struct EchoExecutor;
186
/// Prefix of the assignment token that [`EchoExecutor`] interprets as a
/// cancellable delay; the rest of the token is the delay in milliseconds.
pub const ECHO_SLEEP_PREFIX: &str = "__sleep_ms:";
189
190#[async_trait]
191impl ChildExecutor for EchoExecutor {
192    async fn run(
193        &self,
194        spec: RunSpec,
195        events: EventSink,
196        _steer: SteerInbox,
197        cancel: CancellationToken,
198    ) -> ChildOutcome {
199        // Optional cancellable delay: any token `__sleep_ms:<n>` in the
200        // assignment (scanned, not just the prefix — child creation may wrap
201        // the prompt in a template). The marker token itself is not echoed.
202        let mut sleep_ms: Option<u64> = None;
203        let mut words: Vec<&str> = Vec::new();
204        for word in spec.assignment.split_whitespace() {
205            match word
206                .strip_prefix(ECHO_SLEEP_PREFIX)
207                .and_then(|n| n.parse::<u64>().ok())
208            {
209                Some(ms) if sleep_ms.is_none() => sleep_ms = Some(ms),
210                _ => words.push(word),
211            }
212        }
213        if let Some(ms) = sleep_ms {
214            tokio::select! {
215                _ = tokio::time::sleep(std::time::Duration::from_millis(ms)) => {}
216                _ = cancel.cancelled() => return ChildOutcome::cancelled(),
217            }
218        }
219
220        for word in &words {
221            if cancel.is_cancelled() {
222                return ChildOutcome::cancelled();
223            }
224            events.emit(serde_json::json!({ "type": "token", "content": format!("{word} ") }));
225            // tiny yield so cancellation can interleave; not a real delay
226            tokio::task::yield_now().await;
227        }
228        events.emit(serde_json::json!({ "type": "complete" }));
229        ChildOutcome::completed(format!("echo: {}", words.join(" ")))
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal [`RunSpec`] carrying only an assignment.
    fn spec(assignment: &'static str) -> RunSpec {
        RunSpec {
            assignment: assignment.into(),
            reasoning_effort: None,
            messages: Vec::new(),
        }
    }

    /// The echo executor emits one token per word plus a final `complete`
    /// event, and reports the echoed text as its result.
    #[tokio::test]
    async fn echo_streams_then_completes() {
        let (sink, mut rx) = EventSink::channel();
        let outcome = EchoExecutor
            .run(
                spec("alpha beta"),
                sink,
                SteerInbox::disconnected(),
                CancellationToken::new(),
            )
            .await;
        assert_eq!(outcome.status, TerminalStatus::Completed);
        assert_eq!(outcome.result.as_deref(), Some("echo: alpha beta"));

        // Drain whatever was emitted during the run.
        let mut seen = Vec::new();
        while let Ok(event) = rx.try_recv() {
            seen.push(event);
        }
        // two tokens + one complete
        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0]["content"], "alpha ");
    }

    /// A token cancelled before the run starts yields a cancelled outcome.
    #[tokio::test]
    async fn echo_honors_cancel() {
        let (sink, _rx) = EventSink::channel();
        let token = CancellationToken::new();
        token.cancel();
        let outcome = EchoExecutor
            .run(spec("a b c"), sink, SteerInbox::disconnected(), token)
            .await;
        assert_eq!(outcome.status, TerminalStatus::Cancelled);
    }

    /// `approval_call` queues an `Approval` request with the given body and
    /// resolves with whatever is sent on the request's reply channel.
    #[tokio::test]
    async fn approval_call_sends_approval_kind_and_round_trips_reply() {
        let (bridge, mut requests) = HostBridge::channel();
        let caller = tokio::spawn(async move {
            bridge
                .approval_call(serde_json::json!({"resource": "/tmp/x"}))
                .await
        });

        let request = requests.recv().await.expect("a host request");
        assert_eq!(request.kind, HostRequestKind::Approval);
        assert_eq!(request.body["resource"], "/tmp/x");

        let _ = request.reply.send(serde_json::json!({"approved": true}));
        let decision = caller.await.unwrap().expect("decision");
        assert_eq!(decision["approved"], true);
    }
}