Skip to main content

bamboo_engine/external_agents/
actor_adapter.rs

1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27    ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default cap on simultaneously running actor processes.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// callers pass it as `max_concurrent` to `ActorChildRunner::new` — confirm.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Max nesting depth for direct nested execution (Phase 6). A worker whose
/// session `spawn_depth` is below this gets its own spawn stack + the real
/// SubAgent tool; at/over it, neither (and the tool itself refuses). Mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` (kept in sync; engine can't
/// depend on server-tools). Root orchestrator = 0 ⇒ 4 levels of sub-agents.
///
/// `build_spec` stamps this value into every child's
/// `capabilities.max_spawn_depth` and derives `capabilities.nested_spawn` from it.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default cap on idle pooled (warm, reusable) workers kept per fingerprint.
/// `ActorChildRunner::new` installs it as `max_idle_per_key`; `release_worker`
/// retires a worker instead of parking it once a bucket holds this many.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// How long a pooled worker waits for its next assignment before reclaiming
/// itself (must comfortably exceed the gap between sibling spawns). Only
/// applied when the provisioning spec does not already set
/// `limits.idle_timeout_secs`.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;
49
/// A warm worker parked for reuse: its process handle (killed on drop), the WS
/// endpoint to reconnect to, and the id it registered under in the fabric.
struct PooledActor {
    /// Handle to the spawned worker process; `kill()` is awaited on it when the
    /// worker is retired or found dead at checkout.
    worker: SpawnedChild,
    /// WebSocket endpoint copied from the worker's fabric record; each run opens
    /// a fresh `ChildClient` connection to it.
    endpoint: String,
    /// Agent id copied from the worker's fabric record; used to resolve the
    /// worker's liveness at checkout and to withdraw its record on retirement.
    agent_id: String,
}
57
/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
pub struct ActorChildRunner {
    /// Id this runner answers to: `should_handle` only accepts sessions whose
    /// `external.agent_id` metadata equals it.
    agent_id: String,
    /// Path of the worker binary handed to `spawn_worker`.
    worker_bin: PathBuf,
    /// Extra command-line arguments handed to `spawn_worker` with the binary.
    worker_args: Vec<String>,
    /// Directory of the file fabric: stamped into every `ProvisionSpec` and used
    /// on the host side to resolve / withdraw worker records.
    fabric_dir: PathBuf,
    /// Executor description cloned into every child's `ProvisionSpec`.
    executor: ExecutorSpec,
    /// Per-provider credentials snapshotted from the parent config at build
    /// time; the spec carries only the ONE the child's provider needs.
    credentials: Vec<ScopedCredential>,
    /// Parent's default provider (used when the child has no explicit one).
    default_provider: String,
    /// Backpressure: bounds the number of concurrently *running* actors; further
    /// runs wait for a slot instead of exploding the process table. (Idle pooled
    /// workers do not hold a slot.)
    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
    /// Timeout passed to `spawn_worker` for a fresh worker to spawn and
    /// register (30 seconds as set by `new`).
    spawn_timeout: Duration,
    /// Warm-worker pool keyed by a reuse fingerprint
    /// (role/provider/model/workspace/disabled-tools). A finished run parks its
    /// worker here so the next matching child reuses it instead of spawning a
    /// fresh process — collapsing N sibling sub-agents onto a few processes.
    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
    /// Maximum number of idle workers parked per fingerprint bucket; a worker
    /// released into a full bucket is retired instead.
    max_idle_per_key: usize,
    /// Host-side decision for a child's gated-tool approval request (Phase 2).
    /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
    /// human-routing bridge) returns approve/deny over the actor WS.
    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
    /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
    /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
    /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
    /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
    /// to RE-PROXY a child's approval request UP to the parent run — chaining up
    /// every level until a bypass level (model-review) or the top orchestrator
    /// (human) decides, then relaying the reply back down. Was a process-global
    /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
    /// that spawned it keeps that run's bridge for its whole lifetime instead of
    /// reading a stale/overwritten global at approval time (→ fail-closed deny).
    escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
}
97
/// Decides how the host answers a child worker's gated-tool approval request
/// (Phase 2: child → parent approval delegation). Async so an implementation
/// can consult a policy or defer to a human. With no decider wired the host
/// replies with a fail-closed DENY.
///
/// NOTE: `decide` is awaited inside the per-child frame pump, so an
/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
/// loop decision that may block indefinitely should instead be delivered
/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
/// (`super::live`), which `drive()` already forwards to the worker without
/// stalling the pump.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
    /// Decide whether `child_session_id` may perform the gated action described
    /// by `request` (`{tool_name, permission, resource}`).
    ///
    /// Returns `true` to approve and `false` to deny; the value is sent back to
    /// the worker as the `approved` flag of the approval reply.
    async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
115
116/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
117/// decider is wired — the single, testable seam for the host-side decision.
118async fn decide_child_approval(
119    decider: Option<&Arc<dyn ChildApprovalDecider>>,
120    child_session_id: &str,
121    request: &serde_json::Value,
122) -> bool {
123    match decider {
124        Some(decider) => decider.decide(child_session_id, request).await,
125        None => false,
126    }
127}
128
/// How long the host waits for an out-of-band approval decision before failing
/// the child's gated tool closed (DENY). `drive()` applies it to both the
/// off-loop model review and the escalation up to the parent run, so an
/// unanswered request can't hang the worker indefinitely.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
133
134/// Extract `(tool_name, permission, resource)` from a worker's approval request
135/// body (`{tool_name, permission, resource}`); missing fields default to empty.
136fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
137    let field = |k: &str| {
138        body.get(k)
139            .and_then(|v| v.as_str())
140            .unwrap_or("")
141            .to_string()
142    };
143    (field("tool_name"), field("permission"), field("resource"))
144}
145
/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
///
/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
/// children's forced-ask (dangerous) gated actions — which still raise
/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
/// live channel, so the agent loop is never blocked.
///
/// Installation goes through `set_child_approval_reviewer`, which is
/// first-wins: once a reviewer is installed it stays for the process lifetime.
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
    /// Judge whether the gated action `request` (`{tool_name, permission,
    /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
    async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
160
161fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
162    static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
163    &SLOT
164}
165
166/// Install the process-global child-approval reviewer (idempotent; first wins).
167pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
168    let _ = child_approval_reviewer_slot().set(reviewer);
169}
170
171/// The process-global child-approval reviewer, if installed.
172pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
173    child_approval_reviewer_slot().get().cloned()
174}
175
176impl ActorChildRunner {
177    #[allow(clippy::too_many_arguments)]
178    pub fn new(
179        agent_id: String,
180        worker_bin: PathBuf,
181        worker_args: Vec<String>,
182        fabric_dir: PathBuf,
183        executor: ExecutorSpec,
184        credentials: Vec<ScopedCredential>,
185        default_provider: String,
186        max_concurrent: usize,
187    ) -> Self {
188        Self {
189            agent_id,
190            worker_bin,
191            worker_args,
192            fabric_dir,
193            executor,
194            credentials,
195            default_provider,
196            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
197            spawn_timeout: Duration::from_secs(30),
198            pool: Arc::new(Mutex::new(HashMap::new())),
199            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
200            approval_decider: None,
201            escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
202        }
203    }
204
    /// Wire the host-side decider for child gated-tool approval requests
    /// (Phase 2). Without this the host fail-closed DENYs every request.
    ///
    /// Builder-style: consumes the runner and returns it with `decider`
    /// installed, replacing any decider set by an earlier call.
    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
        self.approval_decider = Some(decider);
        self
    }
211
212    /// Reuse fingerprint: two children are interchangeable on one warm worker iff
213    /// they share role, provider, model, workspace, disabled-tool set, AND every
214    /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
215    /// stamps these once and reuses them across runs): nesting depth, nested-spawn
216    /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
217    /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
218    /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
219    /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
220    /// the recursion bound; or a bypass worker reused for a non-bypass child. So
221    /// these MUST split the pool bucket. Everything else (assignment, history) is
222    /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
223    fn fingerprint(spec: &ProvisionSpec) -> String {
224        let role = spec.identity.role.as_str();
225        let (provider, model) = spec
226            .model
227            .as_ref()
228            .map(|m| (m.provider.as_str(), m.model.as_str()))
229            .unwrap_or(("", ""));
230        let workspace = spec.workspace.as_deref().unwrap_or("");
231        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
232        tools.sort();
233        let caps = &spec.capabilities;
234        format!(
235            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
236            tools.join(","),
237            spec.identity.depth,
238            caps.nested_spawn,
239            caps.bypass,
240            caps.enforce_permissions,
241            caps.max_spawn_depth.unwrap_or(0),
242            // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
243            // at build() and never re-reads it per run, so the pool MUST NOT hand a
244            // worker baked for one approval posture to a run of the opposite one —
245            // else a scheduled-root worker reused for an interactive child would
246            // silently model-review instead of asking the human (and vice-versa,
247            // reintroducing the 300s-deny). Split the bucket on it.
248            caps.no_human_approver,
249            // #71: the read-only Bash checker is baked once at build() from this
250            // flag, so a guardian-reviewer worker must NOT be reused for an
251            // ordinary child (which expects unrestricted Bash), and vice-versa.
252            caps.guardian_read_only,
253        )
254    }
255
256    /// Check out a worker for this assignment: reuse a live pooled one matching
257    /// `key`, else spawn a fresh reusable worker.
258    async fn acquire_worker(
259        &self,
260        key: &str,
261        spec: &ProvisionSpec,
262    ) -> crate::runtime::runner::Result<PooledActor> {
263        // Drain the pool bucket, validating liveness; a worker that hit its idle
264        // timeout has exited and withdrawn its fabric record — skip and reap it.
265        loop {
266            let candidate = {
267                let mut pool = self.pool.lock().await;
268                pool.get_mut(key).and_then(|bucket| bucket.pop())
269            };
270            let Some(candidate) = candidate else { break };
271            let alive = Fabric::at(&self.fabric_dir)
272                .resolve(&candidate.agent_id)
273                .await
274                .ok()
275                .flatten()
276                .is_some();
277            if alive {
278                return Ok(candidate);
279            }
280            candidate.worker.kill().await;
281        }
282
283        let spawned = spawn_worker(
284            &self.worker_bin,
285            &self.worker_args,
286            spec,
287            self.spawn_timeout,
288        )
289        .await
290        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
291        let endpoint = spawned.record.endpoint.clone();
292        let agent_id = spawned.record.agent_id.clone();
293        Ok(PooledActor {
294            worker: spawned,
295            endpoint,
296            agent_id,
297        })
298    }
299
300    /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
301    async fn release_worker(&self, key: &str, actor: PooledActor) {
302        let mut pool = self.pool.lock().await;
303        let bucket = pool.entry(key.to_string()).or_default();
304        if bucket.len() >= self.max_idle_per_key {
305            drop(pool);
306            self.retire_worker(actor).await;
307            return;
308        }
309        bucket.push(actor);
310    }
311
312    /// Forcefully stop a worker and clean its discovery record.
313    async fn retire_worker(&self, actor: PooledActor) {
314        let agent_id = actor.agent_id.clone();
315        actor.worker.kill().await;
316        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
317    }
318
319    /// Assemble the parent-resolved provisioning document for this child.
320    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
321        let mut spec = ProvisionSpec::new(
322            ChildIdentity {
323                child_id: job.child_session_id.clone(),
324                parent_id: Some(job.parent_session_id.clone()),
325                project_key: None,
326                role: session
327                    .metadata
328                    .get("subagent_type")
329                    .cloned()
330                    .unwrap_or_else(|| "worker".to_string()),
331                // The child session already carries the correct depth
332                // (create_child_action's new_child_of did parent.spawn_depth+1);
333                // stamp it so the worker can re-establish it on its run session
334                // and enforce the max-depth cap across the actor boundary.
335                depth: session.spawn_depth,
336            },
337            self.executor.clone(),
338            self.fabric_dir.to_string_lossy().into_owned(),
339        );
340        spec.workspace = session.workspace.clone();
341        // Final model: the session's pinned model_ref (create.model / routing already applied),
342        // falling back to the job's bare model on the parent's default provider.
343        spec.model = session
344            .model_ref
345            .as_ref()
346            .map(|r| ModelRefSpec {
347                provider: r.provider.clone(),
348                model: r.model.clone(),
349            })
350            .or_else(|| {
351                let m = job.model.trim();
352                (!m.is_empty()).then(|| ModelRefSpec {
353                    provider: self.default_provider.clone(),
354                    model: m.to_string(),
355                })
356            });
357        spec.disabled_tools = job.disabled_tools.clone();
358        // Least-privilege secrets: only the credential for the child's provider.
359        let provider = spec
360            .model
361            .as_ref()
362            .map(|m| m.provider.as_str())
363            .filter(|p| !p.trim().is_empty())
364            .unwrap_or(&self.default_provider);
365        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
366            spec.secrets.provider_credentials.push(cred.clone());
367        } else {
368            tracing::warn!(
369                "actor child {}: no credential found for provider '{}'",
370                job.child_session_id,
371                provider
372            );
373        }
374        // Phase 6 (direct nested execution): a worker BELOW the depth cap may
375        // orchestrate its OWN children — on startup it builds its own spawn
376        // stack and runs the real SubAgent tool (no host proxy). The cap (the
377        // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
378        // recursion. Driven purely by the child's depth, so it auto-propagates
379        // down the tree without any extra config threading.
380        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
381        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
382        // #69: activate child-approval review. Sub-agents enforce permissions so
383        // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
384        // parent for review — escalated to the human, or model-reviewed off-loop
385        // when the parent is in bypass. The worker installs no checker without
386        // this, so the whole review chain would otherwise stay dormant.
387        spec.capabilities.enforce_permissions = true;
388        // Propagate "bypass permissions" so a self-orchestrating worker knows it
389        // is a bypassed parent and installs the off-loop model-reviewer for its
390        // children's forced-ask actions (Phase 6, Part B). The child session
391        // already carries the inherited flag (create_child_action seeds it).
392        spec.capabilities.bypass = session
393            .agent_runtime_state
394            .as_ref()
395            .is_some_and(|s| s.bypass_permissions);
396        // #73: propagate "no interactive human approver" (headless / scheduled /
397        // deployed root, inherited by the child session). When set, the worker's
398        // per-run approval proxy model-reviews a gated action locally instead of
399        // escalating to a human who will never answer (which would 300s-deny).
400        spec.capabilities.no_human_approver = session
401            .agent_runtime_state
402            .as_ref()
403            .is_some_and(|s| s.no_human_approver);
404        // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
405        // read-only Bash allowlist checker. The reviewer is spawned by
406        // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
407        // marker the completion coordinator branches on to parse the verdict) AND
408        // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
409        // marker (already read above to set `identity.role`), so it rides the same
410        // session-metadata path the denylist/subagent_type use — no new wire seam.
411        // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
412        // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
413        spec.capabilities.guardian_read_only =
414            session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
415        spec
416    }
417}
418
419#[async_trait]
420impl ExternalChildRunner for ActorChildRunner {
421    async fn should_handle(&self, session: &Session) -> bool {
422        session.metadata.get("runtime.kind") == Some(&"external".to_string())
423            && session.metadata.get("external.protocol") == Some(&"actor".to_string())
424            && session.metadata.get("external.agent_id") == Some(&self.agent_id)
425    }
426
427    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
428        *self.escalation_bridge.lock().unwrap() = bridge;
429    }
430
431    async fn execute_external_child(
432        &self,
433        session: &mut Session,
434        job: &SpawnJob,
435        event_tx: mpsc::Sender<AgentEvent>,
436        cancel_token: CancellationToken,
437    ) -> crate::runtime::runner::Result<()> {
438        // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
439        // moment this grandchild is spawned — while the parent run's bridge is
440        // still in our slot — into an owned local handed to `drive()` for this
441        // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
442        // the run that spawned it must NOT re-read `self.escalation_bridge` at
443        // approval time: by then `run()` may have cleared/overwritten it (a worker
444        // serves runs sequentially), and re-proxying through a closed bridge
445        // fail-closed denies. Capturing at spawn pins the right bridge per run.
446        let escalation = self.escalation_bridge.lock().unwrap().clone();
447        let assignment = extract_assignment(session);
448        let mut spec = self.build_spec(session, job);
449        // Make every actor a warm, reusable worker so the pool can recycle it for
450        // the next sibling with a matching fingerprint.
451        spec.reusable = true;
452        if spec.limits.idle_timeout_secs.is_none() {
453            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
454        }
455        let pool_key = Self::fingerprint(&spec);
456        // Rehydration: the child session in the parent's store is the actor's
457        // durable state. Ship the full conversation so a reactivation
458        // (send_message / update / rerun) carries its history. A reused worker is
459        // stateless between runs, so this is also what isolates each child's
460        // context on a shared process.
461        let messages: Vec<serde_json::Value> = session
462            .messages
463            .iter()
464            .filter_map(|m| serde_json::to_value(m).ok())
465            .collect();
466
467        // Backpressure: hold a concurrency slot for the lifetime of the *run*
468        // (cancellation still proceeds — the cancel branch in drive() runs while
469        // we hold the permit). Released when this fn returns, i.e. once the worker
470        // is parked back into the pool, so idle workers don't pin slots.
471        let _slot = self
472            .concurrency
473            .acquire()
474            .await
475            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
476
477        // Check out a warm worker (reuse-or-spawn).
478        let mut actor = self.acquire_worker(&pool_key, &spec).await?;
479
480        let mut client = match ChildClient::connect(&actor.endpoint).await {
481            Ok(client) => client,
482            Err(e) => {
483                // The pooled worker may have died between checkout and connect;
484                // retire it and spawn one fresh, once.
485                self.retire_worker(actor).await;
486                let spawned = spawn_worker(
487                    &self.worker_bin,
488                    &self.worker_args,
489                    &spec,
490                    self.spawn_timeout,
491                )
492                .await
493                .map_err(|e2| {
494                    AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
495                })?;
496                let endpoint = spawned.record.endpoint.clone();
497                let agent_id = spawned.record.agent_id.clone();
498                let client = ChildClient::connect(&endpoint)
499                    .await
500                    .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
501                actor = PooledActor {
502                    worker: spawned,
503                    endpoint,
504                    agent_id,
505                };
506                client
507            }
508        };
509
510        client
511            .send(ParentFrame::Run(RunSpec {
512                assignment,
513                reasoning_effort: None,
514                messages,
515            }))
516            .await
517            .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
518
519        // Register as a live actor so send_message (running, no interrupt) can
520        // steer this child in-band over the existing WS connection. The guard
521        // unregisters on every exit path.
522        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
523        let live_guard = super::live::register(&job.child_session_id, live_tx);
524
525        let result = drive(
526            &mut client,
527            &job.child_session_id,
528            self.approval_decider.as_ref(),
529            escalation,
530            &event_tx,
531            &cancel_token,
532            &mut live_rx,
533        )
534        .await;
535        // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
536        // so a send_message landing in the close/park window below must see
537        // "not live" and take the durable-queue fallback instead of vanishing.
538        // (Even if one slipped in earlier, send_message also appends it to the
539        // durable transcript, so the next activation still rehydrates it.)
540        drop(live_guard);
541
542        // Close the connection: the worker's serve loop then accepts the next
543        // assignment (reuse) or idles out. Park the worker on a clean run; retire
544        // it on error/cancel (a wedged worker must not be reused).
545        let _ = client.close().await;
546        match &result {
547            Ok(_) => self.release_worker(&pool_key, actor).await,
548            Err(_) => self.retire_worker(actor).await,
549        }
550
551        // Write-back: persist the actor's final reply onto the child session so
552        // the transcript survives and the NEXT activation sees it as history.
553        // (run_child_spawn saves the session right after we return.)
554        match result {
555            Ok(Some(text)) => {
556                if !text.is_empty() {
557                    session.add_message(bamboo_agent_core::Message::assistant(text, None));
558                }
559                Ok(())
560            }
561            Ok(None) => Ok(()),
562            Err(e) => Err(e),
563        }
564    }
565}
566
567/// Pump child frames -> parent events until a terminal frame (or cancellation).
568/// On success, yields the actor's final result text (for session write-back).
569/// `live_rx` carries in-band frames (steering messages) from the live registry.
570///
571/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
572/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
573/// non-bypass child re-proxies an approval request, this owned bridge routes it
574/// UP to the parent run. Owning it for the call's lifetime is what lets a
575/// fire-and-forget grandchild that outlives its spawning run still escalate to
576/// the correct (then-current) parent bridge rather than a stale/overwritten one.
577async fn drive(
578    client: &mut ChildClient,
579    child_session_id: &str,
580    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
581    escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
582    event_tx: &mpsc::Sender<AgentEvent>,
583    cancel_token: &CancellationToken,
584    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
585) -> crate::runtime::runner::Result<Option<String>> {
586    loop {
587        tokio::select! {
588            _ = cancel_token.cancelled() => {
589                // fall through to the cancel handling below
590                break;
591            }
592            Some(frame) = live_rx.recv() => {
593                // Forward in-band steering to the worker over the existing WS.
594                if client.send(frame).await.is_err() {
595                    tracing::warn!("live steering frame could not be sent; connection failing");
596                }
597            }
598            frame = client.next_frame() => {
599                match frame {
600                    Ok(Some(ChildFrame::Event { event })) => {
601                        // AgentEvent is serialized verbatim on the wire (zero mapping).
602                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
603                            let _ = event_tx.send(ev).await;
604                        }
605                    }
606                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
607                        // Phase 2: a worker proxied a gated-tool approval back to
608                        // the host. The WORKER side is live — its executor installs
609                        // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
610                        // that calls `host.approval_call`, so this frame arrives
611                        // when a child hits `ConfirmationRequired`.
612                        if let Some(reviewer) = child_approval_reviewer() {
613                            // Phase 6, Part B: a BYPASSED parent worker
614                            // model-reviews its children's forced-ask (dangerous)
615                            // actions. The review is an LLM call, so run it OFF
616                            // the frame pump in a spawned task and deliver the
617                            // verdict async via the live channel — the pump keeps
618                            // forwarding events and the agent loop never blocks. A
619                            // timeout denies a hung review so the child can't hang.
620                            let child = child_session_id.to_string();
621                            let req_id = id.clone();
622                            let body = body.clone();
623                            tokio::spawn(async move {
624                                let approved = tokio::time::timeout(
625                                    CHILD_APPROVAL_TIMEOUT,
626                                    reviewer.review(&child, &body),
627                                )
628                                .await
629                                .unwrap_or(false);
630                                super::live::deliver_approval(&child, &req_id, approved);
631                            });
632                        } else if approval_decider.is_some() {
633                            // A decider is wired (policy / auto): decide promptly
634                            // and reply inline. (Must not block the pump — see the
635                            // `ChildApprovalDecider` doc.)
636                            let approved =
637                                decide_child_approval(approval_decider, child_session_id, &body)
638                                    .await;
639                            if client
640                                .send(ParentFrame::ApprovalReply { id, approved })
641                                .await
642                                .is_err()
643                            {
644                                tracing::warn!(
645                                    "failed to answer approval_request; connection failing"
646                                );
647                            }
648                        } else if let Some(host) = escalation_bridge.clone() {
649                            // Non-bypass WORKER: ESCALATE up our own actor link
650                            // (re-proxy) so the request chains to our parent — and
651                            // up every level until a bypass level (model-review) or
652                            // the top orchestrator (human) decides. Off-loop so the
653                            // pump never blocks; relay the reply down to the child.
654                            let child = child_session_id.to_string();
655                            let req_id = id.clone();
656                            let body = body.clone();
657                            tokio::spawn(async move {
658                                let approved = match tokio::time::timeout(
659                                    CHILD_APPROVAL_TIMEOUT,
660                                    host.approval_call(body),
661                                )
662                                .await
663                                {
664                                    Ok(Ok(reply)) => reply
665                                        .get("approved")
666                                        .and_then(|v| v.as_bool())
667                                        .unwrap_or(false),
668                                    // Transport error or timeout ⇒ fail closed.
669                                    _ => false,
670                                };
671                                super::live::deliver_approval(&child, &req_id, approved);
672                            });
673                        } else {
674                            // Top orchestrator (no escalation bridge): human-in-the-
675                            // loop. Surface the request on the parent's event stream
676                            // and DEFER — the decision arrives out-of-band via
677                            // `live::deliver_approval(child, request_id, approved)`
678                            // (→ this child's `live_rx` → forwarded to the worker
679                            // above). A timeout denies a never-answered request so
680                            // it can't hang the child forever.
681                            let (tool_name, permission, resource) =
682                                approval_request_fields(&body);
683                            // Register the pending request BEFORE surfacing it so
684                            // the external handler's `deliver_approval_checked` can
685                            // correlate an out-of-band POST against a genuine
686                            // human-loop request (and consume it one-shot).
687                            super::live::register_pending_approval(child_session_id, &id);
688                            let _ = event_tx
689                                .send(AgentEvent::ChildApprovalRequested {
690                                    child_session_id: child_session_id.to_string(),
691                                    request_id: id.clone(),
692                                    tool_name,
693                                    permission,
694                                    resource,
695                                })
696                                .await;
697                            let child = child_session_id.to_string();
698                            tokio::spawn(async move {
699                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
700                                // Deny only if still pending: a one-shot consume so
701                                // we don't double-deliver if the human already
702                                // answered (the POST took it), and so a late POST
703                                // after this fires finds nothing pending.
704                                if super::live::take_pending_approval(&child, &id) {
705                                    super::live::deliver_approval(&child, &id, false);
706                                }
707                            });
708                        }
709                    }
710                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
711                        return match status {
712                            TerminalStatus::Completed => Ok(result),
713                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
714                            TerminalStatus::Error => Err(AgentError::LLM(
715                                error.unwrap_or_else(|| "actor child errored".to_string()),
716                            )),
717                            // The suspend/resume round-trip (host re-dispatch of a
718                            // nested parent) is not wired here yet; a worker in
719                            // this build never emits Suspended, so this is
720                            // unreachable in practice.
721                            TerminalStatus::Suspended => Err(AgentError::LLM(
722                                "nested sub-agent suspend received but resume transport is not wired"
723                                    .to_string(),
724                            )),
725                        };
726                    }
727                    Ok(None) => {
728                        return Err(AgentError::LLM(
729                            "actor child closed before terminal".to_string(),
730                        ));
731                    }
732                    Err(e) => {
733                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
734                    }
735                }
736            }
737        }
738    }
739
740    // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
741    let _ = client.send(ParentFrame::Cancel).await;
742    Err(AgentError::Cancelled)
743}
744
745/// The assignment text = the child session's latest user message (falls back to its title).
746fn extract_assignment(session: &Session) -> String {
747    session
748        .messages
749        .iter()
750        .rev()
751        .find(|m| matches!(m.role, Role::User))
752        .map(|m| m.content.clone())
753        .unwrap_or_else(|| {
754            session
755                .metadata
756                .get("title")
757                .cloned()
758                .unwrap_or_else(|| "Execute task".to_string())
759        })
760}
761
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an echo-executor `ProvisionSpec` for child `"c"` at depth 0, with the given
    /// role, model reference, optional workspace and optional disabled-tool list.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
            depth: 0,
        };
        let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        spec.workspace = workspace.map(str::to_string);
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools = disabled.map(|tools| tools.into_iter().map(String::from).collect());
        spec
    }

    /// The reference spec that the fingerprint tests perturb one axis at a time.
    fn baseline() -> ProvisionSpec {
        spec_with("explorer", "p", "m", Some("/ws"), None)
    }

    /// Returns the baseline spec after applying a single in-place modification.
    fn baseline_with(tweak: impl FnOnce(&mut ProvisionSpec)) -> ProvisionSpec {
        let mut spec = baseline();
        tweak(&mut spec);
        spec
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Two specs that agree on role/provider/model/workspace and carry the same
        // disabled tools (listed in a different order) must share a fingerprint even
        // though their child ids differ.
        let first = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Bash", "Edit"]),
        );
        let mut second = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Edit", "Bash"]),
        );
        second.identity.child_id = "other".into();

        let first_fp = ActorChildRunner::fingerprint(&first);
        let second_fp = ActorChildRunner::fingerprint(&second);
        assert_eq!(first_fp, second_fp);
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base_fp = ActorChildRunner::fingerprint(&baseline());

        // One variant per axis baked into the worker; each must land in its own
        // pool bucket.
        let variants = [
            spec_with("writer", "p", "m", Some("/ws"), None),
            spec_with("explorer", "p2", "m", Some("/ws"), None),
            spec_with("explorer", "p", "m2", Some("/ws"), None),
            spec_with("explorer", "p", "m", Some("/ws2"), None),
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
        ];
        for variant in &variants {
            assert_ne!(base_fp, ActorChildRunner::fingerprint(variant));
        }
    }

    #[test]
    fn fingerprint_splits_on_baked_capabilities() {
        // Capabilities are baked once at provision time, so each one has to split the
        // pool bucket. Otherwise a worker baked for one posture could be reused for
        // another (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child
        // and breaking the depth cap, or a bypass worker serving a non-bypass child).
        let base_fp = ActorChildRunner::fingerprint(&baseline());

        let deeper = baseline_with(|spec| spec.identity.depth = 2);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&deeper),
            "depth must split"
        );

        let nested = baseline_with(|spec| spec.capabilities.nested_spawn = true);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&nested),
            "nested_spawn must split"
        );

        let bypass = baseline_with(|spec| spec.capabilities.bypass = true);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&bypass),
            "bypass must split"
        );

        let enforce = baseline_with(|spec| spec.capabilities.enforce_permissions = true);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&enforce),
            "enforce_permissions must split"
        );

        let capped = baseline_with(|spec| spec.capabilities.max_spawn_depth = Some(8));
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&capped),
            "max_spawn_depth must split"
        );

        // #73 (P1): `no_human_review` is baked by the worker from this flag once at
        // build(), so it has to split the pool; otherwise a worker baked for one
        // approval posture would be reused for the opposite one.
        let no_human = baseline_with(|spec| spec.capabilities.no_human_approver = true);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&no_human),
            "no_human_approver must split"
        );

        // #71: the read-only Bash checker is baked once at build() from this flag, so
        // a guardian reviewer worker must never be handed to an ordinary child.
        let guardian = baseline_with(|spec| spec.capabilities.guardian_read_only = true);
        assert_ne!(
            base_fp,
            ActorChildRunner::fingerprint(&guardian),
            "guardian_read_only must split"
        );
    }

    /// Decider stub that answers every request with the same fixed verdict.
    struct FixedDecider {
        verdict: bool,
    }

    #[async_trait]
    impl ChildApprovalDecider for FixedDecider {
        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
            self.verdict
        }
    }

    #[tokio::test]
    async fn child_approval_fails_closed_without_decider() {
        // With no decider wired the host must deny: the safe default.
        let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
        let approved = decide_child_approval(None, "child-1", &body).await;
        assert!(!approved);
    }

    #[tokio::test]
    async fn child_approval_honors_wired_decider() {
        let body =
            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
        // Whatever the wired decider says is what comes back, for both verdicts.
        for verdict in [true, false] {
            let decider: Arc<dyn ChildApprovalDecider> = Arc::new(FixedDecider { verdict });
            let approved = decide_child_approval(Some(&decider), "child-1", &body).await;
            assert_eq!(approved, verdict);
        }
    }

    #[test]
    fn approval_request_fields_extracts_and_defaults() {
        // All three fields present: each is extracted verbatim.
        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
        let (tool_name, permission, resource) = approval_request_fields(&full);
        assert_eq!(tool_name, "Bash");
        assert_eq!(permission, "run");
        assert_eq!(resource, "ls");

        // Absent fields come back as empty strings.
        let partial = serde_json::json!({"tool_name":"Write"});
        let (tool_name, permission, resource) = approval_request_fields(&partial);
        assert_eq!(tool_name, "Write");
        assert_eq!(permission, "");
        assert_eq!(resource, "");
    }
}