bamboo_engine/external_agents/actor_adapter.rs
1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27 ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
33/// Default cap on simultaneously running actor processes.
34pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;
35
36/// Max nesting depth for direct nested execution (Phase 6). A worker whose
37/// session `spawn_depth` is below this gets its own spawn stack + the real
38/// SubAgent tool; at/over it, neither (and the tool itself refuses). Mirrors
39/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` (kept in sync; engine can't
40/// depend on server-tools). Root orchestrator = 0 ⇒ 4 levels of sub-agents.
41pub const MAX_SPAWN_DEPTH: u32 = 4;
42
43/// Default cap on idle pooled (warm, reusable) workers kept per fingerprint.
44const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;
45
46/// How long a pooled worker waits for its next assignment before reclaiming
47/// itself (must comfortably exceed the gap between sibling spawns).
48const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;
49
50/// A warm worker parked for reuse: its process handle (killed on drop), the WS
51/// endpoint to reconnect to, and the id it registered under in the fabric.
52struct PooledActor {
53 worker: SpawnedChild,
54 endpoint: String,
55 agent_id: String,
56}
57
58/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
59pub struct ActorChildRunner {
60 agent_id: String,
61 worker_bin: PathBuf,
62 worker_args: Vec<String>,
63 fabric_dir: PathBuf,
64 executor: ExecutorSpec,
65 /// Per-provider credentials snapshotted from the parent config at build
66 /// time; the spec carries only the ONE the child's provider needs.
67 credentials: Vec<ScopedCredential>,
68 /// Parent's default provider (used when the child has no explicit one).
69 default_provider: String,
70 /// Backpressure: bounds the number of concurrently *running* actors; further
71 /// runs wait for a slot instead of exploding the process table. (Idle pooled
72 /// workers do not hold a slot.)
73 concurrency: std::sync::Arc<tokio::sync::Semaphore>,
74 spawn_timeout: Duration,
75 /// Warm-worker pool keyed by a reuse fingerprint
76 /// (role/provider/model/workspace/disabled-tools). A finished run parks its
77 /// worker here so the next matching child reuses it instead of spawning a
78 /// fresh process — collapsing N sibling sub-agents onto a few processes.
79 pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
80 max_idle_per_key: usize,
81 /// Host-side decision for a child's gated-tool approval request (Phase 2).
82 /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
83 /// human-routing bridge) returns approve/deny over the actor WS.
84 approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
85 /// Host-side fulfilment of a child's nested SubAgent spawn request (Phase 6).
86 /// `None` ⇒ graceful "not available" error (the safe default).
87 nested_spawn_handler: Option<Arc<dyn NestedSpawnHandler>>,
88}
89
90/// Decides how the host answers a child worker's gated-tool approval request
91/// (Phase 2: child → parent approval delegation). Async so an implementation
92/// can consult a policy or defer to a human. With no decider wired the host
93/// replies with a fail-closed DENY.
94///
95/// NOTE: `decide` is awaited inside the per-child frame pump, so an
96/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
97/// loop decision that may block indefinitely should instead be delivered
98/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
99/// (`super::live`), which `drive()` already forwards to the worker without
100/// stalling the pump.
101#[async_trait]
102pub trait ChildApprovalDecider: Send + Sync {
103 /// Decide whether `child_session_id` may perform the gated action described
104 /// by `request` (`{tool_name, permission, resource}`).
105 async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
106}
107
108/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
109/// decider is wired — the single, testable seam for the host-side decision.
110async fn decide_child_approval(
111 decider: Option<&Arc<dyn ChildApprovalDecider>>,
112 child_session_id: &str,
113 request: &serde_json::Value,
114) -> bool {
115 match decider {
116 Some(decider) => decider.decide(child_session_id, request).await,
117 None => false,
118 }
119}
120
121/// How long the host waits for a human approval decision before failing the
122/// child's gated tool closed (DENY). Bounds an unanswered request so it can't
123/// hang the worker indefinitely.
124const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
125
126/// Extract `(tool_name, permission, resource)` from a worker's approval request
127/// body (`{tool_name, permission, resource}`); missing fields default to empty.
128fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
129 let field = |k: &str| {
130 body.get(k)
131 .and_then(|v| v.as_str())
132 .unwrap_or("")
133 .to_string()
134 };
135 (field("tool_name"), field("permission"), field("resource"))
136}
137
138/// Fulfils a child worker's nested SubAgent spawn request on the host (Phase 6:
139/// nested execution). When a child wants a grandchild, its `host.subagent_call`
140/// arrives as a `ChildFrame::SubagentRequest`; an implementation runs the real
141/// spawn (parenting the grandchild under the requesting child) and returns the
142/// SubAgent tool's result JSON. With no handler wired the host replies with a
143/// graceful error so the child's tool fails cleanly instead of hanging.
144#[async_trait]
145pub trait NestedSpawnHandler: Send + Sync {
146 /// Run a nested SubAgent spawn requested by `child_session_id`; `request` is
147 /// the SubAgent tool-call body. Returns the tool-result JSON, or an error
148 /// string (surfaced to the child as a failed tool result).
149 async fn spawn_nested(
150 &self,
151 child_session_id: &str,
152 request: serde_json::Value,
153 ) -> Result<serde_json::Value, String>;
154}
155
156/// The graceful "not available" reply body for an unfulfilled nested spawn.
157fn nested_spawn_unavailable(reason: &str) -> serde_json::Value {
158 serde_json::json!({
159 "success": false,
160 "result": reason,
161 "display_preference": null,
162 })
163}
164
165/// Resolve a nested-spawn request to a `SubagentReply` body. With no handler
166/// wired, returns the graceful "not available" error body (unchanged default).
167async fn fulfil_nested_spawn(
168 handler: Option<&Arc<dyn NestedSpawnHandler>>,
169 child_session_id: &str,
170 request: serde_json::Value,
171) -> serde_json::Value {
172 match handler {
173 Some(handler) => match handler.spawn_nested(child_session_id, request).await {
174 Ok(result) => result,
175 Err(e) => nested_spawn_unavailable(&format!("nested sub-agent spawn failed: {e}")),
176 },
177 None => nested_spawn_unavailable("nested sub-agent spawn is not available in this build"),
178 }
179}
180
181/// Process-global slot for the singleton host-side nested-spawn handler.
182///
183/// Set once at server startup — AFTER the `ChildSessionAdapter` exists, which is
184/// how we resolve the runner→scheduler→adapter construction-order cycle (the
185/// runner is built before the adapter that would be its handler, and it's then
186/// dyn-erased into the composite runner so it can't be reached for a setter).
187/// Read by `drive()` for every child run. Mirrors the `super::live` registry.
188fn nested_spawn_handler_slot() -> &'static std::sync::OnceLock<Arc<dyn NestedSpawnHandler>> {
189 static SLOT: std::sync::OnceLock<Arc<dyn NestedSpawnHandler>> = std::sync::OnceLock::new();
190 &SLOT
191}
192
193/// Install the process-global nested-spawn handler (idempotent; first wins).
194pub fn set_nested_spawn_handler(handler: Arc<dyn NestedSpawnHandler>) {
195 let _ = nested_spawn_handler_slot().set(handler);
196}
197
198/// The process-global nested-spawn handler, if installed.
199pub fn nested_spawn_handler() -> Option<Arc<dyn NestedSpawnHandler>> {
200 nested_spawn_handler_slot().get().cloned()
201}
202
203/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
204///
205/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
206/// children's forced-ask (dangerous) gated actions — which still raise
207/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
208/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
209/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
210/// live channel, so the agent loop is never blocked.
211#[async_trait]
212pub trait ChildApprovalReviewer: Send + Sync {
213 /// Judge whether the gated action `request` (`{tool_name, permission,
214 /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
215 async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
216}
217
218fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
219 static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
220 &SLOT
221}
222
223/// Install the process-global child-approval reviewer (idempotent; first wins).
224pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
225 let _ = child_approval_reviewer_slot().set(reviewer);
226}
227
228/// The process-global child-approval reviewer, if installed.
229pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
230 child_approval_reviewer_slot().get().cloned()
231}
232
233/// Per-run escalation bridge for non-bypass child-approval routing (Phase 6,
234/// Part B). A WORKER's `run()` installs its OWN host bridge here; its `drive()`
235/// (driving grandchildren) uses it to RE-PROXY a child's approval request UP to
236/// its own parent — chaining the request up every level until a bypass level
237/// (model-review) or the top orchestrator (human) decides, then relaying the
238/// reply back down. The top server never installs one ⇒ its drive() falls to the
239/// human-loop. Set per-run (a worker serves runs sequentially, so one active
240/// bridge); a stale bridge from a finished run errors on call ⇒ fail-closed.
241fn escalation_bridge_slot(
242) -> &'static std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>> {
243 static SLOT: std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>> =
244 std::sync::Mutex::new(None);
245 &SLOT
246}
247
248/// Install (or clear with `None`) the process escalation host bridge.
249pub fn set_escalation_host_bridge(bridge: Option<bamboo_subagent::executor::HostBridge>) {
250 *escalation_bridge_slot().lock().unwrap() = bridge;
251}
252
253fn escalation_host_bridge() -> Option<bamboo_subagent::executor::HostBridge> {
254 escalation_bridge_slot().lock().unwrap().clone()
255}
256
257impl ActorChildRunner {
258 #[allow(clippy::too_many_arguments)]
259 pub fn new(
260 agent_id: String,
261 worker_bin: PathBuf,
262 worker_args: Vec<String>,
263 fabric_dir: PathBuf,
264 executor: ExecutorSpec,
265 credentials: Vec<ScopedCredential>,
266 default_provider: String,
267 max_concurrent: usize,
268 ) -> Self {
269 Self {
270 agent_id,
271 worker_bin,
272 worker_args,
273 fabric_dir,
274 executor,
275 credentials,
276 default_provider,
277 concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
278 spawn_timeout: Duration::from_secs(30),
279 pool: Arc::new(Mutex::new(HashMap::new())),
280 max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
281 approval_decider: None,
282 nested_spawn_handler: None,
283 }
284 }
285
286 /// Wire the host-side decider for child gated-tool approval requests
287 /// (Phase 2). Without this the host fail-closed DENYs every request.
288 pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
289 self.approval_decider = Some(decider);
290 self
291 }
292
293 /// Wire the host-side nested SubAgent spawn handler (Phase 6). Without it, a
294 /// child's nested spawn request fails gracefully ("not available").
295 pub fn with_nested_spawn_handler(mut self, handler: Arc<dyn NestedSpawnHandler>) -> Self {
296 self.nested_spawn_handler = Some(handler);
297 self
298 }
299
300 /// Reuse fingerprint: two children are interchangeable on one warm worker iff
301 /// they share role, provider, model, workspace, disabled-tool set, AND every
302 /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
303 /// stamps these once and reuses them across runs): nesting depth, nested-spawn
304 /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
305 /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
306 /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
307 /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
308 /// the recursion bound; or a bypass worker reused for a non-bypass child. So
309 /// these MUST split the pool bucket. Everything else (assignment, history) is
310 /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
311 fn fingerprint(spec: &ProvisionSpec) -> String {
312 let role = spec.identity.role.as_str();
313 let (provider, model) = spec
314 .model
315 .as_ref()
316 .map(|m| (m.provider.as_str(), m.model.as_str()))
317 .unwrap_or(("", ""));
318 let workspace = spec.workspace.as_deref().unwrap_or("");
319 let mut tools = spec.disabled_tools.clone().unwrap_or_default();
320 tools.sort();
321 let caps = &spec.capabilities;
322 format!(
323 "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}",
324 tools.join(","),
325 spec.identity.depth,
326 caps.nested_spawn,
327 caps.bypass,
328 caps.enforce_permissions,
329 caps.max_spawn_depth.unwrap_or(0),
330 )
331 }
332
333 /// Check out a worker for this assignment: reuse a live pooled one matching
334 /// `key`, else spawn a fresh reusable worker.
335 async fn acquire_worker(
336 &self,
337 key: &str,
338 spec: &ProvisionSpec,
339 ) -> crate::runtime::runner::Result<PooledActor> {
340 // Drain the pool bucket, validating liveness; a worker that hit its idle
341 // timeout has exited and withdrawn its fabric record — skip and reap it.
342 loop {
343 let candidate = {
344 let mut pool = self.pool.lock().await;
345 pool.get_mut(key).and_then(|bucket| bucket.pop())
346 };
347 let Some(candidate) = candidate else { break };
348 let alive = Fabric::at(&self.fabric_dir)
349 .resolve(&candidate.agent_id)
350 .await
351 .ok()
352 .flatten()
353 .is_some();
354 if alive {
355 return Ok(candidate);
356 }
357 candidate.worker.kill().await;
358 }
359
360 let spawned = spawn_worker(
361 &self.worker_bin,
362 &self.worker_args,
363 spec,
364 self.spawn_timeout,
365 )
366 .await
367 .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
368 let endpoint = spawned.record.endpoint.clone();
369 let agent_id = spawned.record.agent_id.clone();
370 Ok(PooledActor {
371 worker: spawned,
372 endpoint,
373 agent_id,
374 })
375 }
376
377 /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
378 async fn release_worker(&self, key: &str, actor: PooledActor) {
379 let mut pool = self.pool.lock().await;
380 let bucket = pool.entry(key.to_string()).or_default();
381 if bucket.len() >= self.max_idle_per_key {
382 drop(pool);
383 self.retire_worker(actor).await;
384 return;
385 }
386 bucket.push(actor);
387 }
388
389 /// Forcefully stop a worker and clean its discovery record.
390 async fn retire_worker(&self, actor: PooledActor) {
391 let agent_id = actor.agent_id.clone();
392 actor.worker.kill().await;
393 let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
394 }
395
396 /// Assemble the parent-resolved provisioning document for this child.
397 fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
398 let mut spec = ProvisionSpec::new(
399 ChildIdentity {
400 child_id: job.child_session_id.clone(),
401 parent_id: Some(job.parent_session_id.clone()),
402 project_key: None,
403 role: session
404 .metadata
405 .get("subagent_type")
406 .cloned()
407 .unwrap_or_else(|| "worker".to_string()),
408 // The child session already carries the correct depth
409 // (create_child_action's new_child_of did parent.spawn_depth+1);
410 // stamp it so the worker can re-establish it on its run session
411 // and enforce the max-depth cap across the actor boundary.
412 depth: session.spawn_depth,
413 },
414 self.executor.clone(),
415 self.fabric_dir.to_string_lossy().into_owned(),
416 );
417 spec.workspace = session.workspace.clone();
418 // Final model: the session's pinned model_ref (create.model / routing already applied),
419 // falling back to the job's bare model on the parent's default provider.
420 spec.model = session
421 .model_ref
422 .as_ref()
423 .map(|r| ModelRefSpec {
424 provider: r.provider.clone(),
425 model: r.model.clone(),
426 })
427 .or_else(|| {
428 let m = job.model.trim();
429 (!m.is_empty()).then(|| ModelRefSpec {
430 provider: self.default_provider.clone(),
431 model: m.to_string(),
432 })
433 });
434 spec.disabled_tools = job.disabled_tools.clone();
435 // Least-privilege secrets: only the credential for the child's provider.
436 let provider = spec
437 .model
438 .as_ref()
439 .map(|m| m.provider.as_str())
440 .filter(|p| !p.trim().is_empty())
441 .unwrap_or(&self.default_provider);
442 if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
443 spec.secrets.provider_credentials.push(cred.clone());
444 } else {
445 tracing::warn!(
446 "actor child {}: no credential found for provider '{}'",
447 job.child_session_id,
448 provider
449 );
450 }
451 // Phase 6 (direct nested execution): a worker BELOW the depth cap may
452 // orchestrate its OWN children — on startup it builds its own spawn
453 // stack and runs the real SubAgent tool (no host proxy). The cap (the
454 // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
455 // recursion. Driven purely by the child's depth, so it auto-propagates
456 // down the tree without any extra config threading.
457 spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
458 spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
459 // Propagate "bypass permissions" so a self-orchestrating worker knows it
460 // is a bypassed parent and installs the off-loop model-reviewer for its
461 // children's forced-ask actions (Phase 6, Part B). The child session
462 // already carries the inherited flag (create_child_action seeds it).
463 spec.capabilities.bypass = session
464 .agent_runtime_state
465 .as_ref()
466 .is_some_and(|s| s.bypass_permissions);
467 spec
468 }
469}
470
471#[async_trait]
472impl ExternalChildRunner for ActorChildRunner {
473 async fn should_handle(&self, session: &Session) -> bool {
474 session.metadata.get("runtime.kind") == Some(&"external".to_string())
475 && session.metadata.get("external.protocol") == Some(&"actor".to_string())
476 && session.metadata.get("external.agent_id") == Some(&self.agent_id)
477 }
478
479 async fn execute_external_child(
480 &self,
481 session: &mut Session,
482 job: &SpawnJob,
483 event_tx: mpsc::Sender<AgentEvent>,
484 cancel_token: CancellationToken,
485 ) -> crate::runtime::runner::Result<()> {
486 let assignment = extract_assignment(session);
487 let mut spec = self.build_spec(session, job);
488 // Make every actor a warm, reusable worker so the pool can recycle it for
489 // the next sibling with a matching fingerprint.
490 spec.reusable = true;
491 if spec.limits.idle_timeout_secs.is_none() {
492 spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
493 }
494 let pool_key = Self::fingerprint(&spec);
495 // Rehydration: the child session in the parent's store is the actor's
496 // durable state. Ship the full conversation so a reactivation
497 // (send_message / update / rerun) carries its history. A reused worker is
498 // stateless between runs, so this is also what isolates each child's
499 // context on a shared process.
500 let messages: Vec<serde_json::Value> = session
501 .messages
502 .iter()
503 .filter_map(|m| serde_json::to_value(m).ok())
504 .collect();
505
506 // Backpressure: hold a concurrency slot for the lifetime of the *run*
507 // (cancellation still proceeds — the cancel branch in drive() runs while
508 // we hold the permit). Released when this fn returns, i.e. once the worker
509 // is parked back into the pool, so idle workers don't pin slots.
510 let _slot = self
511 .concurrency
512 .acquire()
513 .await
514 .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
515
516 // Check out a warm worker (reuse-or-spawn).
517 let mut actor = self.acquire_worker(&pool_key, &spec).await?;
518
519 let mut client = match ChildClient::connect(&actor.endpoint).await {
520 Ok(client) => client,
521 Err(e) => {
522 // The pooled worker may have died between checkout and connect;
523 // retire it and spawn one fresh, once.
524 self.retire_worker(actor).await;
525 let spawned = spawn_worker(
526 &self.worker_bin,
527 &self.worker_args,
528 &spec,
529 self.spawn_timeout,
530 )
531 .await
532 .map_err(|e2| {
533 AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
534 })?;
535 let endpoint = spawned.record.endpoint.clone();
536 let agent_id = spawned.record.agent_id.clone();
537 let client = ChildClient::connect(&endpoint)
538 .await
539 .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
540 actor = PooledActor {
541 worker: spawned,
542 endpoint,
543 agent_id,
544 };
545 client
546 }
547 };
548
549 client
550 .send(ParentFrame::Run(RunSpec {
551 assignment,
552 reasoning_effort: None,
553 messages,
554 }))
555 .await
556 .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
557
558 // Register as a live actor so send_message (running, no interrupt) can
559 // steer this child in-band over the existing WS connection. The guard
560 // unregisters on every exit path.
561 let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
562 let live_guard = super::live::register(&job.child_session_id, live_tx);
563
564 // The nested-spawn handler is normally the process-global installed at
565 // startup (see `set_nested_spawn_handler`); an explicit per-runner field
566 // overrides it (used in tests).
567 let nested_handler = self
568 .nested_spawn_handler
569 .clone()
570 .or_else(nested_spawn_handler);
571 let result = drive(
572 &mut client,
573 &job.child_session_id,
574 self.approval_decider.as_ref(),
575 nested_handler.as_ref(),
576 &event_tx,
577 &cancel_token,
578 &mut live_rx,
579 )
580 .await;
581 // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
582 // so a send_message landing in the close/park window below must see
583 // "not live" and take the durable-queue fallback instead of vanishing.
584 // (Even if one slipped in earlier, send_message also appends it to the
585 // durable transcript, so the next activation still rehydrates it.)
586 drop(live_guard);
587
588 // Close the connection: the worker's serve loop then accepts the next
589 // assignment (reuse) or idles out. Park the worker on a clean run; retire
590 // it on error/cancel (a wedged worker must not be reused).
591 let _ = client.close().await;
592 match &result {
593 Ok(_) => self.release_worker(&pool_key, actor).await,
594 Err(_) => self.retire_worker(actor).await,
595 }
596
597 // Write-back: persist the actor's final reply onto the child session so
598 // the transcript survives and the NEXT activation sees it as history.
599 // (run_child_spawn saves the session right after we return.)
600 match result {
601 Ok(Some(text)) => {
602 if !text.is_empty() {
603 session.add_message(bamboo_agent_core::Message::assistant(text, None));
604 }
605 Ok(())
606 }
607 Ok(None) => Ok(()),
608 Err(e) => Err(e),
609 }
610 }
611}
612
613/// Pump child frames -> parent events until a terminal frame (or cancellation).
614/// On success, yields the actor's final result text (for session write-back).
615/// `live_rx` carries in-band frames (steering messages) from the live registry.
616#[allow(clippy::too_many_arguments)]
617async fn drive(
618 client: &mut ChildClient,
619 child_session_id: &str,
620 approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
621 nested_spawn_handler: Option<&Arc<dyn NestedSpawnHandler>>,
622 event_tx: &mpsc::Sender<AgentEvent>,
623 cancel_token: &CancellationToken,
624 live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
625) -> crate::runtime::runner::Result<Option<String>> {
626 loop {
627 tokio::select! {
628 _ = cancel_token.cancelled() => {
629 // fall through to the cancel handling below
630 break;
631 }
632 Some(frame) = live_rx.recv() => {
633 // Forward in-band steering to the worker over the existing WS.
634 if client.send(frame).await.is_err() {
635 tracing::warn!("live steering frame could not be sent; connection failing");
636 }
637 }
638 frame = client.next_frame() => {
639 match frame {
640 Ok(Some(ChildFrame::Event { event })) => {
641 // AgentEvent is serialized verbatim on the wire (zero mapping).
642 if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
643 let _ = event_tx.send(ev).await;
644 }
645 }
646 Ok(Some(ChildFrame::SubagentRequest { id, body })) => {
647 // Phase 6: a nested worker child proxied a SubAgent tool
648 // call back to the host. A wired `NestedSpawnHandler` runs
649 // the real spawn (parenting a grandchild under this child)
650 // and returns the tool result; with none wired we reply
651 // with a graceful "not available" error so the worker's
652 // tool call fails cleanly instead of hanging.
653 let reply =
654 fulfil_nested_spawn(nested_spawn_handler, child_session_id, body).await;
655 if client
656 .send(ParentFrame::SubagentReply { id, body: reply })
657 .await
658 .is_err()
659 {
660 tracing::warn!("failed to answer subagent_request; connection failing");
661 }
662 }
663 Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
664 // Phase 2: a worker proxied a gated-tool approval back to
665 // the host. The WORKER side is live — its executor installs
666 // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
667 // that calls `host.approval_call`, so this frame arrives
668 // when a child hits `ConfirmationRequired`.
669 if let Some(reviewer) = child_approval_reviewer() {
670 // Phase 6, Part B: a BYPASSED parent worker
671 // model-reviews its children's forced-ask (dangerous)
672 // actions. The review is an LLM call, so run it OFF
673 // the frame pump in a spawned task and deliver the
674 // verdict async via the live channel — the pump keeps
675 // forwarding events and the agent loop never blocks. A
676 // timeout denies a hung review so the child can't hang.
677 let child = child_session_id.to_string();
678 let req_id = id.clone();
679 let body = body.clone();
680 tokio::spawn(async move {
681 let approved = tokio::time::timeout(
682 CHILD_APPROVAL_TIMEOUT,
683 reviewer.review(&child, &body),
684 )
685 .await
686 .unwrap_or(false);
687 super::live::deliver_approval(&child, &req_id, approved);
688 });
689 } else if approval_decider.is_some() {
690 // A decider is wired (policy / auto): decide promptly
691 // and reply inline. (Must not block the pump — see the
692 // `ChildApprovalDecider` doc.)
693 let approved =
694 decide_child_approval(approval_decider, child_session_id, &body)
695 .await;
696 if client
697 .send(ParentFrame::ApprovalReply { id, approved })
698 .await
699 .is_err()
700 {
701 tracing::warn!(
702 "failed to answer approval_request; connection failing"
703 );
704 }
705 } else if let Some(host) = escalation_host_bridge() {
706 // Non-bypass WORKER: ESCALATE up our own actor link
707 // (re-proxy) so the request chains to our parent — and
708 // up every level until a bypass level (model-review) or
709 // the top orchestrator (human) decides. Off-loop so the
710 // pump never blocks; relay the reply down to the child.
711 let child = child_session_id.to_string();
712 let req_id = id.clone();
713 let body = body.clone();
714 tokio::spawn(async move {
715 let approved = match tokio::time::timeout(
716 CHILD_APPROVAL_TIMEOUT,
717 host.approval_call(body),
718 )
719 .await
720 {
721 Ok(Ok(reply)) => reply
722 .get("approved")
723 .and_then(|v| v.as_bool())
724 .unwrap_or(false),
725 // Transport error or timeout ⇒ fail closed.
726 _ => false,
727 };
728 super::live::deliver_approval(&child, &req_id, approved);
729 });
730 } else {
731 // Top orchestrator (no escalation bridge): human-in-the-
732 // loop. Surface the request on the parent's event stream
733 // and DEFER — the decision arrives out-of-band via
734 // `live::deliver_approval(child, request_id, approved)`
735 // (→ this child's `live_rx` → forwarded to the worker
736 // above). A timeout denies a never-answered request so
737 // it can't hang the child forever.
738 let (tool_name, permission, resource) =
739 approval_request_fields(&body);
740 let _ = event_tx
741 .send(AgentEvent::ChildApprovalRequested {
742 child_session_id: child_session_id.to_string(),
743 request_id: id.clone(),
744 tool_name,
745 permission,
746 resource,
747 })
748 .await;
749 let child = child_session_id.to_string();
750 tokio::spawn(async move {
751 tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
752 // No-op if already answered (worker ignores an
753 // unknown/duplicate id) or the child is gone.
754 super::live::deliver_approval(&child, &id, false);
755 });
756 }
757 }
758 Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
759 return match status {
760 TerminalStatus::Completed => Ok(result),
761 TerminalStatus::Cancelled => Err(AgentError::Cancelled),
762 TerminalStatus::Error => Err(AgentError::LLM(
763 error.unwrap_or_else(|| "actor child errored".to_string()),
764 )),
765 // The suspend/resume round-trip (host re-dispatch of a
766 // nested parent) is not wired here yet; a worker in
767 // this build never emits Suspended, so this is
768 // unreachable in practice.
769 TerminalStatus::Suspended => Err(AgentError::LLM(
770 "nested sub-agent suspend received but resume transport is not wired"
771 .to_string(),
772 )),
773 };
774 }
775 Ok(None) => {
776 return Err(AgentError::LLM(
777 "actor child closed before terminal".to_string(),
778 ));
779 }
780 Err(e) => {
781 return Err(AgentError::LLM(format!("actor transport error: {e}")));
782 }
783 }
784 }
785 }
786 }
787
788 // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
789 let _ = client.send(ParentFrame::Cancel).await;
790 Err(AgentError::Cancelled)
791}
792
793/// The assignment text = the child session's latest user message (falls back to its title).
794fn extract_assignment(session: &Session) -> String {
795 session
796 .messages
797 .iter()
798 .rev()
799 .find(|m| matches!(m.role, Role::User))
800 .map(|m| m.content.clone())
801 .unwrap_or_else(|| {
802 session
803 .metadata
804 .get("title")
805 .cloned()
806 .unwrap_or_else(|| "Execute task".to_string())
807 })
808}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `ProvisionSpec` (Echo executor, depth 0, no parent/project) that
    /// varies only the axes the fingerprint tests care about: role, model provider/name,
    /// workspace, and the disabled-tool list.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: "c".into(),
                parent_id: None,
                project_key: None,
                role: role.into(),
                depth: 0,
            },
            ExecutorSpec::Echo,
            "/tmp/fab".into(),
        );
        spec.workspace = workspace.map(|w| w.to_string());
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
        spec
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Same role/provider/model/workspace and equal tool sets (order-insensitive)
        // are interchangeable on one warm worker — and differ only in child_id.
        let a = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Bash", "Edit"]),
        );
        let mut b = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Edit", "Bash"]),
        );
        b.identity.child_id = "other".into();
        assert_eq!(
            ActorChildRunner::fingerprint(&a),
            ActorChildRunner::fingerprint(&b)
        );
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base_fp =
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
        // Each axis that is baked into the worker must split the pool bucket. Every case is
        // labelled so a failure names the axis that stopped splitting.
        let variants = [
            ("role", spec_with("writer", "p", "m", Some("/ws"), None)),
            ("provider", spec_with("explorer", "p2", "m", Some("/ws"), None)),
            ("model", spec_with("explorer", "p", "m2", Some("/ws"), None)),
            ("workspace", spec_with("explorer", "p", "m", Some("/ws2"), None)),
            (
                "disabled_tools",
                spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
            ),
        ];
        for (axis, spec) in &variants {
            assert_ne!(
                base_fp,
                ActorChildRunner::fingerprint(spec),
                "{axis} must split"
            );
        }
    }

    #[test]
    fn fingerprint_splits_on_baked_capabilities() {
        // Every capability baked once at provision time must split the pool
        // bucket, else a worker baked for one posture gets reused for another
        // (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child,
        // breaking the depth cap; or a bypass worker reused for a non-bypass one).
        let base_fp =
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));

        let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
        depth.identity.depth = 2;
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&depth),
            "depth must split"
        );

        let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
        nested.capabilities.nested_spawn = true;
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&nested),
            "nested_spawn must split"
        );

        let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
        bypass.capabilities.bypass = true;
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&bypass),
            "bypass must split"
        );

        let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
        enforce.capabilities.enforce_permissions = true;
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&enforce),
            "enforce_permissions must split"
        );

        let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
        cap.capabilities.max_spawn_depth = Some(8);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&cap),
            "max_spawn_depth must split"
        );
    }

    /// Test double: a decider that always returns the same verdict.
    struct StaticDecider(bool);

    #[async_trait]
    impl ChildApprovalDecider for StaticDecider {
        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
            self.0
        }
    }

    #[tokio::test]
    async fn child_approval_fails_closed_without_decider() {
        // No decider wired ⇒ the host denies (fail-closed safe default).
        let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
        assert!(!decide_child_approval(None, "child-1", &body).await);
    }

    #[tokio::test]
    async fn child_approval_honors_wired_decider() {
        let body =
            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
        let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
        let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
        assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
        assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
    }

    /// Test double: a nested-spawn handler that always returns the same canned result.
    struct StaticSpawn(Result<serde_json::Value, String>);

    #[async_trait]
    impl NestedSpawnHandler for StaticSpawn {
        async fn spawn_nested(
            &self,
            _child: &str,
            _req: serde_json::Value,
        ) -> Result<serde_json::Value, String> {
            self.0.clone()
        }
    }

    #[tokio::test]
    async fn nested_spawn_unavailable_without_handler() {
        let reply = fulfil_nested_spawn(None, "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"], serde_json::json!(false));
        let result = reply["result"]
            .as_str()
            .expect("unavailable reply should carry a string `result`");
        assert!(result.contains("not available"));
    }

    #[tokio::test]
    async fn nested_spawn_returns_handler_result_or_error_body() {
        let ok: Arc<dyn NestedSpawnHandler> = Arc::new(StaticSpawn(Ok(
            serde_json::json!({"success": true, "result": "spawned"}),
        )));
        let reply = fulfil_nested_spawn(Some(&ok), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["result"], serde_json::json!("spawned"));

        let err: Arc<dyn NestedSpawnHandler> = Arc::new(StaticSpawn(Err("boom".to_string())));
        let reply = fulfil_nested_spawn(Some(&err), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"], serde_json::json!(false));
        let result = reply["result"]
            .as_str()
            .expect("error reply should carry a string `result`");
        assert!(result.contains("boom"));
    }

    #[test]
    fn approval_request_fields_extracts_and_defaults() {
        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
        assert_eq!(
            approval_request_fields(&full),
            ("Bash".to_string(), "run".to_string(), "ls".to_string())
        );
        // Missing fields default to empty strings.
        let partial = serde_json::json!({"tool_name":"Write"});
        assert_eq!(
            approval_request_fields(&partial),
            ("Write".to_string(), String::new(), String::new())
        );
    }
}