Skip to main content

bamboo_engine/external_agents/
actor_adapter.rs

1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27    ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default cap on simultaneously running actor processes.
///
/// NOTE(review): not referenced in this part of the file; presumably passed as
/// the `max_concurrent` argument of `ActorChildRunner::new` — confirm at the
/// call site.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Max nesting depth for direct nested execution (Phase 6). A worker whose
/// session `spawn_depth` is below this gets its own spawn stack + the real
/// SubAgent tool; at/over it, neither (and the tool itself refuses). Mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` (kept in sync; engine can't
/// depend on server-tools). Root orchestrator = 0 ⇒ 4 levels of sub-agents.
///
/// `build_spec` both compares the child's `spawn_depth` against this value (to
/// set `capabilities.nested_spawn`) and ships it to the worker as
/// `capabilities.max_spawn_depth`.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default cap on idle pooled (warm, reusable) workers kept per fingerprint.
/// A worker released into a bucket that already holds this many is retired
/// instead of parked.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// How long a pooled worker waits for its next assignment before reclaiming
/// itself (must comfortably exceed the gap between sibling spawns).
/// Only applied when the provisioning spec does not already carry an
/// `idle_timeout_secs` limit.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;
49
/// A warm worker parked for reuse: its process handle (killed on drop), the WS
/// endpoint to reconnect to, and the id it registered under in the fabric.
struct PooledActor {
    /// Handle to the spawned worker process, as returned by `spawn_worker`.
    worker: SpawnedChild,
    /// Endpoint handed to `ChildClient::connect` for each run on this worker.
    endpoint: String,
    /// Fabric id used to check liveness (`resolve`) and to clean up (`withdraw`).
    agent_id: String,
}
57
/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
pub struct ActorChildRunner {
    /// Id of this runner; `should_handle` only accepts sessions whose
    /// `external.agent_id` metadata equals it.
    agent_id: String,
    /// Path of the worker binary passed to `spawn_worker`.
    worker_bin: PathBuf,
    /// Extra command-line arguments passed to `spawn_worker`.
    worker_args: Vec<String>,
    /// Directory of the discovery fabric: shipped to the worker in its spec and
    /// used host-side to resolve / withdraw worker records.
    fabric_dir: PathBuf,
    /// Executor description cloned into every child's `ProvisionSpec`.
    executor: ExecutorSpec,
    /// Per-provider credentials snapshotted from the parent config at build
    /// time; the spec carries only the ONE the child's provider needs.
    credentials: Vec<ScopedCredential>,
    /// Parent's default provider (used when the child has no explicit one).
    default_provider: String,
    /// Backpressure: bounds the number of concurrently *running* actors; further
    /// runs wait for a slot instead of exploding the process table. (Idle pooled
    /// workers do not hold a slot.)
    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
    /// Timeout handed to `spawn_worker` for a fresh worker (30 s from `new`).
    spawn_timeout: Duration,
    /// Warm-worker pool keyed by a reuse fingerprint
    /// (role/provider/model/workspace/disabled-tools). A finished run parks its
    /// worker here so the next matching child reuses it instead of spawning a
    /// fresh process — collapsing N sibling sub-agents onto a few processes.
    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
    /// Maximum number of idle workers parked per fingerprint bucket; a worker
    /// released into a full bucket is retired instead.
    max_idle_per_key: usize,
    /// Host-side decision for a child's gated-tool approval request (Phase 2).
    /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
    /// human-routing bridge) returns approve/deny over the actor WS.
    /// In `drive()`, an installed process-global `ChildApprovalReviewer` is
    /// consulted before this decider.
    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
    /// Host-side fulfilment of a child's nested SubAgent spawn request (Phase 6).
    /// `None` ⇒ fall back to the process-global handler (see
    /// `set_nested_spawn_handler`); if that is unset too, the child gets a
    /// graceful "not available" error (the safe default).
    nested_spawn_handler: Option<Arc<dyn NestedSpawnHandler>>,
}
89
/// Decides how the host answers a child worker's gated-tool approval request
/// (Phase 2: child → parent approval delegation). Async so an implementation
/// can consult a policy or defer to a human. With no decider wired the host
/// replies with a fail-closed DENY.
///
/// NOTE: `decide` is awaited inside the per-child frame pump, so an
/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
/// loop decision that may block indefinitely should instead be delivered
/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
/// (`super::live`), which `drive()` already forwards to the worker without
/// stalling the pump.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
    /// Decide whether `child_session_id` may perform the gated action described
    /// by `request` (`{tool_name, permission, resource}`).
    ///
    /// Returns `true` to approve and `false` to deny.
    async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
107
108/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
109/// decider is wired — the single, testable seam for the host-side decision.
110async fn decide_child_approval(
111    decider: Option<&Arc<dyn ChildApprovalDecider>>,
112    child_session_id: &str,
113    request: &serde_json::Value,
114) -> bool {
115    match decider {
116        Some(decider) => decider.decide(child_session_id, request).await,
117        None => false,
118    }
119}
120
/// How long the host waits for a human approval decision before failing the
/// child's gated tool closed (DENY). Bounds an unanswered request so it can't
/// hang the worker indefinitely.
///
/// `drive()` also applies this bound to the off-loop `ChildApprovalReviewer`
/// call: a review that has not finished in time counts as a denial.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
125
126/// Extract `(tool_name, permission, resource)` from a worker's approval request
127/// body (`{tool_name, permission, resource}`); missing fields default to empty.
128fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
129    let field = |k: &str| {
130        body.get(k)
131            .and_then(|v| v.as_str())
132            .unwrap_or("")
133            .to_string()
134    };
135    (field("tool_name"), field("permission"), field("resource"))
136}
137
/// Fulfils a child worker's nested SubAgent spawn request on the host (Phase 6:
/// nested execution). When a child wants a grandchild, its `host.subagent_call`
/// arrives as a `ChildFrame::SubagentRequest`; an implementation runs the real
/// spawn (parenting the grandchild under the requesting child) and returns the
/// SubAgent tool's result JSON. With no handler wired the host replies with a
/// graceful error so the child's tool fails cleanly instead of hanging.
#[async_trait]
pub trait NestedSpawnHandler: Send + Sync {
    /// Run a nested SubAgent spawn requested by `child_session_id`; `request` is
    /// the SubAgent tool-call body. Returns the tool-result JSON, or an error
    /// string (surfaced to the child as a failed tool result).
    ///
    /// An `Err` is not forwarded verbatim: the host wraps it into a
    /// `success: false` reply whose `result` text is prefixed with
    /// "nested sub-agent spawn failed: ".
    async fn spawn_nested(
        &self,
        child_session_id: &str,
        request: serde_json::Value,
    ) -> Result<serde_json::Value, String>;
}
155
156/// The graceful "not available" reply body for an unfulfilled nested spawn.
157fn nested_spawn_unavailable(reason: &str) -> serde_json::Value {
158    serde_json::json!({
159        "success": false,
160        "result": reason,
161        "display_preference": null,
162    })
163}
164
165/// Resolve a nested-spawn request to a `SubagentReply` body. With no handler
166/// wired, returns the graceful "not available" error body (unchanged default).
167async fn fulfil_nested_spawn(
168    handler: Option<&Arc<dyn NestedSpawnHandler>>,
169    child_session_id: &str,
170    request: serde_json::Value,
171) -> serde_json::Value {
172    match handler {
173        Some(handler) => match handler.spawn_nested(child_session_id, request).await {
174            Ok(result) => result,
175            Err(e) => nested_spawn_unavailable(&format!("nested sub-agent spawn failed: {e}")),
176        },
177        None => nested_spawn_unavailable("nested sub-agent spawn is not available in this build"),
178    }
179}
180
181/// Process-global slot for the singleton host-side nested-spawn handler.
182///
183/// Set once at server startup — AFTER the `ChildSessionAdapter` exists, which is
184/// how we resolve the runner→scheduler→adapter construction-order cycle (the
185/// runner is built before the adapter that would be its handler, and it's then
186/// dyn-erased into the composite runner so it can't be reached for a setter).
187/// Read by `drive()` for every child run. Mirrors the `super::live` registry.
188fn nested_spawn_handler_slot() -> &'static std::sync::OnceLock<Arc<dyn NestedSpawnHandler>> {
189    static SLOT: std::sync::OnceLock<Arc<dyn NestedSpawnHandler>> = std::sync::OnceLock::new();
190    &SLOT
191}
192
193/// Install the process-global nested-spawn handler (idempotent; first wins).
194pub fn set_nested_spawn_handler(handler: Arc<dyn NestedSpawnHandler>) {
195    let _ = nested_spawn_handler_slot().set(handler);
196}
197
198/// The process-global nested-spawn handler, if installed.
199pub fn nested_spawn_handler() -> Option<Arc<dyn NestedSpawnHandler>> {
200    nested_spawn_handler_slot().get().cloned()
201}
202
/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
///
/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
/// children's forced-ask (dangerous) gated actions — which still raise
/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
/// live channel, so the agent loop is never blocked.
///
/// `drive()` wraps the call in `CHILD_APPROVAL_TIMEOUT`; a review that does not
/// finish in time is delivered as a denial.
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
    /// Judge whether the gated action `request` (`{tool_name, permission,
    /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
    async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
217
218fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
219    static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
220    &SLOT
221}
222
223/// Install the process-global child-approval reviewer (idempotent; first wins).
224pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
225    let _ = child_approval_reviewer_slot().set(reviewer);
226}
227
228/// The process-global child-approval reviewer, if installed.
229pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
230    child_approval_reviewer_slot().get().cloned()
231}
232
233/// Per-run escalation bridge for non-bypass child-approval routing (Phase 6,
234/// Part B). A WORKER's `run()` installs its OWN host bridge here; its `drive()`
235/// (driving grandchildren) uses it to RE-PROXY a child's approval request UP to
236/// its own parent — chaining the request up every level until a bypass level
237/// (model-review) or the top orchestrator (human) decides, then relaying the
238/// reply back down. The top server never installs one ⇒ its drive() falls to the
239/// human-loop. Set per-run (a worker serves runs sequentially, so one active
240/// bridge); a stale bridge from a finished run errors on call ⇒ fail-closed.
241fn escalation_bridge_slot(
242) -> &'static std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>> {
243    static SLOT: std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>> =
244        std::sync::Mutex::new(None);
245    &SLOT
246}
247
248/// Install (or clear with `None`) the process escalation host bridge.
249pub fn set_escalation_host_bridge(bridge: Option<bamboo_subagent::executor::HostBridge>) {
250    *escalation_bridge_slot().lock().unwrap() = bridge;
251}
252
253fn escalation_host_bridge() -> Option<bamboo_subagent::executor::HostBridge> {
254    escalation_bridge_slot().lock().unwrap().clone()
255}
256
257impl ActorChildRunner {
258    #[allow(clippy::too_many_arguments)]
259    pub fn new(
260        agent_id: String,
261        worker_bin: PathBuf,
262        worker_args: Vec<String>,
263        fabric_dir: PathBuf,
264        executor: ExecutorSpec,
265        credentials: Vec<ScopedCredential>,
266        default_provider: String,
267        max_concurrent: usize,
268    ) -> Self {
269        Self {
270            agent_id,
271            worker_bin,
272            worker_args,
273            fabric_dir,
274            executor,
275            credentials,
276            default_provider,
277            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
278            spawn_timeout: Duration::from_secs(30),
279            pool: Arc::new(Mutex::new(HashMap::new())),
280            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
281            approval_decider: None,
282            nested_spawn_handler: None,
283        }
284    }
285
286    /// Wire the host-side decider for child gated-tool approval requests
287    /// (Phase 2). Without this the host fail-closed DENYs every request.
288    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
289        self.approval_decider = Some(decider);
290        self
291    }
292
293    /// Wire the host-side nested SubAgent spawn handler (Phase 6). Without it, a
294    /// child's nested spawn request fails gracefully ("not available").
295    pub fn with_nested_spawn_handler(mut self, handler: Arc<dyn NestedSpawnHandler>) -> Self {
296        self.nested_spawn_handler = Some(handler);
297        self
298    }
299
300    /// Reuse fingerprint: two children are interchangeable on one warm worker iff
301    /// they share role, provider, model, workspace, disabled-tool set, AND every
302    /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
303    /// stamps these once and reuses them across runs): nesting depth, nested-spawn
304    /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
305    /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
306    /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
307    /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
308    /// the recursion bound; or a bypass worker reused for a non-bypass child. So
309    /// these MUST split the pool bucket. Everything else (assignment, history) is
310    /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
311    fn fingerprint(spec: &ProvisionSpec) -> String {
312        let role = spec.identity.role.as_str();
313        let (provider, model) = spec
314            .model
315            .as_ref()
316            .map(|m| (m.provider.as_str(), m.model.as_str()))
317            .unwrap_or(("", ""));
318        let workspace = spec.workspace.as_deref().unwrap_or("");
319        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
320        tools.sort();
321        let caps = &spec.capabilities;
322        format!(
323            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}",
324            tools.join(","),
325            spec.identity.depth,
326            caps.nested_spawn,
327            caps.bypass,
328            caps.enforce_permissions,
329            caps.max_spawn_depth.unwrap_or(0),
330        )
331    }
332
333    /// Check out a worker for this assignment: reuse a live pooled one matching
334    /// `key`, else spawn a fresh reusable worker.
335    async fn acquire_worker(
336        &self,
337        key: &str,
338        spec: &ProvisionSpec,
339    ) -> crate::runtime::runner::Result<PooledActor> {
340        // Drain the pool bucket, validating liveness; a worker that hit its idle
341        // timeout has exited and withdrawn its fabric record — skip and reap it.
342        loop {
343            let candidate = {
344                let mut pool = self.pool.lock().await;
345                pool.get_mut(key).and_then(|bucket| bucket.pop())
346            };
347            let Some(candidate) = candidate else { break };
348            let alive = Fabric::at(&self.fabric_dir)
349                .resolve(&candidate.agent_id)
350                .await
351                .ok()
352                .flatten()
353                .is_some();
354            if alive {
355                return Ok(candidate);
356            }
357            candidate.worker.kill().await;
358        }
359
360        let spawned = spawn_worker(
361            &self.worker_bin,
362            &self.worker_args,
363            spec,
364            self.spawn_timeout,
365        )
366        .await
367        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
368        let endpoint = spawned.record.endpoint.clone();
369        let agent_id = spawned.record.agent_id.clone();
370        Ok(PooledActor {
371            worker: spawned,
372            endpoint,
373            agent_id,
374        })
375    }
376
377    /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
378    async fn release_worker(&self, key: &str, actor: PooledActor) {
379        let mut pool = self.pool.lock().await;
380        let bucket = pool.entry(key.to_string()).or_default();
381        if bucket.len() >= self.max_idle_per_key {
382            drop(pool);
383            self.retire_worker(actor).await;
384            return;
385        }
386        bucket.push(actor);
387    }
388
389    /// Forcefully stop a worker and clean its discovery record.
390    async fn retire_worker(&self, actor: PooledActor) {
391        let agent_id = actor.agent_id.clone();
392        actor.worker.kill().await;
393        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
394    }
395
396    /// Assemble the parent-resolved provisioning document for this child.
397    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
398        let mut spec = ProvisionSpec::new(
399            ChildIdentity {
400                child_id: job.child_session_id.clone(),
401                parent_id: Some(job.parent_session_id.clone()),
402                project_key: None,
403                role: session
404                    .metadata
405                    .get("subagent_type")
406                    .cloned()
407                    .unwrap_or_else(|| "worker".to_string()),
408                // The child session already carries the correct depth
409                // (create_child_action's new_child_of did parent.spawn_depth+1);
410                // stamp it so the worker can re-establish it on its run session
411                // and enforce the max-depth cap across the actor boundary.
412                depth: session.spawn_depth,
413            },
414            self.executor.clone(),
415            self.fabric_dir.to_string_lossy().into_owned(),
416        );
417        spec.workspace = session.workspace.clone();
418        // Final model: the session's pinned model_ref (create.model / routing already applied),
419        // falling back to the job's bare model on the parent's default provider.
420        spec.model = session
421            .model_ref
422            .as_ref()
423            .map(|r| ModelRefSpec {
424                provider: r.provider.clone(),
425                model: r.model.clone(),
426            })
427            .or_else(|| {
428                let m = job.model.trim();
429                (!m.is_empty()).then(|| ModelRefSpec {
430                    provider: self.default_provider.clone(),
431                    model: m.to_string(),
432                })
433            });
434        spec.disabled_tools = job.disabled_tools.clone();
435        // Least-privilege secrets: only the credential for the child's provider.
436        let provider = spec
437            .model
438            .as_ref()
439            .map(|m| m.provider.as_str())
440            .filter(|p| !p.trim().is_empty())
441            .unwrap_or(&self.default_provider);
442        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
443            spec.secrets.provider_credentials.push(cred.clone());
444        } else {
445            tracing::warn!(
446                "actor child {}: no credential found for provider '{}'",
447                job.child_session_id,
448                provider
449            );
450        }
451        // Phase 6 (direct nested execution): a worker BELOW the depth cap may
452        // orchestrate its OWN children — on startup it builds its own spawn
453        // stack and runs the real SubAgent tool (no host proxy). The cap (the
454        // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
455        // recursion. Driven purely by the child's depth, so it auto-propagates
456        // down the tree without any extra config threading.
457        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
458        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
459        // Propagate "bypass permissions" so a self-orchestrating worker knows it
460        // is a bypassed parent and installs the off-loop model-reviewer for its
461        // children's forced-ask actions (Phase 6, Part B). The child session
462        // already carries the inherited flag (create_child_action seeds it).
463        spec.capabilities.bypass = session
464            .agent_runtime_state
465            .as_ref()
466            .is_some_and(|s| s.bypass_permissions);
467        spec
468    }
469}
470
471#[async_trait]
472impl ExternalChildRunner for ActorChildRunner {
473    async fn should_handle(&self, session: &Session) -> bool {
474        session.metadata.get("runtime.kind") == Some(&"external".to_string())
475            && session.metadata.get("external.protocol") == Some(&"actor".to_string())
476            && session.metadata.get("external.agent_id") == Some(&self.agent_id)
477    }
478
479    async fn execute_external_child(
480        &self,
481        session: &mut Session,
482        job: &SpawnJob,
483        event_tx: mpsc::Sender<AgentEvent>,
484        cancel_token: CancellationToken,
485    ) -> crate::runtime::runner::Result<()> {
486        let assignment = extract_assignment(session);
487        let mut spec = self.build_spec(session, job);
488        // Make every actor a warm, reusable worker so the pool can recycle it for
489        // the next sibling with a matching fingerprint.
490        spec.reusable = true;
491        if spec.limits.idle_timeout_secs.is_none() {
492            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
493        }
494        let pool_key = Self::fingerprint(&spec);
495        // Rehydration: the child session in the parent's store is the actor's
496        // durable state. Ship the full conversation so a reactivation
497        // (send_message / update / rerun) carries its history. A reused worker is
498        // stateless between runs, so this is also what isolates each child's
499        // context on a shared process.
500        let messages: Vec<serde_json::Value> = session
501            .messages
502            .iter()
503            .filter_map(|m| serde_json::to_value(m).ok())
504            .collect();
505
506        // Backpressure: hold a concurrency slot for the lifetime of the *run*
507        // (cancellation still proceeds — the cancel branch in drive() runs while
508        // we hold the permit). Released when this fn returns, i.e. once the worker
509        // is parked back into the pool, so idle workers don't pin slots.
510        let _slot = self
511            .concurrency
512            .acquire()
513            .await
514            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
515
516        // Check out a warm worker (reuse-or-spawn).
517        let mut actor = self.acquire_worker(&pool_key, &spec).await?;
518
519        let mut client = match ChildClient::connect(&actor.endpoint).await {
520            Ok(client) => client,
521            Err(e) => {
522                // The pooled worker may have died between checkout and connect;
523                // retire it and spawn one fresh, once.
524                self.retire_worker(actor).await;
525                let spawned = spawn_worker(
526                    &self.worker_bin,
527                    &self.worker_args,
528                    &spec,
529                    self.spawn_timeout,
530                )
531                .await
532                .map_err(|e2| {
533                    AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
534                })?;
535                let endpoint = spawned.record.endpoint.clone();
536                let agent_id = spawned.record.agent_id.clone();
537                let client = ChildClient::connect(&endpoint)
538                    .await
539                    .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
540                actor = PooledActor {
541                    worker: spawned,
542                    endpoint,
543                    agent_id,
544                };
545                client
546            }
547        };
548
549        client
550            .send(ParentFrame::Run(RunSpec {
551                assignment,
552                reasoning_effort: None,
553                messages,
554            }))
555            .await
556            .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
557
558        // Register as a live actor so send_message (running, no interrupt) can
559        // steer this child in-band over the existing WS connection. The guard
560        // unregisters on every exit path.
561        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
562        let live_guard = super::live::register(&job.child_session_id, live_tx);
563
564        // The nested-spawn handler is normally the process-global installed at
565        // startup (see `set_nested_spawn_handler`); an explicit per-runner field
566        // overrides it (used in tests).
567        let nested_handler = self
568            .nested_spawn_handler
569            .clone()
570            .or_else(nested_spawn_handler);
571        let result = drive(
572            &mut client,
573            &job.child_session_id,
574            self.approval_decider.as_ref(),
575            nested_handler.as_ref(),
576            &event_tx,
577            &cancel_token,
578            &mut live_rx,
579        )
580        .await;
581        // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
582        // so a send_message landing in the close/park window below must see
583        // "not live" and take the durable-queue fallback instead of vanishing.
584        // (Even if one slipped in earlier, send_message also appends it to the
585        // durable transcript, so the next activation still rehydrates it.)
586        drop(live_guard);
587
588        // Close the connection: the worker's serve loop then accepts the next
589        // assignment (reuse) or idles out. Park the worker on a clean run; retire
590        // it on error/cancel (a wedged worker must not be reused).
591        let _ = client.close().await;
592        match &result {
593            Ok(_) => self.release_worker(&pool_key, actor).await,
594            Err(_) => self.retire_worker(actor).await,
595        }
596
597        // Write-back: persist the actor's final reply onto the child session so
598        // the transcript survives and the NEXT activation sees it as history.
599        // (run_child_spawn saves the session right after we return.)
600        match result {
601            Ok(Some(text)) => {
602                if !text.is_empty() {
603                    session.add_message(bamboo_agent_core::Message::assistant(text, None));
604                }
605                Ok(())
606            }
607            Ok(None) => Ok(()),
608            Err(e) => Err(e),
609        }
610    }
611}
612
613/// Pump child frames -> parent events until a terminal frame (or cancellation).
614/// On success, yields the actor's final result text (for session write-back).
615/// `live_rx` carries in-band frames (steering messages) from the live registry.
616#[allow(clippy::too_many_arguments)]
617async fn drive(
618    client: &mut ChildClient,
619    child_session_id: &str,
620    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
621    nested_spawn_handler: Option<&Arc<dyn NestedSpawnHandler>>,
622    event_tx: &mpsc::Sender<AgentEvent>,
623    cancel_token: &CancellationToken,
624    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
625) -> crate::runtime::runner::Result<Option<String>> {
626    loop {
627        tokio::select! {
628            _ = cancel_token.cancelled() => {
629                // fall through to the cancel handling below
630                break;
631            }
632            Some(frame) = live_rx.recv() => {
633                // Forward in-band steering to the worker over the existing WS.
634                if client.send(frame).await.is_err() {
635                    tracing::warn!("live steering frame could not be sent; connection failing");
636                }
637            }
638            frame = client.next_frame() => {
639                match frame {
640                    Ok(Some(ChildFrame::Event { event })) => {
641                        // AgentEvent is serialized verbatim on the wire (zero mapping).
642                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
643                            let _ = event_tx.send(ev).await;
644                        }
645                    }
646                    Ok(Some(ChildFrame::SubagentRequest { id, body })) => {
647                        // Phase 6: a nested worker child proxied a SubAgent tool
648                        // call back to the host. A wired `NestedSpawnHandler` runs
649                        // the real spawn (parenting a grandchild under this child)
650                        // and returns the tool result; with none wired we reply
651                        // with a graceful "not available" error so the worker's
652                        // tool call fails cleanly instead of hanging.
653                        let reply =
654                            fulfil_nested_spawn(nested_spawn_handler, child_session_id, body).await;
655                        if client
656                            .send(ParentFrame::SubagentReply { id, body: reply })
657                            .await
658                            .is_err()
659                        {
660                            tracing::warn!("failed to answer subagent_request; connection failing");
661                        }
662                    }
663                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
664                        // Phase 2: a worker proxied a gated-tool approval back to
665                        // the host. The WORKER side is live — its executor installs
666                        // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
667                        // that calls `host.approval_call`, so this frame arrives
668                        // when a child hits `ConfirmationRequired`.
669                        if let Some(reviewer) = child_approval_reviewer() {
670                            // Phase 6, Part B: a BYPASSED parent worker
671                            // model-reviews its children's forced-ask (dangerous)
672                            // actions. The review is an LLM call, so run it OFF
673                            // the frame pump in a spawned task and deliver the
674                            // verdict async via the live channel — the pump keeps
675                            // forwarding events and the agent loop never blocks. A
676                            // timeout denies a hung review so the child can't hang.
677                            let child = child_session_id.to_string();
678                            let req_id = id.clone();
679                            let body = body.clone();
680                            tokio::spawn(async move {
681                                let approved = tokio::time::timeout(
682                                    CHILD_APPROVAL_TIMEOUT,
683                                    reviewer.review(&child, &body),
684                                )
685                                .await
686                                .unwrap_or(false);
687                                super::live::deliver_approval(&child, &req_id, approved);
688                            });
689                        } else if approval_decider.is_some() {
690                            // A decider is wired (policy / auto): decide promptly
691                            // and reply inline. (Must not block the pump — see the
692                            // `ChildApprovalDecider` doc.)
693                            let approved =
694                                decide_child_approval(approval_decider, child_session_id, &body)
695                                    .await;
696                            if client
697                                .send(ParentFrame::ApprovalReply { id, approved })
698                                .await
699                                .is_err()
700                            {
701                                tracing::warn!(
702                                    "failed to answer approval_request; connection failing"
703                                );
704                            }
705                        } else if let Some(host) = escalation_host_bridge() {
706                            // Non-bypass WORKER: ESCALATE up our own actor link
707                            // (re-proxy) so the request chains to our parent — and
708                            // up every level until a bypass level (model-review) or
709                            // the top orchestrator (human) decides. Off-loop so the
710                            // pump never blocks; relay the reply down to the child.
711                            let child = child_session_id.to_string();
712                            let req_id = id.clone();
713                            let body = body.clone();
714                            tokio::spawn(async move {
715                                let approved = match tokio::time::timeout(
716                                    CHILD_APPROVAL_TIMEOUT,
717                                    host.approval_call(body),
718                                )
719                                .await
720                                {
721                                    Ok(Ok(reply)) => reply
722                                        .get("approved")
723                                        .and_then(|v| v.as_bool())
724                                        .unwrap_or(false),
725                                    // Transport error or timeout ⇒ fail closed.
726                                    _ => false,
727                                };
728                                super::live::deliver_approval(&child, &req_id, approved);
729                            });
730                        } else {
731                            // Top orchestrator (no escalation bridge): human-in-the-
732                            // loop. Surface the request on the parent's event stream
733                            // and DEFER — the decision arrives out-of-band via
734                            // `live::deliver_approval(child, request_id, approved)`
735                            // (→ this child's `live_rx` → forwarded to the worker
736                            // above). A timeout denies a never-answered request so
737                            // it can't hang the child forever.
738                            let (tool_name, permission, resource) =
739                                approval_request_fields(&body);
740                            let _ = event_tx
741                                .send(AgentEvent::ChildApprovalRequested {
742                                    child_session_id: child_session_id.to_string(),
743                                    request_id: id.clone(),
744                                    tool_name,
745                                    permission,
746                                    resource,
747                                })
748                                .await;
749                            let child = child_session_id.to_string();
750                            tokio::spawn(async move {
751                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
752                                // No-op if already answered (worker ignores an
753                                // unknown/duplicate id) or the child is gone.
754                                super::live::deliver_approval(&child, &id, false);
755                            });
756                        }
757                    }
758                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
759                        return match status {
760                            TerminalStatus::Completed => Ok(result),
761                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
762                            TerminalStatus::Error => Err(AgentError::LLM(
763                                error.unwrap_or_else(|| "actor child errored".to_string()),
764                            )),
765                            // The suspend/resume round-trip (host re-dispatch of a
766                            // nested parent) is not wired here yet; a worker in
767                            // this build never emits Suspended, so this is
768                            // unreachable in practice.
769                            TerminalStatus::Suspended => Err(AgentError::LLM(
770                                "nested sub-agent suspend received but resume transport is not wired"
771                                    .to_string(),
772                            )),
773                        };
774                    }
775                    Ok(None) => {
776                        return Err(AgentError::LLM(
777                            "actor child closed before terminal".to_string(),
778                        ));
779                    }
780                    Err(e) => {
781                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
782                    }
783                }
784            }
785        }
786    }
787
788    // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
789    let _ = client.send(ParentFrame::Cancel).await;
790    Err(AgentError::Cancelled)
791}
792
793/// The assignment text = the child session's latest user message (falls back to its title).
794fn extract_assignment(session: &Session) -> String {
795    session
796        .messages
797        .iter()
798        .rev()
799        .find(|m| matches!(m.role, Role::User))
800        .map(|m| m.content.clone())
801        .unwrap_or_else(|| {
802            session
803                .metadata
804                .get("title")
805                .cloned()
806                .unwrap_or_else(|| "Execute task".to_string())
807        })
808}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal `ProvisionSpec` for the fingerprint tests: echo executor,
    /// child id `"c"`, depth 0, fabric path `/tmp/fab`. Role, model reference,
    /// workspace and disabled-tool list come from the arguments.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
            depth: 0,
        };
        let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        spec.workspace = workspace.map(|dir| dir.to_string());
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools = disabled.map(|tools| tools.into_iter().map(String::from).collect());
        spec
    }

    /// The reference spec that the fingerprint tests perturb one axis at a time.
    fn baseline_spec() -> ProvisionSpec {
        spec_with("explorer", "p", "m", Some("/ws"), None)
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Identical role/provider/model/workspace, the same disabled tools listed in a
        // different order, and different child ids: both specs must share a fingerprint.
        let tools_forward = Some(vec!["Bash", "Edit"]);
        let tools_reversed = Some(vec!["Edit", "Bash"]);
        let first = spec_with("explorer", "p", "m", Some("/ws"), tools_forward);
        let mut second = spec_with("explorer", "p", "m", Some("/ws"), tools_reversed);
        second.identity.child_id = "other".into();

        let first_fp = ActorChildRunner::fingerprint(&first);
        let second_fp = ActorChildRunner::fingerprint(&second);
        assert_eq!(first_fp, second_fp);
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        // Changing any single axis baked into the worker must yield a new fingerprint.
        let baseline = ActorChildRunner::fingerprint(&baseline_spec());

        let other_role = spec_with("writer", "p", "m", Some("/ws"), None);
        assert_ne!(baseline, ActorChildRunner::fingerprint(&other_role));

        let other_provider = spec_with("explorer", "p2", "m", Some("/ws"), None);
        assert_ne!(baseline, ActorChildRunner::fingerprint(&other_provider));

        let other_model = spec_with("explorer", "p", "m2", Some("/ws"), None);
        assert_ne!(baseline, ActorChildRunner::fingerprint(&other_model));

        let other_workspace = spec_with("explorer", "p", "m", Some("/ws2"), None);
        assert_ne!(baseline, ActorChildRunner::fingerprint(&other_workspace));

        let with_disabled_tool = spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"]));
        assert_ne!(
            baseline,
            ActorChildRunner::fingerprint(&with_disabled_tool)
        );
    }

    #[test]
    fn fingerprint_splits_on_baked_capabilities() {
        // Capabilities are baked once at provision time, so each of them has to split
        // the pool bucket. Otherwise a worker provisioned for one posture could be
        // reused for another (a shallow worker serving a deeper child and defeating the
        // depth cap, or a bypass worker serving a non-bypass child).
        let baseline = ActorChildRunner::fingerprint(&baseline_spec());

        let mut deeper = baseline_spec();
        deeper.identity.depth = 2;
        let deeper_fp = ActorChildRunner::fingerprint(&deeper);
        assert_ne!(baseline, deeper_fp, "depth must split");

        let mut nesting = baseline_spec();
        nesting.capabilities.nested_spawn = true;
        let nesting_fp = ActorChildRunner::fingerprint(&nesting);
        assert_ne!(baseline, nesting_fp, "nested_spawn must split");

        let mut bypassing = baseline_spec();
        bypassing.capabilities.bypass = true;
        let bypassing_fp = ActorChildRunner::fingerprint(&bypassing);
        assert_ne!(baseline, bypassing_fp, "bypass must split");

        let mut enforcing = baseline_spec();
        enforcing.capabilities.enforce_permissions = true;
        let enforcing_fp = ActorChildRunner::fingerprint(&enforcing);
        assert_ne!(baseline, enforcing_fp, "enforce_permissions must split");

        let mut capped = baseline_spec();
        capped.capabilities.max_spawn_depth = Some(8);
        let capped_fp = ActorChildRunner::fingerprint(&capped);
        assert_ne!(baseline, capped_fp, "max_spawn_depth must split");
    }

    /// Decider stub that answers every request with the wrapped verdict.
    struct StaticDecider(bool);

    #[async_trait]
    impl ChildApprovalDecider for StaticDecider {
        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
            let Self(verdict) = self;
            *verdict
        }
    }

    #[tokio::test]
    async fn child_approval_fails_closed_without_decider() {
        // With no decider wired the host must deny the request.
        let request =
            serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
        let approved = decide_child_approval(None, "child-1", &request).await;
        assert!(!approved);
    }

    #[tokio::test]
    async fn child_approval_honors_wired_decider() {
        // The wired decider's verdict is passed through unchanged, either way.
        let request =
            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
        let always_yes: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
        let always_no: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));

        let granted = decide_child_approval(Some(&always_yes), "child-1", &request).await;
        assert!(granted);

        let refused = decide_child_approval(Some(&always_no), "child-1", &request).await;
        assert!(!refused);
    }

    /// Nested-spawn handler stub that returns a clone of the canned outcome.
    struct StaticSpawn(Result<serde_json::Value, String>);

    #[async_trait]
    impl NestedSpawnHandler for StaticSpawn {
        async fn spawn_nested(
            &self,
            _child: &str,
            _req: serde_json::Value,
        ) -> Result<serde_json::Value, String> {
            let Self(outcome) = self;
            outcome.clone()
        }
    }

    #[tokio::test]
    async fn nested_spawn_unavailable_without_handler() {
        // No handler wired: the reply is a failure body mentioning "not available".
        let reply = fulfil_nested_spawn(None, "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"].as_bool(), Some(false));

        let message = reply["result"].as_str().unwrap();
        assert!(message.contains("not available"));
    }

    #[tokio::test]
    async fn nested_spawn_returns_handler_result_or_error_body() {
        // Success: the handler's JSON body is what comes back.
        let canned = serde_json::json!({"success": true, "result": "spawned"});
        let succeeding: Arc<dyn NestedSpawnHandler> = Arc::new(StaticSpawn(Ok(canned)));
        let reply = fulfil_nested_spawn(Some(&succeeding), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["result"].as_str(), Some("spawned"));

        // Failure: the handler's error text is folded into a failure body.
        let failing: Arc<dyn NestedSpawnHandler> = Arc::new(StaticSpawn(Err("boom".to_string())));
        let reply = fulfil_nested_spawn(Some(&failing), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"].as_bool(), Some(false));

        let message = reply["result"].as_str().unwrap();
        assert!(message.contains("boom"));
    }

    #[test]
    fn approval_request_fields_extracts_and_defaults() {
        // All three fields present: each is extracted verbatim.
        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
        let (tool, permission, resource) = approval_request_fields(&full);
        assert_eq!(tool, "Bash");
        assert_eq!(permission, "run");
        assert_eq!(resource, "ls");

        // Absent fields come back as empty strings.
        let partial = serde_json::json!({"tool_name":"Write"});
        let (tool, permission, resource) = approval_request_fields(&partial);
        assert_eq!(tool, "Write");
        assert!(permission.is_empty());
        assert!(resource.is_empty());
    }
}