bamboo_engine/external_agents/actor_adapter.rs
1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27 ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default upper bound on how many actor worker processes may run at once.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Nesting-depth cap for direct nested execution (Phase 6).
///
/// A worker whose session `spawn_depth` is strictly below this value receives
/// its own spawn stack and the real SubAgent tool; a worker at or above it gets
/// neither (and the tool refuses on its own). This mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` — the engine cannot depend on
/// server-tools, so keep the two in sync. With the root orchestrator at depth 0
/// this permits 4 levels of sub-agents.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default number of idle, warm (reusable) workers kept per pool fingerprint.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Seconds a pooled worker waits for its next assignment before reclaiming
/// itself. Must comfortably exceed the gap between sibling spawns.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
49
50/// A warm worker parked for reuse: its process handle (killed on drop), the WS
51/// endpoint to reconnect to, and the id it registered under in the fabric.
52struct PooledActor {
53 worker: SpawnedChild,
54 endpoint: String,
55 agent_id: String,
56}
57
58/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
59pub struct ActorChildRunner {
60 agent_id: String,
61 worker_bin: PathBuf,
62 worker_args: Vec<String>,
63 fabric_dir: PathBuf,
64 executor: ExecutorSpec,
65 /// Per-provider credentials snapshotted from the parent config at build
66 /// time; the spec carries only the ONE the child's provider needs.
67 credentials: Vec<ScopedCredential>,
68 /// Parent's default provider (used when the child has no explicit one).
69 default_provider: String,
70 /// Backpressure: bounds the number of concurrently *running* actors; further
71 /// runs wait for a slot instead of exploding the process table. (Idle pooled
72 /// workers do not hold a slot.)
73 concurrency: std::sync::Arc<tokio::sync::Semaphore>,
74 spawn_timeout: Duration,
75 /// Warm-worker pool keyed by a reuse fingerprint
76 /// (role/provider/model/workspace/disabled-tools). A finished run parks its
77 /// worker here so the next matching child reuses it instead of spawning a
78 /// fresh process — collapsing N sibling sub-agents onto a few processes.
79 pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
80 max_idle_per_key: usize,
81 /// Host-side decision for a child's gated-tool approval request (Phase 2).
82 /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
83 /// human-routing bridge) returns approve/deny over the actor WS.
84 approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
85 /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
86 /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
87 /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
88 /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
89 /// to RE-PROXY a child's approval request UP to the parent run — chaining up
90 /// every level until a bypass level (model-review) or the top orchestrator
91 /// (human) decides, then relaying the reply back down. Was a process-global
92 /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
93 /// that spawned it keeps that run's bridge for its whole lifetime instead of
94 /// reading a stale/overwritten global at approval time (→ fail-closed deny).
95 escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
96}
97
98/// Decides how the host answers a child worker's gated-tool approval request
99/// (Phase 2: child → parent approval delegation). Async so an implementation
100/// can consult a policy or defer to a human. With no decider wired the host
101/// replies with a fail-closed DENY.
102///
103/// NOTE: `decide` is awaited inside the per-child frame pump, so an
104/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
105/// loop decision that may block indefinitely should instead be delivered
106/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
107/// (`super::live`), which `drive()` already forwards to the worker without
108/// stalling the pump.
109#[async_trait]
110pub trait ChildApprovalDecider: Send + Sync {
111 /// Decide whether `child_session_id` may perform the gated action described
112 /// by `request` (`{tool_name, permission, resource}`).
113 async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
114}
115
116/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
117/// decider is wired — the single, testable seam for the host-side decision.
118async fn decide_child_approval(
119 decider: Option<&Arc<dyn ChildApprovalDecider>>,
120 child_session_id: &str,
121 request: &serde_json::Value,
122) -> bool {
123 match decider {
124 Some(decider) => decider.decide(child_session_id, request).await,
125 None => false,
126 }
127}
128
/// Upper bound on how long the host waits for an approval decision before the
/// child's gated tool is failed closed (DENY). This keeps an unanswered request
/// from hanging the worker indefinitely.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
133
134/// Extract `(tool_name, permission, resource)` from a worker's approval request
135/// body (`{tool_name, permission, resource}`); missing fields default to empty.
136fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
137 let field = |k: &str| {
138 body.get(k)
139 .and_then(|v| v.as_str())
140 .unwrap_or("")
141 .to_string()
142 };
143 (field("tool_name"), field("permission"), field("resource"))
144}
145
146/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
147///
148/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
149/// children's forced-ask (dangerous) gated actions — which still raise
150/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
151/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
152/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
153/// live channel, so the agent loop is never blocked.
154#[async_trait]
155pub trait ChildApprovalReviewer: Send + Sync {
156 /// Judge whether the gated action `request` (`{tool_name, permission,
157 /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
158 async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
159}
160
161fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
162 static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
163 &SLOT
164}
165
166/// Install the process-global child-approval reviewer (idempotent; first wins).
167pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
168 let _ = child_approval_reviewer_slot().set(reviewer);
169}
170
171/// The process-global child-approval reviewer, if installed.
172pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
173 child_approval_reviewer_slot().get().cloned()
174}
175
176impl ActorChildRunner {
177 #[allow(clippy::too_many_arguments)]
178 pub fn new(
179 agent_id: String,
180 worker_bin: PathBuf,
181 worker_args: Vec<String>,
182 fabric_dir: PathBuf,
183 executor: ExecutorSpec,
184 credentials: Vec<ScopedCredential>,
185 default_provider: String,
186 max_concurrent: usize,
187 ) -> Self {
188 Self {
189 agent_id,
190 worker_bin,
191 worker_args,
192 fabric_dir,
193 executor,
194 credentials,
195 default_provider,
196 concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
197 spawn_timeout: Duration::from_secs(30),
198 pool: Arc::new(Mutex::new(HashMap::new())),
199 max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
200 approval_decider: None,
201 escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
202 }
203 }
204
205 /// Wire the host-side decider for child gated-tool approval requests
206 /// (Phase 2). Without this the host fail-closed DENYs every request.
207 pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
208 self.approval_decider = Some(decider);
209 self
210 }
211
212 /// Reuse fingerprint: two children are interchangeable on one warm worker iff
213 /// they share role, provider, model, workspace, disabled-tool set, AND every
214 /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
215 /// stamps these once and reuses them across runs): nesting depth, nested-spawn
216 /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
217 /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
218 /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
219 /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
220 /// the recursion bound; or a bypass worker reused for a non-bypass child. So
221 /// these MUST split the pool bucket. Everything else (assignment, history) is
222 /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
223 fn fingerprint(spec: &ProvisionSpec) -> String {
224 let role = spec.identity.role.as_str();
225 let (provider, model) = spec
226 .model
227 .as_ref()
228 .map(|m| (m.provider.as_str(), m.model.as_str()))
229 .unwrap_or(("", ""));
230 let workspace = spec.workspace.as_deref().unwrap_or("");
231 let mut tools = spec.disabled_tools.clone().unwrap_or_default();
232 tools.sort();
233 let caps = &spec.capabilities;
234 format!(
235 "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
236 tools.join(","),
237 spec.identity.depth,
238 caps.nested_spawn,
239 caps.bypass,
240 caps.enforce_permissions,
241 caps.max_spawn_depth.unwrap_or(0),
242 // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
243 // at build() and never re-reads it per run, so the pool MUST NOT hand a
244 // worker baked for one approval posture to a run of the opposite one —
245 // else a scheduled-root worker reused for an interactive child would
246 // silently model-review instead of asking the human (and vice-versa,
247 // reintroducing the 300s-deny). Split the bucket on it.
248 caps.no_human_approver,
249 // #71: the read-only Bash checker is baked once at build() from this
250 // flag, so a guardian-reviewer worker must NOT be reused for an
251 // ordinary child (which expects unrestricted Bash), and vice-versa.
252 caps.guardian_read_only,
253 )
254 }
255
256 /// Check out a worker for this assignment: reuse a live pooled one matching
257 /// `key`, else spawn a fresh reusable worker.
258 async fn acquire_worker(
259 &self,
260 key: &str,
261 spec: &ProvisionSpec,
262 ) -> crate::runtime::runner::Result<PooledActor> {
263 // Drain the pool bucket, validating liveness; a worker that hit its idle
264 // timeout has exited and withdrawn its fabric record — skip and reap it.
265 loop {
266 let candidate = {
267 let mut pool = self.pool.lock().await;
268 pool.get_mut(key).and_then(|bucket| bucket.pop())
269 };
270 let Some(candidate) = candidate else { break };
271 let alive = Fabric::at(&self.fabric_dir)
272 .resolve(&candidate.agent_id)
273 .await
274 .ok()
275 .flatten()
276 .is_some();
277 if alive {
278 return Ok(candidate);
279 }
280 candidate.worker.kill().await;
281 }
282
283 let spawned = spawn_worker(
284 &self.worker_bin,
285 &self.worker_args,
286 spec,
287 self.spawn_timeout,
288 )
289 .await
290 .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
291 let endpoint = spawned.record.endpoint.clone();
292 let agent_id = spawned.record.agent_id.clone();
293 Ok(PooledActor {
294 worker: spawned,
295 endpoint,
296 agent_id,
297 })
298 }
299
300 /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
301 async fn release_worker(&self, key: &str, actor: PooledActor) {
302 let mut pool = self.pool.lock().await;
303 let bucket = pool.entry(key.to_string()).or_default();
304 if bucket.len() >= self.max_idle_per_key {
305 drop(pool);
306 self.retire_worker(actor).await;
307 return;
308 }
309 bucket.push(actor);
310 }
311
312 /// Forcefully stop a worker and clean its discovery record.
313 async fn retire_worker(&self, actor: PooledActor) {
314 let agent_id = actor.agent_id.clone();
315 actor.worker.kill().await;
316 let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
317 }
318
319 /// Assemble the parent-resolved provisioning document for this child.
320 fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
321 let mut spec = ProvisionSpec::new(
322 ChildIdentity {
323 child_id: job.child_session_id.clone(),
324 parent_id: Some(job.parent_session_id.clone()),
325 project_key: None,
326 role: session
327 .metadata
328 .get("subagent_type")
329 .cloned()
330 .unwrap_or_else(|| "worker".to_string()),
331 // The child session already carries the correct depth
332 // (create_child_action's new_child_of did parent.spawn_depth+1);
333 // stamp it so the worker can re-establish it on its run session
334 // and enforce the max-depth cap across the actor boundary.
335 depth: session.spawn_depth,
336 },
337 self.executor.clone(),
338 self.fabric_dir.to_string_lossy().into_owned(),
339 );
340 spec.workspace = session.workspace.clone();
341 // Final model: the session's pinned model_ref (create.model / routing already applied),
342 // falling back to the job's bare model on the parent's default provider.
343 spec.model = session
344 .model_ref
345 .as_ref()
346 .map(|r| ModelRefSpec {
347 provider: r.provider.clone(),
348 model: r.model.clone(),
349 })
350 .or_else(|| {
351 let m = job.model.trim();
352 (!m.is_empty()).then(|| ModelRefSpec {
353 provider: self.default_provider.clone(),
354 model: m.to_string(),
355 })
356 });
357 spec.disabled_tools = job.disabled_tools.clone();
358 // Least-privilege secrets: only the credential for the child's provider.
359 let provider = spec
360 .model
361 .as_ref()
362 .map(|m| m.provider.as_str())
363 .filter(|p| !p.trim().is_empty())
364 .unwrap_or(&self.default_provider);
365 if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
366 spec.secrets.provider_credentials.push(cred.clone());
367 } else {
368 tracing::warn!(
369 "actor child {}: no credential found for provider '{}'",
370 job.child_session_id,
371 provider
372 );
373 }
374 // Phase 6 (direct nested execution): a worker BELOW the depth cap may
375 // orchestrate its OWN children — on startup it builds its own spawn
376 // stack and runs the real SubAgent tool (no host proxy). The cap (the
377 // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
378 // recursion. Driven purely by the child's depth, so it auto-propagates
379 // down the tree without any extra config threading.
380 spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
381 spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
382 // #69: activate child-approval review. Sub-agents enforce permissions so
383 // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
384 // parent for review — escalated to the human, or model-reviewed off-loop
385 // when the parent is in bypass. The worker installs no checker without
386 // this, so the whole review chain would otherwise stay dormant.
387 spec.capabilities.enforce_permissions = true;
388 // Propagate "bypass permissions" so a self-orchestrating worker knows it
389 // is a bypassed parent and installs the off-loop model-reviewer for its
390 // children's forced-ask actions (Phase 6, Part B). The child session
391 // already carries the inherited flag (create_child_action seeds it).
392 spec.capabilities.bypass = session
393 .agent_runtime_state
394 .as_ref()
395 .is_some_and(|s| s.bypass_permissions);
396 // #73: propagate "no interactive human approver" (headless / scheduled /
397 // deployed root, inherited by the child session). When set, the worker's
398 // per-run approval proxy model-reviews a gated action locally instead of
399 // escalating to a human who will never answer (which would 300s-deny).
400 spec.capabilities.no_human_approver = session
401 .agent_runtime_state
402 .as_ref()
403 .is_some_and(|s| s.no_human_approver);
404 // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
405 // read-only Bash allowlist checker. The reviewer is spawned by
406 // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
407 // marker the completion coordinator branches on to parse the verdict) AND
408 // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
409 // marker (already read above to set `identity.role`), so it rides the same
410 // session-metadata path the denylist/subagent_type use — no new wire seam.
411 // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
412 // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
413 spec.capabilities.guardian_read_only =
414 session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
415 spec
416 }
417}
418
419#[async_trait]
420impl ExternalChildRunner for ActorChildRunner {
421 async fn should_handle(&self, session: &Session) -> bool {
422 session.metadata.get("runtime.kind") == Some(&"external".to_string())
423 && session.metadata.get("external.protocol") == Some(&"actor".to_string())
424 && session.metadata.get("external.agent_id") == Some(&self.agent_id)
425 }
426
427 fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
428 *self.escalation_bridge.lock().unwrap() = bridge;
429 }
430
431 async fn execute_external_child(
432 &self,
433 session: &mut Session,
434 job: &SpawnJob,
435 event_tx: mpsc::Sender<AgentEvent>,
436 cancel_token: CancellationToken,
437 ) -> crate::runtime::runner::Result<()> {
438 // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
439 // moment this grandchild is spawned — while the parent run's bridge is
440 // still in our slot — into an owned local handed to `drive()` for this
441 // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
442 // the run that spawned it must NOT re-read `self.escalation_bridge` at
443 // approval time: by then `run()` may have cleared/overwritten it (a worker
444 // serves runs sequentially), and re-proxying through a closed bridge
445 // fail-closed denies. Capturing at spawn pins the right bridge per run.
446 let escalation = self.escalation_bridge.lock().unwrap().clone();
447 let assignment = extract_assignment(session);
448 let mut spec = self.build_spec(session, job);
449 // Make every actor a warm, reusable worker so the pool can recycle it for
450 // the next sibling with a matching fingerprint.
451 spec.reusable = true;
452 if spec.limits.idle_timeout_secs.is_none() {
453 spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
454 }
455 let pool_key = Self::fingerprint(&spec);
456 // Rehydration: the child session in the parent's store is the actor's
457 // durable state. Ship the full conversation so a reactivation
458 // (send_message / update / rerun) carries its history. A reused worker is
459 // stateless between runs, so this is also what isolates each child's
460 // context on a shared process.
461 let messages: Vec<serde_json::Value> = session
462 .messages
463 .iter()
464 .filter_map(|m| serde_json::to_value(m).ok())
465 .collect();
466
467 // Backpressure: hold a concurrency slot for the lifetime of the *run*
468 // (cancellation still proceeds — the cancel branch in drive() runs while
469 // we hold the permit). Released when this fn returns, i.e. once the worker
470 // is parked back into the pool, so idle workers don't pin slots.
471 let _slot = self
472 .concurrency
473 .acquire()
474 .await
475 .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
476
477 // Check out a warm worker (reuse-or-spawn).
478 let mut actor = self.acquire_worker(&pool_key, &spec).await?;
479
480 let mut client = match ChildClient::connect(&actor.endpoint).await {
481 Ok(client) => client,
482 Err(e) => {
483 // The pooled worker may have died between checkout and connect;
484 // retire it and spawn one fresh, once.
485 self.retire_worker(actor).await;
486 let spawned = spawn_worker(
487 &self.worker_bin,
488 &self.worker_args,
489 &spec,
490 self.spawn_timeout,
491 )
492 .await
493 .map_err(|e2| {
494 AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
495 })?;
496 let endpoint = spawned.record.endpoint.clone();
497 let agent_id = spawned.record.agent_id.clone();
498 let client = ChildClient::connect(&endpoint)
499 .await
500 .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
501 actor = PooledActor {
502 worker: spawned,
503 endpoint,
504 agent_id,
505 };
506 client
507 }
508 };
509
510 client
511 .send(ParentFrame::Run(RunSpec {
512 assignment,
513 reasoning_effort: None,
514 messages,
515 }))
516 .await
517 .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
518
519 // Register as a live actor so send_message (running, no interrupt) can
520 // steer this child in-band over the existing WS connection. The guard
521 // unregisters on every exit path.
522 let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
523 let live_guard = super::live::register(&job.child_session_id, live_tx);
524
525 let result = drive(
526 &mut client,
527 &job.child_session_id,
528 self.approval_decider.as_ref(),
529 escalation,
530 &event_tx,
531 &cancel_token,
532 &mut live_rx,
533 )
534 .await;
535 // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
536 // so a send_message landing in the close/park window below must see
537 // "not live" and take the durable-queue fallback instead of vanishing.
538 // (Even if one slipped in earlier, send_message also appends it to the
539 // durable transcript, so the next activation still rehydrates it.)
540 drop(live_guard);
541
542 // Close the connection: the worker's serve loop then accepts the next
543 // assignment (reuse) or idles out. Park the worker on a clean run; retire
544 // it on error/cancel (a wedged worker must not be reused).
545 let _ = client.close().await;
546 match &result {
547 Ok(_) => self.release_worker(&pool_key, actor).await,
548 Err(_) => self.retire_worker(actor).await,
549 }
550
551 // Write-back: persist the actor's final reply onto the child session so
552 // the transcript survives and the NEXT activation sees it as history.
553 // (run_child_spawn saves the session right after we return.)
554 match result {
555 Ok(Some(text)) => {
556 if !text.is_empty() {
557 session.add_message(bamboo_agent_core::Message::assistant(text, None));
558 }
559 Ok(())
560 }
561 Ok(None) => Ok(()),
562 Err(e) => Err(e),
563 }
564 }
565}
566
567/// Pump child frames -> parent events until a terminal frame (or cancellation).
568/// On success, yields the actor's final result text (for session write-back).
569/// `live_rx` carries in-band frames (steering messages) from the live registry.
570///
571/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
572/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
573/// non-bypass child re-proxies an approval request, this owned bridge routes it
574/// UP to the parent run. Owning it for the call's lifetime is what lets a
575/// fire-and-forget grandchild that outlives its spawning run still escalate to
576/// the correct (then-current) parent bridge rather than a stale/overwritten one.
577async fn drive(
578 client: &mut ChildClient,
579 child_session_id: &str,
580 approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
581 escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
582 event_tx: &mpsc::Sender<AgentEvent>,
583 cancel_token: &CancellationToken,
584 live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
585) -> crate::runtime::runner::Result<Option<String>> {
586 loop {
587 tokio::select! {
588 _ = cancel_token.cancelled() => {
589 // fall through to the cancel handling below
590 break;
591 }
592 Some(frame) = live_rx.recv() => {
593 // Forward in-band steering to the worker over the existing WS.
594 if client.send(frame).await.is_err() {
595 tracing::warn!("live steering frame could not be sent; connection failing");
596 }
597 }
598 frame = client.next_frame() => {
599 match frame {
600 Ok(Some(ChildFrame::Event { event })) => {
601 // AgentEvent is serialized verbatim on the wire (zero mapping).
602 if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
603 let _ = event_tx.send(ev).await;
604 }
605 }
606 Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
607 // Phase 2: a worker proxied a gated-tool approval back to
608 // the host. The WORKER side is live — its executor installs
609 // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
610 // that calls `host.approval_call`, so this frame arrives
611 // when a child hits `ConfirmationRequired`.
612 if let Some(reviewer) = child_approval_reviewer() {
613 // Phase 6, Part B: a BYPASSED parent worker
614 // model-reviews its children's forced-ask (dangerous)
615 // actions. The review is an LLM call, so run it OFF
616 // the frame pump in a spawned task and deliver the
617 // verdict async via the live channel — the pump keeps
618 // forwarding events and the agent loop never blocks. A
619 // timeout denies a hung review so the child can't hang.
620 let child = child_session_id.to_string();
621 let req_id = id.clone();
622 let body = body.clone();
623 tokio::spawn(async move {
624 let approved = tokio::time::timeout(
625 CHILD_APPROVAL_TIMEOUT,
626 reviewer.review(&child, &body),
627 )
628 .await
629 .unwrap_or(false);
630 super::live::deliver_approval(&child, &req_id, approved);
631 });
632 } else if approval_decider.is_some() {
633 // A decider is wired (policy / auto): decide promptly
634 // and reply inline. (Must not block the pump — see the
635 // `ChildApprovalDecider` doc.)
636 let approved =
637 decide_child_approval(approval_decider, child_session_id, &body)
638 .await;
639 if client
640 .send(ParentFrame::ApprovalReply { id, approved })
641 .await
642 .is_err()
643 {
644 tracing::warn!(
645 "failed to answer approval_request; connection failing"
646 );
647 }
648 } else if let Some(host) = escalation_bridge.clone() {
649 // Non-bypass WORKER: ESCALATE up our own actor link
650 // (re-proxy) so the request chains to our parent — and
651 // up every level until a bypass level (model-review) or
652 // the top orchestrator (human) decides. Off-loop so the
653 // pump never blocks; relay the reply down to the child.
654 let child = child_session_id.to_string();
655 let req_id = id.clone();
656 let body = body.clone();
657 tokio::spawn(async move {
658 let approved = match tokio::time::timeout(
659 CHILD_APPROVAL_TIMEOUT,
660 host.approval_call(body),
661 )
662 .await
663 {
664 Ok(Ok(reply)) => reply
665 .get("approved")
666 .and_then(|v| v.as_bool())
667 .unwrap_or(false),
668 // Transport error or timeout ⇒ fail closed.
669 _ => false,
670 };
671 super::live::deliver_approval(&child, &req_id, approved);
672 });
673 } else {
674 // Top orchestrator (no escalation bridge): human-in-the-
675 // loop. Surface the request on the parent's event stream
676 // and DEFER — the decision arrives out-of-band via
677 // `live::deliver_approval(child, request_id, approved)`
678 // (→ this child's `live_rx` → forwarded to the worker
679 // above). A timeout denies a never-answered request so
680 // it can't hang the child forever.
681 let (tool_name, permission, resource) =
682 approval_request_fields(&body);
683 // Register the pending request BEFORE surfacing it so
684 // the external handler's `deliver_approval_checked` can
685 // correlate an out-of-band POST against a genuine
686 // human-loop request (and consume it one-shot).
687 super::live::register_pending_approval(child_session_id, &id);
688 let _ = event_tx
689 .send(AgentEvent::ChildApprovalRequested {
690 child_session_id: child_session_id.to_string(),
691 request_id: id.clone(),
692 tool_name,
693 permission,
694 resource,
695 })
696 .await;
697 let child = child_session_id.to_string();
698 tokio::spawn(async move {
699 tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
700 // Deny only if still pending: a one-shot consume so
701 // we don't double-deliver if the human already
702 // answered (the POST took it), and so a late POST
703 // after this fires finds nothing pending.
704 if super::live::take_pending_approval(&child, &id) {
705 super::live::deliver_approval(&child, &id, false);
706 }
707 });
708 }
709 }
710 Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
711 return match status {
712 TerminalStatus::Completed => Ok(result),
713 TerminalStatus::Cancelled => Err(AgentError::Cancelled),
714 TerminalStatus::Error => Err(AgentError::LLM(
715 error.unwrap_or_else(|| "actor child errored".to_string()),
716 )),
717 // The suspend/resume round-trip (host re-dispatch of a
718 // nested parent) is not wired here yet; a worker in
719 // this build never emits Suspended, so this is
720 // unreachable in practice.
721 TerminalStatus::Suspended => Err(AgentError::LLM(
722 "nested sub-agent suspend received but resume transport is not wired"
723 .to_string(),
724 )),
725 };
726 }
727 Ok(None) => {
728 return Err(AgentError::LLM(
729 "actor child closed before terminal".to_string(),
730 ));
731 }
732 Err(e) => {
733 return Err(AgentError::LLM(format!("actor transport error: {e}")));
734 }
735 }
736 }
737 }
738 }
739
740 // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
741 let _ = client.send(ParentFrame::Cancel).await;
742 Err(AgentError::Cancelled)
743}
744
745/// The assignment text = the child session's latest user message (falls back to its title).
746fn extract_assignment(session: &Session) -> String {
747 session
748 .messages
749 .iter()
750 .rev()
751 .find(|m| matches!(m.role, Role::User))
752 .map(|m| m.content.clone())
753 .unwrap_or_else(|| {
754 session
755 .metadata
756 .get("title")
757 .cloned()
758 .unwrap_or_else(|| "Execute task".to_string())
759 })
760}
761
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Echo`-executor provision spec (child id `"c"`, depth 0, fabric
    /// `/tmp/fab`) whose pool-relevant axes — role, model ref, workspace and
    /// disabled tools — come from the arguments.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
            depth: 0,
        };
        let mut built = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        built.workspace = workspace.map(str::to_string);
        built.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        built.disabled_tools =
            disabled.map(|tools| tools.into_iter().map(String::from).collect());
        built
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Children agreeing on role/provider/model/workspace with the same set of
        // disabled tools (in any order) may share one warm worker; a different
        // child_id alone must not move them to another pool bucket.
        let first = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Bash", "Edit"]),
        );
        let mut second = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Edit", "Bash"]),
        );
        second.identity.child_id = "other".into();

        let first_fp = ActorChildRunner::fingerprint(&first);
        let second_fp = ActorChildRunner::fingerprint(&second);
        assert_eq!(first_fp, second_fp);
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base_fp =
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));

        // Changing any single axis that is baked into the worker (role, provider,
        // model, workspace, disabled tools) must land in a different pool bucket.
        let variants = [
            spec_with("writer", "p", "m", Some("/ws"), None),
            spec_with("explorer", "p2", "m", Some("/ws"), None),
            spec_with("explorer", "p", "m2", Some("/ws"), None),
            spec_with("explorer", "p", "m", Some("/ws2"), None),
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
        ];
        for variant in &variants {
            assert_ne!(base_fp, ActorChildRunner::fingerprint(variant));
        }
    }

    #[test]
    fn fingerprint_splits_on_baked_capabilities() {
        // Each capability below is baked into the worker once at provision time, so
        // each must split the pool bucket. Otherwise a worker baked for one posture
        // would be reused for another, e.g. a depth-1 worker re-stamping spawn_depth
        // onto a depth-4 child (defeating the depth cap), or a bypass worker serving a
        // non-bypass child.
        //
        // #73 (P1): the worker bakes `no_human_review` from `no_human_approver` once at
        // build(), so a worker baked for one approval posture must not serve the other.
        // #71: the read-only Bash checker is baked once at build() from
        // `guardian_read_only`, so a guardian reviewer worker must not serve an
        // ordinary child.
        let base_fp =
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));

        let tweaks: [(&str, fn(&mut ProvisionSpec)); 7] = [
            ("depth must split", |s| s.identity.depth = 2),
            ("nested_spawn must split", |s| s.capabilities.nested_spawn = true),
            ("bypass must split", |s| s.capabilities.bypass = true),
            ("enforce_permissions must split", |s| {
                s.capabilities.enforce_permissions = true
            }),
            ("max_spawn_depth must split", |s| {
                s.capabilities.max_spawn_depth = Some(8)
            }),
            ("no_human_approver must split", |s| {
                s.capabilities.no_human_approver = true
            }),
            ("guardian_read_only must split", |s| {
                s.capabilities.guardian_read_only = true
            }),
        ];

        for &(message, tweak) in &tweaks {
            let mut tweaked = spec_with("explorer", "p", "m", Some("/ws"), None);
            tweak(&mut tweaked);
            assert_ne!(base_fp, ActorChildRunner::fingerprint(&tweaked), "{}", message);
        }
    }

    /// Test double that answers every approval request with a fixed verdict.
    struct FixedDecider(bool);

    #[async_trait]
    impl ChildApprovalDecider for FixedDecider {
        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
            self.0
        }
    }

    #[tokio::test]
    async fn child_approval_fails_closed_without_decider() {
        // With no decider wired the host denies: the safe default, unchanged behavior.
        let request =
            serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
        let approved = decide_child_approval(None, "child-1", &request).await;
        assert!(!approved);
    }

    #[tokio::test]
    async fn child_approval_honors_wired_decider() {
        let request =
            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
        let allow: Arc<dyn ChildApprovalDecider> = Arc::new(FixedDecider(true));
        let refuse: Arc<dyn ChildApprovalDecider> = Arc::new(FixedDecider(false));

        let allowed = decide_child_approval(Some(&allow), "child-1", &request).await;
        let refused = decide_child_approval(Some(&refuse), "child-1", &request).await;
        assert!(allowed);
        assert!(!refused);
    }

    #[test]
    fn approval_request_fields_extracts_and_defaults() {
        let complete = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
        let expected_complete = ("Bash".to_string(), "run".to_string(), "ls".to_string());
        assert_eq!(approval_request_fields(&complete), expected_complete);

        // Any field that is absent comes back as an empty string.
        let sparse = serde_json::json!({"tool_name":"Write"});
        let expected_sparse = ("Write".to_string(), String::new(), String::new());
        assert_eq!(approval_request_fields(&sparse), expected_sparse);
    }
}