Skip to main content

bamboo_engine/external_agents/
actor_adapter.rs

1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::fleet::{spawn_worker_on_bus, SpawnedChild};
24use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
25use bamboo_subagent::provision::{
26    ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
27};
28use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
29
30use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
31
/// Default cap on simultaneously running actor processes.
///
/// NOTE(review): presumably the value handed to `ActorChildRunner::new` as
/// `max_concurrent` when nothing else is configured — confirm at the call site.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Max nesting depth for direct nested execution (Phase 6). A worker whose
/// session `spawn_depth` is below this gets its own spawn stack + the real
/// SubAgent tool; at/over it, neither (and the tool itself refuses). Mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` (kept in sync; engine can't
/// depend on server-tools). Root orchestrator = 0 ⇒ 4 levels of sub-agents.
///
/// `build_spec` consumes it twice: `nested_spawn` is set to
/// `session.spawn_depth < MAX_SPAWN_DEPTH`, and the same value is stamped as the
/// spec's `max_spawn_depth`.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default cap on idle pooled (warm, reusable) workers kept per fingerprint.
///
/// Seeds `ActorChildRunner::max_idle_per_key` in `new`; `release_bus_worker`
/// kills a worker instead of parking it once its bucket already holds this many.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// How long a pooled worker waits for its next assignment before reclaiming
/// itself (must comfortably exceed the gap between sibling spawns).
///
/// Expressed in whole seconds (300 s = 5 minutes).
const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;

/// Deadline for a local worker's FIRST frame after a Run is dispatched. A warm
/// worker answers in seconds; a cold spawn within tens. Total silence past this
/// means the worker is dead (e.g. a pooled worker that exited right after its
/// liveness check) and its Run is queued with nobody to serve it — trip it so the
/// runner respawns once instead of hanging forever. Generous, to never false-trip
/// a slow-but-healthy cold start.
const WORKER_FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(60);
56
/// A warm worker on the mailbox bus, parked for reuse between runs. It stays
/// dialed-in + subscribed to `mailbox_id`; the next interchangeable child
/// delivers its `Run` there instead of spawning a fresh process. Dropping it
/// kills a local kill-on-drop subprocess; a remote / schedulable handle is
/// process-less (`kill()` is a no-op — it self-manages via its idle timeout).
///
/// Instances live in `ActorChildRunner::pool`, bucketed by reuse fingerprint;
/// `acquire_bus_worker` checks one out and `release_bus_worker` parks it again.
struct PooledWorker {
    /// Handle to the spawned worker; the pool uses its `is_alive()` / `kill()`.
    worker: SpawnedChild,
    /// The bus mailbox this worker subscribes to (where its `Run`s are delivered).
    /// For a freshly spawned worker this is a copy of `record.agent_id`.
    mailbox_id: String,
}
67
/// A role pinned to a remote resident worker (remote-actor-plan §3.4 / P1.5,
/// #193), resolved at runner-build time from `SubagentsConfig.remote_placements`:
/// the env-named bearer is already READ into `token` here (the raw token never
/// rides the config), and `ca_cert_file` is the path to a PEM pinning a
/// self-signed worker cert (`None` ⇒ default webpki roots / plaintext `ws://`).
#[derive(Debug, Clone)]
pub struct ResolvedRemotePlacement {
    /// Worker endpoint; `build_spec` copies it into `Placement::Remote`.
    pub endpoint: String,
    /// Bearer for the worker, already resolved from the environment; `build_spec`
    /// copies it into the spec's `secrets.worker_auth_token`.
    pub token: Option<String>,
    /// Path to a PEM file pinning the worker's certificate, if any.
    pub ca_cert_file: Option<PathBuf>,
    /// Display name for the machine this role runs on — the matching cluster
    /// node's `label`/host, surfaced on the UI placement badge. `None` ⇒ derive
    /// from the endpoint host.
    pub host_label: Option<String>,
}
83
/// A role routed to a SCHEDULED worker (remote-actor-plan §3.4 / P2b, #181),
/// resolved at runner-build time from `SubagentsConfig.schedulable_placements`.
/// Names the logical `pool` (= the bus role) whose LIVE connected workers are the
/// scheduling candidates — the runner picks one via the bus presence query
/// (`BrokerClient::list_connected`). Phase 3 retired the old HTTP registry, so a
/// pool is now just a role on the bus.
#[derive(Debug, Clone)]
pub struct ResolvedSchedulablePlacement {
    /// Pool name; passed to the bus presence query and also used as the key of
    /// the per-pool round-robin cursor.
    pub pool: String,
    /// Display name for the machine this pool's workers run on — the matching
    /// cluster node's `label`/host, surfaced on the UI placement badge. `None` ⇒
    /// fall back to the pool name.
    pub host_label: Option<String>,
}
98
/// How `execute_external_child` should obtain its worker connection, decided
/// once from `spec.placement`. Splits the divergent acquire/connect + retire
/// logic three ways while the shared middle (Run dispatch, live registration,
/// drive, close) stays identical. `Local` is the unchanged pre-#193 path;
/// `Remote` is the unchanged #194 path; `Schedulable` (#181, P2b) is new.
enum PlacementKind {
    /// Local worker subprocess (the default placement).
    Local,
    /// Role pinned to a fixed remote resident worker.
    Remote,
    /// Role served by a live worker picked from a pool at run time.
    Schedulable,
}
109
/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
///
/// Built with `new` and then refined through the `with_*` builder methods
/// (bus, approval decider, remote / schedulable placements).
pub struct ActorChildRunner {
    agent_id: String,
    /// Worker binary handed to `spawn_worker_on_bus`.
    worker_bin: PathBuf,
    /// Extra arguments handed to `spawn_worker_on_bus` alongside `worker_bin`.
    worker_args: Vec<String>,
    /// Fabric directory; stamped (lossily stringified) into every `ProvisionSpec`.
    fabric_dir: PathBuf,
    /// Executor description cloned into every `ProvisionSpec`.
    executor: ExecutorSpec,
    /// Per-provider credentials snapshotted from the parent config at build
    /// time; the spec carries only the ONE the child's provider needs.
    credentials: Vec<ScopedCredential>,
    /// Parent's default provider (used when the child has no explicit one).
    default_provider: String,
    /// The mailbox bus to run local children over (the unified transport). Local
    /// sub-agents require it; `None` only when no broker could be embedded.
    bus: Option<bamboo_subagent::BusEndpoint>,
    /// Backpressure: bounds the number of concurrently *running* actors; further
    /// runs wait for a slot instead of exploding the process table. (Idle pooled
    /// workers do not hold a slot.)
    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
    /// Warm-worker pool keyed by a reuse fingerprint
    /// (role/provider/model/workspace/disabled-tools/baked-caps). A finished run
    /// parks its bus worker here so the next interchangeable child reuses it
    /// (delivers its `Run` to the same mailbox) instead of spawning a fresh
    /// process — collapsing N sibling sub-agents onto a few warm workers.
    pool: Arc<tokio::sync::Mutex<HashMap<String, Vec<PooledWorker>>>>,
    /// Cap on parked workers per fingerprint bucket; a worker released into a
    /// full bucket is killed instead of parked.
    max_idle_per_key: usize,
    /// Host-side decision for a child's gated-tool approval request (Phase 2).
    /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
    /// human-routing bridge) returns approve/deny over the actor WS.
    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
    /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
    /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
    /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
    /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
    /// to RE-PROXY a child's approval request UP to the parent run — chaining up
    /// every level until a bypass level (model-review) or the top orchestrator
    /// (human) decides, then relaying the reply back down. Was a process-global
    /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
    /// that spawned it keeps that run's bridge for its whole lifetime instead of
    /// reading a stale/overwritten global at approval time (→ fail-closed deny).
    escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
    /// Roles pinned to a REMOTE resident worker (#193), keyed by sub-agent role
    /// (the child's `subagent_type`). A role present here routes through the
    /// dedicated remote branch in `execute_external_child` (Bearer-authenticated
    /// `wss://` connect, no spawn, no pool, no kill) instead of the local
    /// subprocess + warm-pool path. Empty (the default) = all-local behavior.
    remote_placements: HashMap<String, ResolvedRemotePlacement>,
    /// Roles routed to a SCHEDULED worker (#181, P2b), keyed by sub-agent role.
    /// A role present here (AND not already in `remote_placements`, which wins)
    /// routes through the dedicated SCHEDULABLE branch in
    /// `execute_external_child`: ask the mailbox bus which workers are connected
    /// for the pool role, pick one (round-robin) — no spawn, no pool, no kill,
    /// and NO local-subprocess fallback (no live worker ⇒ a clear error). Empty
    /// (the default) = all-local behavior.
    schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
    /// Per-pool round-robin cursor for schedulable scheduling (#181, P2b). Bumped
    /// once per pick so successive sibling spawns SPREAD across a pool's live
    /// workers instead of all landing on the first candidate. Best-effort spread,
    /// not a load balancer — the bus's connected set can change between picks.
    schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
}
171
/// Decides how the host answers a child worker's gated-tool approval request
/// (Phase 2: child → parent approval delegation). Async so an implementation
/// can consult a policy or defer to a human. With no decider wired the host
/// replies with a fail-closed DENY.
///
/// NOTE: `decide` is awaited inside the per-child frame pump, so an
/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
/// loop decision that may block indefinitely should instead be delivered
/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
/// (`super::live`), which `drive()` already forwards to the worker without
/// stalling the pump.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
    /// Decide whether `child_session_id` may perform the gated action described
    /// by `request` (`{tool_name, permission, resource}`).
    ///
    /// Returns `true` to approve and `false` to deny.
    async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
189
190/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
191/// decider is wired — the single, testable seam for the host-side decision.
192async fn decide_child_approval(
193    decider: Option<&Arc<dyn ChildApprovalDecider>>,
194    child_session_id: &str,
195    request: &serde_json::Value,
196) -> bool {
197    match decider {
198        Some(decider) => decider.decide(child_session_id, request).await,
199        None => false,
200    }
201}
202
/// How long the host waits for a human approval decision before failing the
/// child's gated tool closed (DENY). Bounds an unanswered request so it can't
/// hang the worker indefinitely.
///
/// 300 seconds, i.e. five minutes.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
207
208/// Extract `(tool_name, permission, resource)` from a worker's approval request
209/// body (`{tool_name, permission, resource}`); missing fields default to empty.
210fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
211    let field = |k: &str| {
212        body.get(k)
213            .and_then(|v| v.as_str())
214            .unwrap_or("")
215            .to_string()
216    };
217    (field("tool_name"), field("permission"), field("resource"))
218}
219
/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
///
/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
/// children's forced-ask (dangerous) gated actions — which still raise
/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
/// live channel, so the agent loop is never blocked.
///
/// Install with `set_child_approval_reviewer` (first install wins) and read back
/// with `child_approval_reviewer`.
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
    /// Judge whether the gated action `request` (`{tool_name, permission,
    /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
    async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
234
235fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
236    static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
237    &SLOT
238}
239
240/// Install the process-global child-approval reviewer (idempotent; first wins).
241pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
242    let _ = child_approval_reviewer_slot().set(reviewer);
243}
244
245/// The process-global child-approval reviewer, if installed.
246pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
247    child_approval_reviewer_slot().get().cloned()
248}
249
250impl ActorChildRunner {
251    #[allow(clippy::too_many_arguments)]
252    pub fn new(
253        agent_id: String,
254        worker_bin: PathBuf,
255        worker_args: Vec<String>,
256        fabric_dir: PathBuf,
257        executor: ExecutorSpec,
258        credentials: Vec<ScopedCredential>,
259        default_provider: String,
260        max_concurrent: usize,
261    ) -> Self {
262        Self {
263            agent_id,
264            worker_bin,
265            worker_args,
266            fabric_dir,
267            executor,
268            credentials,
269            default_provider,
270            bus: None,
271            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
272            pool: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
273            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
274            approval_decider: None,
275            escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
276            remote_placements: HashMap::new(),
277            schedulable_placements: HashMap::new(),
278            schedule_cursor: Arc::new(std::sync::Mutex::new(HashMap::new())),
279        }
280    }
281
282    /// Run children over the mailbox bus (the unified actor+mailbox transport).
283    /// When set, local children dial this bus and are driven by mailbox id; when
284    /// unset they use the legacy direct-WS path. The server passes its in-process
285    /// broker here (`subagents.broker`); tests without a broker leave it unset.
286    pub fn with_bus(mut self, bus: Option<bamboo_subagent::BusEndpoint>) -> Self {
287        self.bus = bus.filter(|b| !b.endpoint.trim().is_empty());
288        self
289    }
290
291    /// Wire the host-side decider for child gated-tool approval requests
292    /// (Phase 2). Without this the host fail-closed DENYs every request.
293    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
294        self.approval_decider = Some(decider);
295        self
296    }
297
298    /// Pin specific sub-agent roles to remote resident workers (#193). The map
299    /// is keyed by role (`subagent_type`); a child whose role is present connects
300    /// over `wss://` to the resolved endpoint instead of spawning a local
301    /// subprocess. Default (empty) keeps every role on the local path — exactly
302    /// today's behavior.
303    pub fn with_remote_placements(
304        mut self,
305        placements: HashMap<String, ResolvedRemotePlacement>,
306    ) -> Self {
307        self.remote_placements = placements;
308        self
309    }
310
311    /// Route specific sub-agent roles to a registry-SCHEDULED worker (#181, P2b).
312    /// The map is keyed by role (`subagent_type`); a child whose role is present
313    /// (and NOT already pinned by `remote_placements`, which takes precedence) is
314    /// run on a live worker discovered from the registry instead of a local
315    /// subprocess. Default (empty) keeps every role on the local path.
316    pub fn with_schedulable_placements(
317        mut self,
318        placements: HashMap<String, ResolvedSchedulablePlacement>,
319    ) -> Self {
320        self.schedulable_placements = placements;
321        self
322    }
323
324    /// Reuse fingerprint: two children are interchangeable on one warm worker iff
325    /// they share role, provider, model, workspace, disabled-tool set, AND every
326    /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
327    /// stamps these once and reuses them across runs): nesting depth, nested-spawn
328    /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
329    /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
330    /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
331    /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
332    /// the recursion bound; or a bypass worker reused for a non-bypass child. So
333    /// these MUST split the pool bucket. Everything else (assignment, history) is
334    /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
335    /// Reuse fingerprint (role/provider/model/workspace/disabled-tools/baked
336    /// caps): two children with the same fingerprint are interchangeable on one
337    /// warm worker, so they share a pool bucket. Any axis the worker bakes ONCE
338    /// at provision time MUST be in here, else a worker baked for one posture
339    /// gets reused for another (see the `fingerprint_*` tests).
340    fn fingerprint(spec: &ProvisionSpec) -> String {
341        let role = spec.identity.role.as_str();
342        let (provider, model) = spec
343            .model
344            .as_ref()
345            .map(|m| (m.provider.as_str(), m.model.as_str()))
346            .unwrap_or(("", ""));
347        let workspace = spec.workspace.as_deref().unwrap_or("");
348        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
349        tools.sort();
350        let caps = &spec.capabilities;
351        format!(
352            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
353            tools.join(","),
354            spec.identity.depth,
355            caps.nested_spawn,
356            caps.bypass,
357            caps.enforce_permissions,
358            caps.max_spawn_depth.unwrap_or(0),
359            // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
360            // at build() and never re-reads it per run, so the pool MUST NOT hand a
361            // worker baked for one approval posture to a run of the opposite one —
362            // else a scheduled-root worker reused for an interactive child would
363            // silently model-review instead of asking the human (and vice-versa,
364            // reintroducing the 300s-deny). Split the bucket on it.
365            caps.no_human_approver,
366            // #71: the read-only Bash checker is baked once at build() from this
367            // flag, so a guardian-reviewer worker must NOT be reused for an
368            // ordinary child (which expects unrestricted Bash), and vice-versa.
369            caps.guardian_read_only,
370        )
371    }
372
373    /// Check out a warm bus worker for `key`, reusing a live parked one if any,
374    /// else spawning a fresh one that dials the bus. The returned worker is OWNED
375    /// by the caller for the run's duration (checkout removes it from the pool, so
376    /// a concurrent sibling gets a different worker or spawns its own — one run per
377    /// worker at a time, matching the pre-bus pool semantics).
378    async fn acquire_bus_worker(
379        &self,
380        key: &str,
381        spec: &ProvisionSpec,
382    ) -> crate::runtime::runner::Result<PooledWorker> {
383        // Drain the bucket, skipping (and reaping) any worker whose process exited
384        // while parked. A live one is handed straight out for reuse.
385        loop {
386            let candidate = {
387                let mut pool = self.pool.lock().await;
388                pool.get_mut(key).and_then(|bucket| bucket.pop())
389            };
390            let Some(mut candidate) = candidate else { break };
391            if candidate.worker.is_alive() {
392                return Ok(candidate);
393            }
394            candidate.worker.kill().await;
395        }
396
397        let spawned = spawn_worker_on_bus(&self.worker_bin, &self.worker_args, spec)
398            .await
399            .map_err(|e| AgentError::LLM(format!("actor spawn (bus) failed: {e}")))?;
400        let mailbox_id = spawned.record.agent_id.clone();
401        Ok(PooledWorker {
402            worker: spawned,
403            mailbox_id,
404        })
405    }
406
407    /// Park a warm bus worker for reuse after a clean run; if its bucket is full
408    /// (or it died), kill it instead. The worker stays dialed-in + subscribed
409    /// while parked, so a reusing child just delivers a new `Run` to its mailbox.
410    async fn release_bus_worker(&self, key: &str, mut worker: PooledWorker) {
411        if !worker.worker.is_alive() {
412            worker.worker.kill().await;
413            return;
414        }
415        let mut pool = self.pool.lock().await;
416        let bucket = pool.entry(key.to_string()).or_default();
417        if bucket.len() >= self.max_idle_per_key {
418            drop(pool);
419            worker.worker.kill().await;
420            return;
421        }
422        bucket.push(worker);
423    }
424
    /// Assemble the parent-resolved provisioning document for this child.
    ///
    /// Reads the child `session` (role from `metadata["subagent_type"]`, spawn
    /// depth, workspace, pinned model, runtime-state flags) and the `job`
    /// (session ids, bare model name, disabled tools), then stamps — in this
    /// order — identity, workspace, bus, model, disabled tools, the single
    /// provider credential, capabilities, and finally placement.
    ///
    /// The order matters: the credential lookup reads the already-resolved
    /// `spec.model`, and the placement lookup reads `spec.identity.role`.
    /// A missing credential is only logged (warn); the spec is still returned.
    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: job.child_session_id.clone(),
                parent_id: Some(job.parent_session_id.clone()),
                project_key: None,
                // Role defaults to "worker" when the session carries no
                // `subagent_type` metadata.
                role: session
                    .metadata
                    .get("subagent_type")
                    .cloned()
                    .unwrap_or_else(|| "worker".to_string()),
                // The child session already carries the correct depth
                // (create_child_action's new_child_of did parent.spawn_depth+1);
                // stamp it so the worker can re-establish it on its run session
                // and enforce the max-depth cap across the actor boundary.
                depth: session.spawn_depth,
            },
            self.executor.clone(),
            self.fabric_dir.to_string_lossy().into_owned(),
        );
        spec.workspace = session.workspace.clone();
        // Unified transport: when a bus is configured, the child dials it (no
        // listen socket / file discovery) and the parent drives it by mailbox id.
        spec.bus = self.bus.clone();
        // Final model: the session's pinned model_ref (create.model / routing already applied),
        // falling back to the job's bare model on the parent's default provider.
        // A blank job model leaves `spec.model` as `None`.
        spec.model = session
            .model_ref
            .as_ref()
            .map(|r| ModelRefSpec {
                provider: r.provider.clone(),
                model: r.model.clone(),
            })
            .or_else(|| {
                let m = job.model.trim();
                (!m.is_empty()).then(|| ModelRefSpec {
                    provider: self.default_provider.clone(),
                    model: m.to_string(),
                })
            });
        spec.disabled_tools = job.disabled_tools.clone();
        // Least-privilege secrets: only the credential for the child's provider.
        // A missing or blank model provider falls back to the parent's default.
        let provider = spec
            .model
            .as_ref()
            .map(|m| m.provider.as_str())
            .filter(|p| !p.trim().is_empty())
            .unwrap_or(&self.default_provider);
        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
            spec.secrets.provider_credentials.push(cred.clone());
        } else {
            tracing::warn!(
                "actor child {}: no credential found for provider '{}'",
                job.child_session_id,
                provider
            );
        }
        // Phase 6 (direct nested execution): a worker BELOW the depth cap may
        // orchestrate its OWN children — on startup it builds its own spawn
        // stack and runs the real SubAgent tool (no host proxy). The cap (the
        // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
        // recursion. Driven purely by the child's depth, so it auto-propagates
        // down the tree without any extra config threading.
        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
        // #69: activate child-approval review. Sub-agents enforce permissions so
        // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
        // parent for review — escalated to the human, or model-reviewed off-loop
        // when the parent is in bypass. The worker installs no checker without
        // this, so the whole review chain would otherwise stay dormant.
        spec.capabilities.enforce_permissions = true;
        // Propagate "bypass permissions" so a self-orchestrating worker knows it
        // is a bypassed parent and installs the off-loop model-reviewer for its
        // children's forced-ask actions (Phase 6, Part B). The child session
        // already carries the inherited flag (create_child_action seeds it).
        // No runtime state on the session ⇒ `false`.
        spec.capabilities.bypass = session
            .agent_runtime_state
            .as_ref()
            .is_some_and(|s| s.bypass_permissions);
        // #73: propagate "no interactive human approver" (headless / scheduled /
        // deployed root, inherited by the child session). When set, the worker's
        // per-run approval proxy model-reviews a gated action locally instead of
        // escalating to a human who will never answer (which would 300s-deny).
        // No runtime state on the session ⇒ `false`.
        spec.capabilities.no_human_approver = session
            .agent_runtime_state
            .as_ref()
            .is_some_and(|s| s.no_human_approver);
        // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
        // read-only Bash allowlist checker. The reviewer is spawned by
        // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
        // marker the completion coordinator branches on to parse the verdict) AND
        // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
        // marker (already read above to set `identity.role`), so it rides the same
        // session-metadata path the denylist/subagent_type use — no new wire seam.
        // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
        // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
        spec.capabilities.guardian_read_only =
            session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
        // #193: route this role to a REMOTE resident worker when one is pinned.
        // `spec.identity.role` was just computed from `subagent_type` above; a
        // match flips the placement to Remote and rides the worker's bearer on the
        // scoped secrets envelope (TLS handshake / Authorization header only — the
        // token is never logged). No match leaves the default `Placement::Local`,
        // so the local path is byte-for-byte unchanged for every non-pinned role.
        if let Some(placement) = self.remote_placements.get(spec.identity.role.as_str()) {
            spec.placement = Placement::Remote {
                endpoint: placement.endpoint.clone(),
            };
            spec.secrets.worker_auth_token = placement.token.clone();
        } else if let Some(placement) = self.schedulable_placements.get(spec.identity.role.as_str())
        {
            // #181 (P2b): route this role to a SCHEDULED worker — ONLY when it is
            // NOT already pinned to a fixed remote endpoint (the `else if` makes
            // remote_placements take precedence for a role in both). The concrete
            // worker is picked at run time in `execute_external_child` from the bus
            // (a live connected worker of the pool role). No per-placement bearer
            // now — the bus connection uses the bus token. No match in either map
            // leaves the default `Placement::Local`.
            spec.placement = Placement::Schedulable {
                pool: placement.pool.clone(),
            };
        }
        spec
    }
550
551    /// The `metadata["placement"]` JSON to stamp on a child from its resolved
552    /// placement, preferring the matching cluster node's `host_label` (its
553    /// operator label/host) over the raw endpoint/pool. `None` for a Local child
554    /// (the DTO defaults it to the backend's own host). Split out of
555    /// `execute_external_child` so the role→placement→host resolution is unit-testable.
556    fn placement_stamp_for(&self, spec: &ProvisionSpec) -> Option<String> {
557        let host_label = match &spec.placement {
558            Placement::Remote { .. } => self
559                .remote_placements
560                .get(spec.identity.role.as_str())
561                .and_then(|p| p.host_label.as_deref()),
562            Placement::Schedulable { .. } => self
563                .schedulable_placements
564                .get(spec.identity.role.as_str())
565                .and_then(|p| p.host_label.as_deref()),
566            Placement::Local => None,
567        };
568        placement_metadata(&spec.placement, host_label)
569    }
570
571    /// Pick a live worker for a SCHEDULABLE role from the BUS (#181, Phase 3):
572    /// ask the broker which actors are connected serving the pool role (presence
573    /// is connection-truth — no HTTP registry, no leases, no connect-fail
574    /// failover), then round-robin one per resolve for spread. Returns the chosen
575    /// worker's mailbox id. An empty pool ⇒ a terminal `AgentError` — NEVER a
576    /// local-subprocess fallback (that would silently defeat the placement).
577    async fn resolve_schedulable_worker(
578        &self,
579        role: &str,
580    ) -> std::result::Result<String, AgentError> {
581        let pool = self
582            .schedulable_placements
583            .get(role)
584            .ok_or_else(|| {
585                AgentError::LLM(format!(
586                    "schedulable placement for role '{role}' vanished before scheduling"
587                ))
588            })?
589            .pool
590            .clone();
591        let bus = self.bus.as_ref().ok_or_else(|| {
592            AgentError::LLM(format!(
593                "schedulable role '{role}': no mailbox bus configured (subagents.broker)"
594            ))
595        })?;
596
597        // Ask the BUS who is connected serving the pool role — presence is
598        // connection-truth (no HTTP registry, no leases, no stale-record failover).
599        let mut q = bamboo_broker::BrokerClient::connect(
600            &bus.endpoint,
601            bamboo_subagent::AgentRef {
602                session_id: format!("sched-q-{role}"),
603                role: None,
604            },
605            &bus.token,
606        )
607        .await
608        .map_err(|e| AgentError::LLM(format!("schedulable role '{role}': bus connect failed: {e}")))?;
609        let candidates = q.list_connected(&pool).await.map_err(|e| {
610            AgentError::LLM(format!("schedulable role '{role}': bus presence query failed: {e}"))
611        })?;
612
613        if candidates.is_empty() {
614            return Err(AgentError::LLM(format!(
615                "schedulable role '{role}': no live worker in pool '{pool}' on the bus \
616                 (NOT spawning a local subprocess — a schedulable role has no local fallback)"
617            )));
618        }
619
620        // Round-robin: advance a per-pool cursor once per resolve so successive
621        // sibling spawns spread across the connected pool workers. No failover
622        // needed — a listed worker is connected NOW (the bus only lists live
623        // subscribers), so there is no stale-but-leased candidate to skip.
624        let idx = {
625            let mut cursors = self.schedule_cursor.lock().unwrap();
626            let cursor = cursors.entry(pool.clone()).or_insert(0);
627            let i = *cursor % candidates.len();
628            *cursor = cursor.wrapping_add(1);
629            i
630        };
631        Ok(candidates[idx].clone())
632    }
633}
634
635#[async_trait]
636impl ExternalChildRunner for ActorChildRunner {
637    async fn should_handle(&self, session: &Session) -> bool {
638        session.metadata.get("runtime.kind") == Some(&"external".to_string())
639            && session.metadata.get("external.protocol") == Some(&"actor".to_string())
640            && session.metadata.get("external.agent_id") == Some(&self.agent_id)
641    }
642
643    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
644        *self.escalation_bridge.lock().unwrap() = bridge;
645    }
646
647    async fn execute_external_child(
648        &self,
649        session: &mut Session,
650        job: &SpawnJob,
651        event_tx: mpsc::Sender<AgentEvent>,
652        cancel_token: CancellationToken,
653    ) -> crate::runtime::runner::Result<()> {
654        // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
655        // moment this grandchild is spawned — while the parent run's bridge is
656        // still in our slot — into an owned local handed to `drive()` for this
657        // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
658        // the run that spawned it must NOT re-read `self.escalation_bridge` at
659        // approval time: by then `run()` may have cleared/overwritten it (a worker
660        // serves runs sequentially), and re-proxying through a closed bridge
661        // fail-closed denies. Capturing at spawn pins the right bridge per run.
662        let escalation = self.escalation_bridge.lock().unwrap().clone();
663        let assignment = extract_assignment(session);
664        let mut spec = self.build_spec(session, job);
665        // Mark the worker reusable + give it an idle timeout so it self-reaps if
666        // orphaned. Warm bus workers are pooled per fingerprint and reused.
667        spec.reusable = true;
668        if spec.limits.idle_timeout_secs.is_none() {
669            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
670        }
671        let pool_key = Self::fingerprint(&spec);
672        // Rehydration: the child session in the parent's store is the actor's
673        // durable state. Ship the full conversation so a reactivation
674        // (send_message / update / rerun) carries its history. A reused worker is
675        // stateless between runs, so this is also what isolates each child's
676        // context on a shared process.
677        let messages: Vec<serde_json::Value> = session
678            .messages
679            .iter()
680            .filter_map(|m| serde_json::to_value(m).ok())
681            .collect();
682
683        // Backpressure: hold a concurrency slot for the lifetime of the *run*
684        // (cancellation still proceeds — the cancel branch in drive() runs while
685        // we hold the permit). Released when this fn returns, i.e. once the worker
686        // is parked back into the pool, so idle workers don't pin slots.
687        let _slot = self
688            .concurrency
689            .acquire()
690            .await
691            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
692
693        // Split LOCAL (spawn + warm-pool) from the two process-less remote paths
694        // ONLY at the divergent spots — acquire/connect here and the park/retire at
695        // the end. Everything between (Run dispatch, live-actor registration,
696        // drive, the close) is identical for all three. `kind` is the single guard.
697        //   - Local       (#0):  byte-for-byte the pre-#193 reuse-or-spawn path.
698        //   - Remote       (#194): connect to a FIXED resident endpoint, no spawn.
699        //   - Schedulable  (#181): resolve a live worker from the registry, connect.
700        let kind = match spec.placement {
701            Placement::Remote { .. } => PlacementKind::Remote,
702            Placement::Schedulable { .. } => PlacementKind::Schedulable,
703            Placement::Local => PlacementKind::Local,
704        };
705        let remote = !matches!(kind, PlacementKind::Local);
706
707        // Stamp WHICH machine this child runs on onto its session metadata, so the
708        // UI can show it (mirrored into the session index → SessionSummary.placement).
709        // Only remote/scheduled placements need a stamp — a Local child falls through
710        // to the DTO default (this backend's own host). Persisted by the caller with
711        // the rest of the child session after we return.
712        if let Some(placement_meta) = self.placement_stamp_for(&spec) {
713            session
714                .metadata
715                .insert("placement".to_string(), placement_meta);
716        }
717
718        // Retry-once loop: a pooled local worker can die between its liveness
719        // check and handling the Run (a tiny TOCTOU window) — its Run then sits
720        // queued with no server. The first-frame watchdog in `drive` surfaces that
721        // as `WorkerUnresponsive`; we reap the dead worker and re-acquire ONCE
722        // (which spawns fresh / reuses the next live one). Remote/schedulable have
723        // no spawn fallback, so they never retry.
724        let mut attempt = 0u8;
725        let (result, actor) = loop {
726        let (actor, mut client) = match kind {
727            PlacementKind::Remote => {
728                // REMOTE branch: connect to a resident worker. No spawn, no pool
729                // touch, no drain. We do not own the worker, so a connect failure
730                // has NO respawn fallback — it is a clear, terminal error.
731                let placement = self
732                    .remote_placements
733                    .get(spec.identity.role.as_str())
734                    .ok_or_else(|| {
735                        AgentError::LLM(format!(
736                            "remote placement for role '{}' vanished before connect",
737                            spec.identity.role
738                        ))
739                    })?;
740                let endpoint = placement.endpoint.clone();
741                // Build the TLS trust: a pinned CA pins a self-signed worker cert;
742                // otherwise default webpki roots (or plaintext for `ws://`).
743                let trust_cfg = match placement.ca_cert_file.as_deref() {
744                    Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
745                        AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
746                    })?),
747                    None => None,
748                };
749                let client = ChildClient::connect_with_auth_tls(
750                    &endpoint,
751                    placement.token.as_deref(),
752                    trust_cfg,
753                )
754                .await
755                .map_err(|e| {
756                    AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
757                })?;
758                // Process-less handle so live-actor registration (in-band steering)
759                // works exactly as for a local worker; `kill()` is a no-op.
760                let record = AgentRecord {
761                    agent_id: job.child_session_id.clone(),
762                    role: spec.identity.role.clone(),
763                    labels: Vec::new(),
764                    endpoint: endpoint.clone(),
765                    pid: 0,
766                    version: String::new(),
767                    started_at: chrono::Utc::now(),
768                    lease_expires_at: chrono::Utc::now(),
769                };
770                let _ = endpoint;
771                let actor = PooledWorker {
772                    worker: SpawnedChild::remote(record),
773                    mailbox_id: job.child_session_id.clone(),
774                };
775                let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(client);
776                (actor, client)
777            }
778            PlacementKind::Schedulable => {
779                // SCHEDULABLE branch (#181): pick a LIVE worker of the pool role
780                // from the BUS (presence = connection-truth; no HTTP registry, no
781                // leases, no failover) and drive it by mailbox id. The pool worker
782                // stays connected and is reused next time. No spawn, no kill, NO
783                // local fallback — an empty pool is a terminal error (raised in
784                // resolve_schedulable_worker).
785                let bus = self.bus.as_ref().ok_or_else(|| {
786                    AgentError::LLM(
787                        "schedulable sub-agents require a mailbox bus (subagents.broker)"
788                            .to_string(),
789                    )
790                })?;
791                let mailbox_id = self
792                    .resolve_schedulable_worker(spec.identity.role.as_str())
793                    .await?;
794                let parent = bamboo_subagent::AgentRef {
795                    session_id: format!("p-{}", job.child_session_id),
796                    role: None,
797                };
798                let link = bamboo_broker::BrokerChildLink::connect(
799                    &bus.endpoint,
800                    parent,
801                    &bus.token,
802                    mailbox_id.clone(),
803                )
804                .await
805                .map_err(|e| {
806                    AgentError::LLM(format!("schedulable link connect to '{mailbox_id}' failed: {e}"))
807                })?;
808                // Process-less handle — a bus-resident pool worker is never ours to
809                // kill (remote ⇒ dropped, not pooled, after the run).
810                let actor = PooledWorker {
811                    worker: SpawnedChild::remote(AgentRecord {
812                        agent_id: mailbox_id.clone(),
813                        role: spec.identity.role.clone(),
814                        labels: Vec::new(),
815                        endpoint: bus.endpoint.clone(),
816                        pid: 0,
817                        version: String::new(),
818                        started_at: chrono::Utc::now(),
819                        lease_expires_at: chrono::Utc::now(),
820                    }),
821                    mailbox_id,
822                };
823                let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
824                (actor, client)
825            }
826            PlacementKind::Local => {
827                // LOCAL = the mailbox bus (the unified transport): check out a warm
828                // pooled worker (reuse a live parked one, else spawn fresh) and
829                // drive it by mailbox id — no listen socket, no file discovery, no
830                // respawn-on-connect-miss (the broker queues the Run until the
831                // worker handles it). The legacy direct-WS path was retired; the bus
832                // is required.
833                let bus = self.bus.as_ref().ok_or_else(|| {
834                    AgentError::LLM(
835                        "local sub-agents require a mailbox bus (subagents.broker); none is \
836                         configured and the bus could not be embedded"
837                            .to_string(),
838                    )
839                })?;
840                let actor = self.acquire_bus_worker(&pool_key, &spec).await?;
841                let parent = bamboo_subagent::AgentRef {
842                    session_id: format!("p-{}", job.child_session_id),
843                    role: None,
844                };
845                let link = bamboo_broker::BrokerChildLink::connect(
846                    &bus.endpoint,
847                    parent,
848                    &bus.token,
849                    actor.mailbox_id.clone(),
850                )
851                .await
852                .map_err(|e| AgentError::LLM(format!("broker child link connect failed: {e}")))?;
853                let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
854                (actor, client)
855            }
856        };
857
858        if let Err(e) = client
859            .send(ParentFrame::Run(RunSpec {
860                // Cloned (not moved) so a retry can re-dispatch to a fresh worker.
861                assignment: assignment.clone(),
862                reasoning_effort: None,
863                messages: messages.clone(),
864            }))
865            .await
866        {
867            if !remote {
868                actor.worker.kill().await;
869            }
870            return Err(AgentError::LLM(format!("actor run dispatch failed: {e}")));
871        }
872
873        // Register as a live actor so send_message (running, no interrupt) can
874        // steer this child in-band over the existing WS connection. The guard
875        // unregisters on every exit path.
876        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
877        let live_guard = super::live::register(&job.child_session_id, live_tx);
878
879        let result = drive(
880            &mut *client,
881            &job.child_session_id,
882            self.approval_decider.as_ref(),
883            escalation.clone(),
884            &event_tx,
885            &cancel_token,
886            &mut live_rx,
887            // First-frame watchdog for EVERY placement: a wedged-but-connected
888            // worker (subscribed ≠ serving — e.g. stuck on a prior LLM call) emits
889            // no first frame; without a deadline drive() blocks forever. Bounding it
890            // turns the "running-but-unresponsive" hang into a recoverable
891            // WorkerUnresponsive (reap+respawn local / re-pick schedulable / error
892            // on a fixed remote endpoint).
893            Some(WORKER_FIRST_FRAME_TIMEOUT),
894        )
895        .await;
896        // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
897        // so a send_message landing in the close/park window below must see
898        // "not live" and take the durable-queue fallback instead of vanishing.
899        // (Even if one slipped in earlier, send_message also appends it to the
900        // durable transcript, so the next activation still rehydrates it.)
901        drop(live_guard);
902        // Close the parent link (dropping it closes our broker connection; the
903        // worker stays dialed-in + subscribed, ready for its next Run).
904        drop(client);
905
906        // No first frame ⇒ the worker is wedged. Recover ONCE before giving up:
907        //   - Local: reap the dead pooled worker + respawn.
908        //   - Schedulable: not ours to kill — drop it and re-select a live pool
909        //     member (a wedged worker must not fail the run when the pool has others).
910        //   - Remote: a FIXED endpoint has no alternative — fall through to a bounded
911        //     WorkerUnresponsive error (far better than the previous infinite hang).
912        if attempt == 0 && matches!(result, Err(AgentError::WorkerUnresponsive(_))) {
913            match kind {
914                PlacementKind::Local => {
915                    tracing::warn!(
916                        "actor child {} got no first frame; reaping the worker and respawning once",
917                        job.child_session_id
918                    );
919                    actor.worker.kill().await;
920                    attempt += 1;
921                    continue;
922                }
923                PlacementKind::Schedulable => {
924                    tracing::warn!(
925                        "scheduled actor child {} got no first frame; re-selecting a pool worker",
926                        job.child_session_id
927                    );
928                    drop(actor);
929                    attempt += 1;
930                    continue;
931                }
932                PlacementKind::Remote => {}
933            }
934        }
935        break (result, actor);
936        };
937
938        // Park the warm worker for reuse on a clean run, or kill it on
939        // error/cancel (a wedged worker must not be reused). Remote / schedulable
940        // workers are registry-managed — never ours to pool/kill, just drop.
941        if remote {
942            drop(actor);
943        } else {
944            match &result {
945                Ok(_) => self.release_bus_worker(&pool_key, actor).await,
946                Err(_) => actor.worker.kill().await,
947            }
948        }
949
950        // Write-back: persist the actor's final reply onto the child session so
951        // the transcript survives and the NEXT activation sees it as history.
952        // (run_child_spawn saves the session right after we return.)
953        match result {
954            Ok(Some(text)) => {
955                if !text.is_empty() {
956                    session.add_message(bamboo_agent_core::Message::assistant(text, None));
957                }
958                Ok(())
959            }
960            Ok(None) => Ok(()),
961            Err(e) => Err(e),
962        }
963    }
964}
965
966/// The `{kind,host}` placement descriptor stamped onto a child session's metadata
967/// under `"placement"` — read back by the storage index → `SessionSummary.placement`
968/// → the UI's machine badge. `None` for `Local` (those fall through to the DTO's
969/// default of this backend's own host). The value is a JSON string matching
970/// `bamboo_storage::SessionPlacement { kind, host }`.
971fn placement_metadata(placement: &Placement, host_label: Option<&str>) -> Option<String> {
972    // Prefer the cluster node's own label/host (its metadata) when the placement
973    // maps to a node; else fall back to the raw endpoint host / pool name.
974    let value = match placement {
975        Placement::Local => return None,
976        Placement::Remote { endpoint } => serde_json::json!({
977            "kind": "remote",
978            "host": host_label.map(str::to_string).unwrap_or_else(|| host_of_endpoint(endpoint)),
979        }),
980        Placement::Schedulable { pool } => serde_json::json!({
981            "kind": "remote",
982            "host": host_label.unwrap_or(pool),
983        }),
984    };
985    serde_json::to_string(&value).ok()
986}
987
/// Extract the host from a `ws[s]://[user[:pass]@]host[:port][/path]` bus
/// endpoint, for display.
///
/// Surrounding whitespace and a leading `ws://` / `wss://` scheme are removed,
/// then everything from the first `/`, `?` or `#` onward is discarded. Any
/// `userinfo@` prefix is dropped so credentials never reach the UI, and the
/// port is cut off. A bracketed IPv6 literal (`[::1]`) is returned whole,
/// brackets included, rather than being split at its first `:`.
///
/// Returns an empty string for an empty endpoint.
fn host_of_endpoint(endpoint: &str) -> String {
    let trimmed = endpoint.trim();
    let without_scheme = trimmed
        .strip_prefix("wss://")
        .or_else(|| trimmed.strip_prefix("ws://"))
        .unwrap_or(trimmed);

    // Authority = everything before the path, query, or fragment.
    let authority = without_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(without_scheme);

    // Drop `user:pass@` — only the part after the last '@' names the host.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, after)| after);

    // IPv6 literal: the colons inside the brackets are part of the address,
    // so take everything through the closing bracket.
    if host_port.starts_with('[') {
        if let Some(close) = host_port.find(']') {
            return host_port[..=close].to_string();
        }
    }

    // `split` always yields at least one item, so the fallback is never hit;
    // it only avoids an `unwrap`.
    host_port
        .split(':')
        .next()
        .unwrap_or(host_port)
        .to_string()
}
999
1000/// Pump child frames -> parent events until a terminal frame (or cancellation).
1001/// On success, yields the actor's final result text (for session write-back).
1002/// `live_rx` carries in-band frames (steering messages) from the live registry.
1003///
1004/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
1005/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
1006/// non-bypass child re-proxies an approval request, this owned bridge routes it
1007/// UP to the parent run. Owning it for the call's lifetime is what lets a
1008/// fire-and-forget grandchild that outlives its spawning run still escalate to
1009/// the correct (then-current) parent bridge rather than a stale/overwritten one.
1010async fn drive(
1011    client: &mut dyn bamboo_subagent::ChildLink,
1012    child_session_id: &str,
1013    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
1014    escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
1015    event_tx: &mpsc::Sender<AgentEvent>,
1016    cancel_token: &CancellationToken,
1017    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
1018    first_frame_timeout: Option<Duration>,
1019) -> crate::runtime::runner::Result<Option<String>> {
1020    // First-frame watchdog: a live worker emits its first frame (run-started /
1021    // first token) within seconds; total silence past the deadline means the
1022    // worker is dead (e.g. a pooled worker that exited right after checkout), so
1023    // its Run sits queued forever. We trip ONLY before the first frame — once any
1024    // frame arrives the worker is proven live and a legitimately long run (a slow
1025    // tool between tokens) never trips it.
1026    let mut got_first_frame = false;
1027    let mut first_frame_watch = first_frame_timeout.map(|d| Box::pin(tokio::time::sleep(d)));
1028    loop {
1029        tokio::select! {
1030            _ = cancel_token.cancelled() => {
1031                // fall through to the cancel handling below
1032                break;
1033            }
1034            _ = async {
1035                match first_frame_watch.as_mut() {
1036                    Some(s) => s.as_mut().await,
1037                    None => std::future::pending::<()>().await,
1038                }
1039            }, if !got_first_frame => {
1040                return Err(AgentError::WorkerUnresponsive(format!(
1041                    "child {child_session_id} produced no frame within {:?}",
1042                    first_frame_timeout.unwrap_or_default()
1043                )));
1044            }
1045            Some(frame) = live_rx.recv() => {
1046                // Forward in-band steering to the worker over the existing WS.
1047                if client.send(frame).await.is_err() {
1048                    tracing::warn!("live steering frame could not be sent; connection failing");
1049                }
1050            }
1051            frame = client.next_frame() => {
1052                // Any frame (event / approval / terminal / close / error) proves
1053                // the worker responded — disarm the first-frame watchdog.
1054                got_first_frame = true;
1055                first_frame_watch = None;
1056                match frame {
1057                    Ok(Some(ChildFrame::Event { event })) => {
1058                        // AgentEvent is serialized verbatim on the wire (zero mapping).
1059                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
1060                            let _ = event_tx.send(ev).await;
1061                        }
1062                    }
1063                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
1064                        // Phase 2: a worker proxied a gated-tool approval back to
1065                        // the host. The WORKER side is live — its executor installs
1066                        // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
1067                        // that calls `host.approval_call`, so this frame arrives
1068                        // when a child hits `ConfirmationRequired`.
1069                        if let Some(reviewer) = child_approval_reviewer() {
1070                            // Phase 6, Part B: a BYPASSED parent worker
1071                            // model-reviews its children's forced-ask (dangerous)
1072                            // actions. The review is an LLM call, so run it OFF
1073                            // the frame pump in a spawned task and deliver the
1074                            // verdict async via the live channel — the pump keeps
1075                            // forwarding events and the agent loop never blocks. A
1076                            // timeout denies a hung review so the child can't hang.
1077                            let child = child_session_id.to_string();
1078                            let req_id = id.clone();
1079                            let body = body.clone();
1080                            tokio::spawn(async move {
1081                                let approved = tokio::time::timeout(
1082                                    CHILD_APPROVAL_TIMEOUT,
1083                                    reviewer.review(&child, &body),
1084                                )
1085                                .await
1086                                .unwrap_or(false);
1087                                super::live::deliver_approval(&child, &req_id, approved);
1088                            });
1089                        } else if approval_decider.is_some() {
1090                            // A decider is wired (policy / auto): decide promptly
1091                            // and reply inline. (Must not block the pump — see the
1092                            // `ChildApprovalDecider` doc.)
1093                            let approved =
1094                                decide_child_approval(approval_decider, child_session_id, &body)
1095                                    .await;
1096                            if client
1097                                .send(ParentFrame::ApprovalReply { id, approved })
1098                                .await
1099                                .is_err()
1100                            {
1101                                tracing::warn!(
1102                                    "failed to answer approval_request; connection failing"
1103                                );
1104                            }
1105                        } else if let Some(host) = escalation_bridge.clone() {
1106                            // Non-bypass WORKER: ESCALATE up our own actor link
1107                            // (re-proxy) so the request chains to our parent — and
1108                            // up every level until a bypass level (model-review) or
1109                            // the top orchestrator (human) decides. Off-loop so the
1110                            // pump never blocks; relay the reply down to the child.
1111                            let child = child_session_id.to_string();
1112                            let req_id = id.clone();
1113                            let body = body.clone();
1114                            tokio::spawn(async move {
1115                                let approved = match tokio::time::timeout(
1116                                    CHILD_APPROVAL_TIMEOUT,
1117                                    host.approval_call(body),
1118                                )
1119                                .await
1120                                {
1121                                    Ok(Ok(reply)) => reply
1122                                        .get("approved")
1123                                        .and_then(|v| v.as_bool())
1124                                        .unwrap_or(false),
1125                                    // Transport error or timeout ⇒ fail closed.
1126                                    _ => false,
1127                                };
1128                                super::live::deliver_approval(&child, &req_id, approved);
1129                            });
1130                        } else {
1131                            // Top orchestrator (no escalation bridge): human-in-the-
1132                            // loop. Surface the request on the parent's event stream
1133                            // and DEFER — the decision arrives out-of-band via
1134                            // `live::deliver_approval(child, request_id, approved)`
1135                            // (→ this child's `live_rx` → forwarded to the worker
1136                            // above). A timeout denies a never-answered request so
1137                            // it can't hang the child forever.
1138                            let (tool_name, permission, resource) =
1139                                approval_request_fields(&body);
1140                            // Register the pending request BEFORE surfacing it so
1141                            // the external handler's `deliver_approval_checked` can
1142                            // correlate an out-of-band POST against a genuine
1143                            // human-loop request (and consume it one-shot).
1144                            super::live::register_pending_approval(child_session_id, &id);
1145                            let _ = event_tx
1146                                .send(AgentEvent::ChildApprovalRequested {
1147                                    child_session_id: child_session_id.to_string(),
1148                                    request_id: id.clone(),
1149                                    tool_name,
1150                                    permission,
1151                                    resource,
1152                                })
1153                                .await;
1154                            let child = child_session_id.to_string();
1155                            tokio::spawn(async move {
1156                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
1157                                // Deny only if still pending: a one-shot consume so
1158                                // we don't double-deliver if the human already
1159                                // answered (the POST took it), and so a late POST
1160                                // after this fires finds nothing pending.
1161                                if super::live::take_pending_approval(&child, &id) {
1162                                    super::live::deliver_approval(&child, &id, false);
1163                                }
1164                            });
1165                        }
1166                    }
1167                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
1168                        return match status {
1169                            TerminalStatus::Completed => Ok(result),
1170                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
1171                            TerminalStatus::Error => Err(AgentError::LLM(
1172                                error.unwrap_or_else(|| "actor child errored".to_string()),
1173                            )),
1174                            // The suspend/resume round-trip (host re-dispatch of a
1175                            // nested parent) is not wired here yet; a worker in
1176                            // this build never emits Suspended, so this is
1177                            // unreachable in practice.
1178                            TerminalStatus::Suspended => Err(AgentError::LLM(
1179                                "nested sub-agent suspend received but resume transport is not wired"
1180                                    .to_string(),
1181                            )),
1182                        };
1183                    }
1184                    Ok(None) => {
1185                        return Err(AgentError::LLM(
1186                            "actor child closed before terminal".to_string(),
1187                        ));
1188                    }
1189                    Err(e) => {
1190                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
1191                    }
1192                }
1193            }
1194        }
1195    }
1196
1197    // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
1198    let _ = client.send(ParentFrame::Cancel).await;
1199    Err(AgentError::Cancelled)
1200}
1201
1202/// The assignment text = the child session's latest user message (falls back to its title).
1203fn extract_assignment(session: &Session) -> String {
1204    session
1205        .messages
1206        .iter()
1207        .rev()
1208        .find(|m| matches!(m.role, Role::User))
1209        .map(|m| m.content.clone())
1210        .unwrap_or_else(|| {
1211            session
1212                .metadata
1213                .get("title")
1214                .cloned()
1215                .unwrap_or_else(|| "Execute task".to_string())
1216        })
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221    use super::*;
1222
1223    fn spec_with(
1224        role: &str,
1225        provider: &str,
1226        model: &str,
1227        workspace: Option<&str>,
1228        disabled: Option<Vec<&str>>,
1229    ) -> ProvisionSpec {
1230        let mut spec = ProvisionSpec::new(
1231            ChildIdentity {
1232                child_id: "c".into(),
1233                parent_id: None,
1234                project_key: None,
1235                role: role.into(),
1236                depth: 0,
1237            },
1238            ExecutorSpec::Echo,
1239            "/tmp/fab".into(),
1240        );
1241        spec.workspace = workspace.map(|w| w.to_string());
1242        spec.model = Some(ModelRefSpec {
1243            provider: provider.into(),
1244            model: model.into(),
1245        });
1246        spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
1247        spec
1248    }
1249
1250    #[test]
1251    fn fingerprint_matches_interchangeable_children() {
1252        // Same role/provider/model/workspace and equal tool sets (order-insensitive)
1253        // are interchangeable on one warm worker — and differ only in child_id.
1254        let a = spec_with(
1255            "explorer",
1256            "p",
1257            "m",
1258            Some("/ws"),
1259            Some(vec!["Bash", "Edit"]),
1260        );
1261        let mut b = spec_with(
1262            "explorer",
1263            "p",
1264            "m",
1265            Some("/ws"),
1266            Some(vec!["Edit", "Bash"]),
1267        );
1268        b.identity.child_id = "other".into();
1269        assert_eq!(
1270            ActorChildRunner::fingerprint(&a),
1271            ActorChildRunner::fingerprint(&b)
1272        );
1273    }
1274
1275    #[test]
1276    fn fingerprint_separates_distinct_runtimes() {
1277        let base = spec_with("explorer", "p", "m", Some("/ws"), None);
1278        let base_fp = ActorChildRunner::fingerprint(&base);
1279        // Each axis that is baked into the worker must split the pool bucket.
1280        assert_ne!(
1281            base_fp,
1282            ActorChildRunner::fingerprint(&spec_with("writer", "p", "m", Some("/ws"), None))
1283        );
1284        assert_ne!(
1285            base_fp,
1286            ActorChildRunner::fingerprint(&spec_with("explorer", "p2", "m", Some("/ws"), None))
1287        );
1288        assert_ne!(
1289            base_fp,
1290            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m2", Some("/ws"), None))
1291        );
1292        assert_ne!(
1293            base_fp,
1294            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws2"), None))
1295        );
1296        assert_ne!(
1297            base_fp,
1298            ActorChildRunner::fingerprint(&spec_with(
1299                "explorer",
1300                "p",
1301                "m",
1302                Some("/ws"),
1303                Some(vec!["Bash"])
1304            ))
1305        );
1306    }
1307
1308    #[test]
1309    fn fingerprint_splits_on_baked_capabilities() {
1310        // Every capability baked once at provision time must split the pool
1311        // bucket, else a worker baked for one posture gets reused for another
1312        // (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child,
1313        // breaking the depth cap; or a bypass worker reused for a non-bypass one).
1314        let base_fp =
1315            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
1316
1317        let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
1318        depth.identity.depth = 2;
1319        assert_ne!(
1320            base_fp,
1321            ActorChildRunner::fingerprint(&depth),
1322            "depth must split"
1323        );
1324
1325        let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
1326        nested.capabilities.nested_spawn = true;
1327        assert_ne!(
1328            base_fp,
1329            ActorChildRunner::fingerprint(&nested),
1330            "nested_spawn must split"
1331        );
1332
1333        let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
1334        bypass.capabilities.bypass = true;
1335        assert_ne!(
1336            base_fp,
1337            ActorChildRunner::fingerprint(&bypass),
1338            "bypass must split"
1339        );
1340
1341        let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
1342        enforce.capabilities.enforce_permissions = true;
1343        assert_ne!(
1344            base_fp,
1345            ActorChildRunner::fingerprint(&enforce),
1346            "enforce_permissions must split"
1347        );
1348
1349        let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
1350        cap.capabilities.max_spawn_depth = Some(8);
1351        assert_ne!(
1352            base_fp,
1353            ActorChildRunner::fingerprint(&cap),
1354            "max_spawn_depth must split"
1355        );
1356
1357        // #73 (P1): the worker bakes `no_human_review` from this flag once at
1358        // build(), so it MUST split the pool or a worker baked for one approval
1359        // posture is reused for the opposite one.
1360        let mut nha = spec_with("explorer", "p", "m", Some("/ws"), None);
1361        nha.capabilities.no_human_approver = true;
1362        assert_ne!(
1363            base_fp,
1364            ActorChildRunner::fingerprint(&nha),
1365            "no_human_approver must split"
1366        );
1367
1368        // #71: the read-only Bash checker is baked once at build() from this flag,
1369        // so a guardian reviewer worker must not be reused for an ordinary child.
1370        let mut gro = spec_with("explorer", "p", "m", Some("/ws"), None);
1371        gro.capabilities.guardian_read_only = true;
1372        assert_ne!(
1373            base_fp,
1374            ActorChildRunner::fingerprint(&gro),
1375            "guardian_read_only must split"
1376        );
1377    }
1378
1379    struct StaticDecider(bool);
1380
1381    #[async_trait]
1382    impl ChildApprovalDecider for StaticDecider {
1383        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
1384            self.0
1385        }
1386    }
1387
1388    // ---- first-frame watchdog (dead-pooled-worker recovery) -----------------
1389
1390    /// A link that never yields a frame — models a worker that died (or never
1391    /// subscribed) so its Run sits queued with no server.
1392    struct SilentLink;
1393    #[async_trait]
1394    impl bamboo_subagent::ChildLink for SilentLink {
1395        async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
1396            Ok(())
1397        }
1398        async fn next_frame(
1399            &mut self,
1400        ) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
1401            std::future::pending().await
1402        }
1403    }
1404
1405    /// A link that immediately yields one terminal frame (a healthy fast worker).
1406    struct InstantTerminalLink {
1407        done: bool,
1408    }
1409    #[async_trait]
1410    impl bamboo_subagent::ChildLink for InstantTerminalLink {
1411        async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
1412            Ok(())
1413        }
1414        async fn next_frame(
1415            &mut self,
1416        ) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
1417            if self.done {
1418                std::future::pending().await
1419            } else {
1420                self.done = true;
1421                Ok(Some(ChildFrame::Terminal {
1422                    status: TerminalStatus::Completed,
1423                    result: Some("done".into()),
1424                    error: None,
1425                    transcript: vec![],
1426                }))
1427            }
1428        }
1429    }
1430
1431    #[tokio::test]
1432    async fn drive_trips_first_frame_watchdog_on_a_silent_worker() {
1433        let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
1434        let cancel = CancellationToken::new();
1435        let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
1436        let mut link = SilentLink;
1437        let r = drive(
1438            &mut link,
1439            "child-x",
1440            None,
1441            None,
1442            &event_tx,
1443            &cancel,
1444            &mut live_rx,
1445            Some(Duration::from_millis(100)),
1446        )
1447        .await;
1448        assert!(
1449            matches!(r, Err(AgentError::WorkerUnresponsive(_))),
1450            "a silent worker must trip the first-frame watchdog, got {r:?}"
1451        );
1452    }
1453
1454    #[tokio::test]
1455    async fn drive_does_not_trip_when_a_frame_arrives() {
1456        let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
1457        let cancel = CancellationToken::new();
1458        let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
1459        let mut link = InstantTerminalLink { done: false };
1460        // Even a tiny timeout must NOT trip: the terminal frame arrives first and
1461        // disarms the watchdog.
1462        let r = drive(
1463            &mut link,
1464            "child-y",
1465            None,
1466            None,
1467            &event_tx,
1468            &cancel,
1469            &mut live_rx,
1470            Some(Duration::from_millis(50)),
1471        )
1472        .await;
1473        assert_eq!(r.ok().flatten().as_deref(), Some("done"));
1474    }
1475
1476    #[tokio::test]
1477    async fn child_approval_fails_closed_without_decider() {
1478        // No decider wired ⇒ the host denies (safe default), unchanged behavior.
1479        let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
1480        assert!(!decide_child_approval(None, "child-1", &body).await);
1481    }
1482
1483    #[tokio::test]
1484    async fn child_approval_honors_wired_decider() {
1485        let body =
1486            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
1487        let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
1488        let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
1489        assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
1490        assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
1491    }
1492
1493    #[test]
1494    fn approval_request_fields_extracts_and_defaults() {
1495        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
1496        assert_eq!(
1497            approval_request_fields(&full),
1498            ("Bash".to_string(), "run".to_string(), "ls".to_string())
1499        );
1500        // Missing fields default to empty strings.
1501        let partial = serde_json::json!({"tool_name":"Write"});
1502        assert_eq!(
1503            approval_request_fields(&partial),
1504            ("Write".to_string(), String::new(), String::new())
1505        );
1506    }
1507
1508    // ---- #193: remote placement routing -------------------------------------
1509
1510    use crate::runtime::execution::SpawnJob;
1511    use bamboo_agent_core::Session;
1512
1513    /// A runner with a BOGUS worker_bin (`/bin/false`): a local spawn here would
1514    /// FAIL, so a passing remote test proves the remote path never spawns.
1515    fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
1516        ActorChildRunner::new(
1517            "test-actor".into(),
1518            PathBuf::from("/bin/false"),
1519            vec![],
1520            std::env::temp_dir().join("bamboo-test-fab-193"),
1521            ExecutorSpec::Echo,
1522            vec![],
1523            "anthropic".into(),
1524            4,
1525        )
1526        .with_remote_placements(placements)
1527    }
1528
1529    /// A child session of the given role (the role rides `subagent_type`, the
1530    /// path build_spec + the remote lookup both read).
1531    fn session_of_role(role: &str, assignment: &str) -> Session {
1532        let mut s = Session::new("child-1", "test-model");
1533        s.metadata
1534            .insert("subagent_type".to_string(), role.to_string());
1535        s.add_message(bamboo_agent_core::Message::user(assignment));
1536        s
1537    }
1538
1539    fn job_for(child: &str) -> SpawnJob {
1540        SpawnJob {
1541            parent_session_id: "parent-1".into(),
1542            child_session_id: child.into(),
1543            model: String::new(),
1544            disabled_tools: None,
1545        }
1546    }
1547
1548    #[test]
1549    fn build_spec_sets_remote_placement_for_matching_role() {
1550        let mut placements = HashMap::new();
1551        placements.insert(
1552            "explorer".to_string(),
1553            ResolvedRemotePlacement {
1554                endpoint: "wss://gpu-host:8443".into(),
1555                token: Some("T-secret".into()),
1556                ca_cert_file: None,
1557                host_label: None,
1558            },
1559        );
1560        let runner = bogus_runner(placements);
1561
1562        // Matching role -> Placement::Remote + the bearer on the secrets envelope.
1563        let s = session_of_role("explorer", "do the thing");
1564        let spec = runner.build_spec(&s, &job_for("child-1"));
1565        match &spec.placement {
1566            Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://gpu-host:8443"),
1567            other => panic!("expected Remote, got {other:?}"),
1568        }
1569        assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
1570    }
1571
1572    #[test]
1573    fn build_spec_leaves_local_for_unmatched_role() {
1574        let mut placements = HashMap::new();
1575        placements.insert(
1576            "explorer".to_string(),
1577            ResolvedRemotePlacement {
1578                endpoint: "wss://gpu-host:8443".into(),
1579                token: Some("T".into()),
1580                ca_cert_file: None,
1581                host_label: None,
1582            },
1583        );
1584        let runner = bogus_runner(placements);
1585
1586        // A DIFFERENT role keeps the default Local placement + no bearer.
1587        let s = session_of_role("writer", "do the thing");
1588        let spec = runner.build_spec(&s, &job_for("child-1"));
1589        assert_eq!(spec.placement, Placement::Local);
1590        assert!(spec.secrets.worker_auth_token.is_none());
1591    }
1592
1593    #[test]
1594    fn build_spec_local_when_no_placements() {
1595        let runner = bogus_runner(HashMap::new());
1596        let s = session_of_role("explorer", "do the thing");
1597        let spec = runner.build_spec(&s, &job_for("child-1"));
1598        assert_eq!(spec.placement, Placement::Local);
1599        assert!(spec.secrets.worker_auth_token.is_none());
1600    }
1601
1602    #[test]
1603    fn placement_metadata_stamps_remote_and_schedulable_not_local() {
1604        // Local children carry no stamp — the DTO defaults them to the backend host.
1605        assert_eq!(placement_metadata(&Placement::Local, None), None);
1606
1607        // Remote, no node label → host derived from the endpoint.
1608        let r = placement_metadata(
1609            &Placement::Remote {
1610                endpoint: "wss://10.0.0.5:8443/stream".into(),
1611            },
1612            None,
1613        )
1614        .unwrap();
1615        assert!(r.contains(r#""kind":"remote""#), "{r}");
1616        assert!(r.contains(r#""host":"10.0.0.5""#), "{r}");
1617
1618        // A cluster node's label (its metadata) OVERRIDES the raw endpoint host.
1619        let labeled = placement_metadata(
1620            &Placement::Remote {
1621                endpoint: "ws://169.254.230.101:8899".into(),
1622            },
1623            Some("mini"),
1624        )
1625        .unwrap();
1626        assert!(labeled.contains(r#""host":"mini""#), "{labeled}");
1627
1628        // Schedulable → {kind:"remote", host:<node label, else pool>}.
1629        let s =
1630            placement_metadata(&Placement::Schedulable { pool: "explorers".into() }, Some("mini"))
1631                .unwrap();
1632        assert!(s.contains(r#""kind":"remote""#), "{s}");
1633        assert!(s.contains(r#""host":"mini""#), "{s}");
1634
1635        // The stamp round-trips through the storage placement type.
1636        let p: bamboo_storage::SessionPlacement = serde_json::from_str(&labeled).unwrap();
1637        assert_eq!(p.kind, "remote");
1638        assert_eq!(p.host, "mini");
1639    }
1640
1641    /// End-to-end remote run through `execute_external_child`: a resident worker
1642    /// (Bearer-gated `WsServer` + `EchoExecutor`) serves the role; the runner is
1643    /// built with a `remote_placements` entry pointing at it AND a BOGUS
1644    /// worker_bin (`/bin/false`). A passing test proves the remote path CONNECTS
1645    /// to the resident worker and NEVER spawns (a spawn would fail on /bin/false),
1646    /// and that a terminal/echo result flows back.
1647    #[tokio::test]
1648    async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
1649        // 1. Stand up the resident worker on loopback with a required bearer.
1650        let token = "remote-test-token";
1651        let server = bamboo_subagent::transport::WsServer::bind_with_token(
1652            (std::net::Ipv4Addr::LOCALHOST, 0).into(),
1653            Some(token.to_string()),
1654        )
1655        .await
1656        .expect("bind resident worker");
1657        let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1658        let srv = tokio::spawn(async move {
1659            // serve() loops connection-after-connection; the test exits, dropping it.
1660            let _ = server
1661                .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1662                .await;
1663        });
1664
1665        // 2. Build the runner: role "explorer" pinned remote, bogus worker_bin.
1666        let mut placements = HashMap::new();
1667        placements.insert(
1668            "explorer".to_string(),
1669            ResolvedRemotePlacement {
1670                endpoint: endpoint.clone(),
1671                token: Some(token.to_string()),
1672                ca_cert_file: None,
1673                host_label: Some("mini-e2e".into()), // node label, surfaced on the badge
1674            },
1675        );
1676        let runner = bogus_runner(placements);
1677
1678        // 3. Drive a real run for that role.
1679        let mut session = session_of_role("explorer", "hello remote");
1680        let job = job_for("child-1");
1681        let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1682        let cancel = CancellationToken::new();
1683
1684        let result = tokio::time::timeout(
1685            Duration::from_secs(10),
1686            runner.execute_external_child(&mut session, &job, event_tx, cancel),
1687        )
1688        .await
1689        .expect("run did not hang")
1690        .expect("remote run succeeded (connected to resident worker, did not spawn)");
1691
1692        let _ = result;
1693        // The EchoExecutor's reply is written back onto the child session as an
1694        // assistant message — proof a terminal result flowed back over the link.
1695        let last = session
1696            .messages
1697            .iter()
1698            .rev()
1699            .find(|m| matches!(m.role, Role::Assistant))
1700            .expect("an assistant reply was written back");
1701        assert!(
1702            last.content.contains("echo:"),
1703            "expected echo reply, got {:?}",
1704            last.content
1705        );
1706
1707        // A remote run must stamp WHICH machine it ran on onto the child session
1708        // (mirrored to the UI badge) using the placement's node label.
1709        let placement = session
1710            .metadata
1711            .get("placement")
1712            .expect("remote child session stamped with a placement");
1713        assert!(placement.contains(r#""kind":"remote""#), "{placement}");
1714        assert!(placement.contains(r#""host":"mini-e2e""#), "{placement}");
1715
1716        // Drain a couple of streamed events to confirm the event pipe carried the
1717        // worker's tokens too (best-effort; the reply assertion above is primary).
1718        let mut saw_event = false;
1719        while let Ok(Some(_ev)) =
1720            tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
1721        {
1722            saw_event = true;
1723        }
1724        let _ = saw_event;
1725
1726        srv.abort();
1727    }
1728
1729    // ---- #181 (P2b): schedulable placement routing --------------------------
1730
1731
1732    /// A bogus-worker_bin runner carrying SCHEDULABLE placements (and optionally
1733    /// remote ones, to test precedence). A local spawn here would fail on
1734    /// `/bin/false`, so a passing schedulable test proves no subprocess spawned.
1735    fn bogus_sched_runner(
1736        remote: HashMap<String, ResolvedRemotePlacement>,
1737        sched: HashMap<String, ResolvedSchedulablePlacement>,
1738    ) -> ActorChildRunner {
1739        ActorChildRunner::new(
1740            "test-actor".into(),
1741            PathBuf::from("/bin/false"),
1742            vec![],
1743            std::env::temp_dir().join("bamboo-test-fab-181"),
1744            ExecutorSpec::Echo,
1745            vec![],
1746            "anthropic".into(),
1747            4,
1748        )
1749        .with_remote_placements(remote)
1750        .with_schedulable_placements(sched)
1751    }
1752
1753    fn sched_placement(pool: &str, _registry_url: impl Into<String>) -> ResolvedSchedulablePlacement {
1754        ResolvedSchedulablePlacement { pool: pool.into(), host_label: None }
1755    }
1756
1757    #[test]
1758    fn build_spec_sets_schedulable_placement_for_matching_role() {
1759        let mut sched = HashMap::new();
1760        sched.insert(
1761            "explorer".to_string(),
1762            sched_placement("gpu-pool", "unused"),
1763        );
1764        let runner = bogus_sched_runner(HashMap::new(), sched);
1765
1766        let s = session_of_role("explorer", "do the thing");
1767        let spec = runner.build_spec(&s, &job_for("child-1"));
1768        match &spec.placement {
1769            Placement::Schedulable { pool } => assert_eq!(pool, "gpu-pool"),
1770            other => panic!("expected Schedulable, got {other:?}"),
1771        }
1772        // No per-placement bearer now — the bus connection carries the bus token.
1773        assert!(spec.secrets.worker_auth_token.is_none());
1774    }
1775
1776    #[test]
1777    fn build_spec_remote_wins_when_role_in_both_maps() {
1778        // A role present in BOTH remote_placements and schedulable_placements must
1779        // resolve to the FIXED remote placement (documented precedence).
1780        let mut remote = HashMap::new();
1781        remote.insert(
1782            "explorer".to_string(),
1783            ResolvedRemotePlacement {
1784                endpoint: "wss://fixed-host:8443".into(),
1785                token: Some("T-remote".into()),
1786                ca_cert_file: None,
1787                host_label: None,
1788            },
1789        );
1790        let mut sched = HashMap::new();
1791        sched.insert(
1792            "explorer".to_string(),
1793            sched_placement("gpu-pool", "https://control-plane:9562"),
1794        );
1795        let runner = bogus_sched_runner(remote, sched);
1796
1797        let s = session_of_role("explorer", "do the thing");
1798        let spec = runner.build_spec(&s, &job_for("child-1"));
1799        match &spec.placement {
1800            Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://fixed-host:8443"),
1801            other => panic!("expected Remote (precedence), got {other:?}"),
1802        }
1803        assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
1804    }
1805
1806    #[test]
1807    fn build_spec_local_for_unmatched_schedulable_role() {
1808        let mut sched = HashMap::new();
1809        sched.insert(
1810            "explorer".to_string(),
1811            sched_placement("gpu-pool", "https://control-plane:9562"),
1812        );
1813        let runner = bogus_sched_runner(HashMap::new(), sched);
1814        let s = session_of_role("writer", "do the thing");
1815        let spec = runner.build_spec(&s, &job_for("child-1"));
1816        assert_eq!(spec.placement, Placement::Local);
1817        assert!(spec.secrets.worker_auth_token.is_none());
1818    }
1819
1820    /// The full role → resolved-placement → badge-host chain: a child routed to a
1821    /// remote/schedulable placement carrying a cluster node's `host_label` stamps
1822    /// that label; without a label it falls back to the endpoint host / pool; a
1823    /// Local child gets no stamp (the DTO defaults it to the backend host).
1824    #[test]
1825    fn placement_stamp_uses_node_label_for_remote_and_schedulable() {
1826        // Remote WITH a node label → {remote, <label>}, overriding the raw IP.
1827        let mut remote = HashMap::new();
1828        remote.insert(
1829            "explorer".to_string(),
1830            ResolvedRemotePlacement {
1831                endpoint: "ws://169.254.230.101:8899".into(),
1832                token: None,
1833                ca_cert_file: None,
1834                host_label: Some("mini".into()),
1835            },
1836        );
1837        let runner = bogus_runner(remote);
1838        let spec = runner.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
1839        let stamp = runner.placement_stamp_for(&spec).expect("remote child is stamped");
1840        assert!(stamp.contains(r#""kind":"remote""#), "{stamp}");
1841        assert!(stamp.contains(r#""host":"mini""#), "{stamp}");
1842
1843        // Remote WITHOUT a node label → falls back to the endpoint host.
1844        let mut remote_nolabel = HashMap::new();
1845        remote_nolabel.insert(
1846            "explorer".to_string(),
1847            ResolvedRemotePlacement {
1848                endpoint: "ws://169.254.230.101:8899".into(),
1849                token: None,
1850                ca_cert_file: None,
1851                host_label: None,
1852            },
1853        );
1854        let r2 = bogus_runner(remote_nolabel);
1855        let spec2 = r2.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
1856        assert!(
1857            r2.placement_stamp_for(&spec2)
1858                .unwrap()
1859                .contains(r#""host":"169.254.230.101""#)
1860        );
1861
1862        // Schedulable WITH a node label → {remote, <label>} (a node, not a pool name).
1863        let mut sched = HashMap::new();
1864        sched.insert(
1865            "mac-mini-monitor".to_string(),
1866            ResolvedSchedulablePlacement {
1867                pool: "mac-mini-monitor".into(),
1868                host_label: Some("mini".into()),
1869            },
1870        );
1871        let sr = bogus_sched_runner(HashMap::new(), sched);
1872        let spec3 = sr.build_spec(&session_of_role("mac-mini-monitor", "go"), &job_for("c1"));
1873        let stamp3 = sr.placement_stamp_for(&spec3).expect("scheduled child is stamped");
1874        assert!(stamp3.contains(r#""kind":"remote""#), "{stamp3}");
1875        assert!(stamp3.contains(r#""host":"mini""#), "{stamp3}");
1876
1877        // A Local (unmatched) child gets NO stamp.
1878        let local = bogus_runner(HashMap::new());
1879        let spec4 = local.build_spec(&session_of_role("writer", "go"), &job_for("c1"));
1880        assert_eq!(local.placement_stamp_for(&spec4), None);
1881    }
1882
1883    // ---- #181: schedulable selection over the BUS (Phase 3 cutover) ----------
1884
1885    async fn start_bus() -> (String, tempfile::TempDir) {
1886        let dir = tempfile::tempdir().unwrap();
1887        let core = std::sync::Arc::new(bamboo_broker::BrokerCore::new(dir.path()));
1888        let server = std::sync::Arc::new(bamboo_broker::BrokerServer::new(core, "t"));
1889        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1890        let addr = listener.local_addr().unwrap();
1891        tokio::spawn(async move {
1892            let _ = server.serve(listener).await;
1893        });
1894        (format!("ws://{addr}"), dir)
1895    }
1896
1897    async fn join_pool(endpoint: &str, id: &str, pool: &str) -> bamboo_broker::BrokerClient {
1898        let mut c = bamboo_broker::BrokerClient::connect(
1899            endpoint,
1900            bamboo_subagent::AgentRef {
1901                session_id: id.into(),
1902                role: Some(pool.into()),
1903            },
1904            "t",
1905        )
1906        .await
1907        .unwrap();
1908        c.subscribe().await.unwrap();
1909        c
1910    }
1911
1912    fn sched_runner_on_bus(endpoint: &str, child_role: &str, pool: &str) -> ActorChildRunner {
1913        let mut sched = HashMap::new();
1914        sched.insert(child_role.to_string(), sched_placement(pool, "unused"));
1915        bogus_sched_runner(HashMap::new(), sched).with_bus(Some(bamboo_subagent::BusEndpoint {
1916            endpoint: endpoint.into(),
1917            token: "t".into(),
1918        }))
1919    }
1920
1921    #[tokio::test]
1922    async fn resolve_schedulable_picks_a_live_bus_worker() {
1923        let (endpoint, _dir) = start_bus().await;
1924        let _w = join_pool(&endpoint, "w-gpu", "gpu-pool").await;
1925        let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1926
1927        let mailbox = runner
1928            .resolve_schedulable_worker("explorer")
1929            .await
1930            .expect("a live pool worker is found on the bus");
1931        assert_eq!(mailbox, "w-gpu");
1932    }
1933
1934    #[tokio::test]
1935    async fn resolve_schedulable_round_robins_over_pool_workers() {
1936        let (endpoint, _dir) = start_bus().await;
1937        let _a = join_pool(&endpoint, "w-a", "gpu-pool").await;
1938        let _b = join_pool(&endpoint, "w-b", "gpu-pool").await;
1939        let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1940
1941        // Successive resolves spread across both connected workers.
1942        let mut picked = std::collections::HashSet::new();
1943        for _ in 0..6 {
1944            picked.insert(runner.resolve_schedulable_worker("explorer").await.unwrap());
1945        }
1946        assert_eq!(
1947            picked,
1948            ["w-a".to_string(), "w-b".to_string()].into_iter().collect(),
1949            "round-robin must cover every connected pool worker"
1950        );
1951    }
1952
1953    #[tokio::test]
1954    async fn resolve_schedulable_errors_on_empty_pool() {
1955        let (endpoint, _dir) = start_bus().await;
1956        // No worker subscribes to "gpu-pool".
1957        let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1958
1959        let err = runner
1960            .resolve_schedulable_worker("explorer")
1961            .await
1962            .expect_err("an empty pool is terminal — no local fallback")
1963            .to_string();
1964        assert!(err.contains("no live worker in pool"), "got: {err}");
1965        assert!(err.contains("NOT spawning"), "got: {err}");
1966    }
1967
1968    /// FULL schedulable run over the bus: a worker SERVING `EchoExecutor` joins the
1969    /// pool by role; `execute_external_child` with a Schedulable placement resolves
1970    /// it from the bus (no local subprocess — the worker_bin is `/bin/false`),
1971    /// drives the run, gets the echo back, AND stamps the child session with the
1972    /// pool's cluster-node label — `{kind:remote, host:"mini"}`. The end-to-end
1973    /// analogue of the live `mac-mini-monitor`→mini run.
1974    #[tokio::test]
1975    async fn execute_external_child_runs_schedulable_over_bus_and_stamps_node_label() {
1976        let (endpoint, _dir) = start_bus().await;
1977
1978        // A bus worker SERVING runs (not just presence), joined to the pool by role.
1979        let ep = endpoint.clone();
1980        let worker = tokio::spawn(async move {
1981            let _ = bamboo_broker::serve_executor(
1982                &ep,
1983                bamboo_subagent::AgentRef {
1984                    session_id: "mmm-worker".into(),
1985                    role: Some("mac-mini-monitor".into()),
1986                },
1987                "t",
1988                std::sync::Arc::new(bamboo_subagent::executor::EchoExecutor),
1989            )
1990            .await;
1991        });
1992
1993        // Wait until the worker is visible on the bus so the pool is non-empty
1994        // when execute_external_child resolves it (serve_executor connects async).
1995        let mut probe = bamboo_broker::BrokerClient::connect(
1996            &endpoint,
1997            bamboo_subagent::AgentRef { session_id: "probe".into(), role: None },
1998            "t",
1999        )
2000        .await
2001        .unwrap();
2002        let mut ready = false;
2003        for _ in 0..100 {
2004            if probe
2005                .list_connected("mac-mini-monitor")
2006                .await
2007                .unwrap()
2008                .iter()
2009                .any(|id| id == "mmm-worker")
2010            {
2011                ready = true;
2012                break;
2013            }
2014            tokio::time::sleep(Duration::from_millis(30)).await;
2015        }
2016        assert!(ready, "worker never joined the pool");
2017
2018        // Runner: child role → schedulable pool "mac-mini-monitor" carrying the
2019        // cluster node's label "mini"; bogus worker_bin so any local spawn fails.
2020        let mut sched = HashMap::new();
2021        sched.insert(
2022            "mac-mini-monitor".to_string(),
2023            ResolvedSchedulablePlacement {
2024                pool: "mac-mini-monitor".into(),
2025                host_label: Some("mini".into()),
2026            },
2027        );
2028        let runner = bogus_sched_runner(HashMap::new(), sched).with_bus(Some(
2029            bamboo_subagent::BusEndpoint { endpoint: endpoint.clone(), token: "t".into() },
2030        ));
2031
2032        let mut session = session_of_role("mac-mini-monitor", "hello scheduled");
2033        let job = job_for("child-1");
2034        let (event_tx, _rx) = mpsc::channel::<AgentEvent>(64);
2035        let cancel = CancellationToken::new();
2036
2037        tokio::time::timeout(
2038            Duration::from_secs(10),
2039            runner.execute_external_child(&mut session, &job, event_tx, cancel),
2040        )
2041        .await
2042        .expect("run did not hang")
2043        .expect("schedulable run succeeded over the bus (no local spawn)");
2044
2045        // Echo reply flowed back — proves it routed to the bus worker, not local.
2046        let last = session
2047            .messages
2048            .iter()
2049            .rev()
2050            .find(|m| matches!(m.role, Role::Assistant))
2051            .expect("an assistant reply was written back");
2052        assert!(last.content.contains("echo:"), "got {:?}", last.content);
2053
2054        // ...and the child is stamped with the pool's cluster-node label.
2055        let placement = session
2056            .metadata
2057            .get("placement")
2058            .expect("scheduled child session stamped with a placement");
2059        assert!(placement.contains(r#""kind":"remote""#), "{placement}");
2060        assert!(placement.contains(r#""host":"mini""#), "{placement}");
2061
2062        worker.abort();
2063    }
2064}