Skip to main content

bamboo_engine/external_agents/
actor_adapter.rs

//! Actor external child runner.
//!
//! Runs a child session as an independent **actor**: a separate OS process with its own
//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
//!
//! The built-in **local actor** instance of this runner is the default runtime for
//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::{Discovery, Fabric};
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27    ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default upper bound on the number of actor processes running at once.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Nesting-depth cap for direct nested execution (Phase 6).
///
/// A worker whose session `spawn_depth` is strictly below this value gets its
/// own spawn stack and the real SubAgent tool; at or above it the worker gets
/// neither, and the tool itself refuses to spawn. The value mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` and has to be kept in sync by
/// hand because the engine cannot depend on server-tools. With the root
/// orchestrator at depth 0 this allows 4 levels of sub-agents.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default number of idle (warm, reusable) workers retained per fingerprint.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Seconds a pooled worker waits for its next assignment before reclaiming
/// itself. Must comfortably exceed the gap between sibling spawns.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
49
50/// A warm worker parked for reuse: its process handle (killed on drop), the WS
51/// endpoint to reconnect to, and the id it registered under in the fabric.
52struct PooledActor {
53    worker: SpawnedChild,
54    endpoint: String,
55    agent_id: String,
56}
57
/// A role pinned to a remote resident worker (remote-actor-plan §3.4 / P1.5,
/// #193), resolved at runner-build time from `SubagentsConfig.remote_placements`.
///
/// The env-named bearer has already been READ into `token` (the raw token never
/// rides the config). Because this value therefore carries a live secret, its
/// `Debug` output is hand-written to REDACT the token: `{:?}` shows only
/// whether a token is present, never its contents, so an incidental debug log
/// of a placement cannot leak the bearer.
#[derive(Clone)]
pub struct ResolvedRemotePlacement {
    /// Worker endpoint to connect to.
    pub endpoint: String,
    /// Bearer presented to the worker, if one is configured.
    pub token: Option<String>,
    /// Path to a PEM pinning a self-signed worker cert (`None` ⇒ default webpki
    /// roots / plaintext `ws://`).
    pub ca_cert_file: Option<PathBuf>,
}

impl std::fmt::Debug for ResolvedRemotePlacement {
    /// Formats like the derived `Debug`, except the bearer is replaced by a
    /// `<redacted>` marker (`None` is still shown as `None`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedRemotePlacement")
            .field("endpoint", &self.endpoint)
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("ca_cert_file", &self.ca_cert_file)
            .finish()
    }
}
69
/// A role routed to a registry-SCHEDULED worker (remote-actor-plan §3.4 / P2b,
/// #181), resolved at runner-build time from
/// `SubagentsConfig.schedulable_placements`.
///
/// Unlike a fixed [`ResolvedRemotePlacement`] (one endpoint), this names the
/// agent `registry_url` to query and the logical `pool` (= the registry `role`)
/// whose LIVE workers are scheduling candidates. The env-named bearer has
/// already been READ into `token` (the raw token never rides the config) and is
/// used for BOTH the registry query and the chosen worker's `wss://` connect.
///
/// Because this value carries a live secret, its `Debug` output is hand-written
/// to REDACT the token: `{:?}` shows only whether a token is present, never its
/// contents.
#[derive(Clone)]
pub struct ResolvedSchedulablePlacement {
    /// Logical pool name; matched against the registry record `role`.
    pub pool: String,
    /// Agent registry to query for live workers.
    pub registry_url: String,
    /// Bearer for the registry query and the worker connect, if configured.
    pub token: Option<String>,
    /// Path to a PEM pinning a self-signed worker/registry cert (`None` ⇒
    /// default webpki roots).
    pub ca_cert_file: Option<PathBuf>,
}

impl std::fmt::Debug for ResolvedSchedulablePlacement {
    /// Formats like the derived `Debug`, except the bearer is replaced by a
    /// `<redacted>` marker (`None` is still shown as `None`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedSchedulablePlacement")
            .field("pool", &self.pool)
            .field("registry_url", &self.registry_url)
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("ca_cert_file", &self.ca_cert_file)
            .finish()
    }
}
85
/// The three ways `execute_external_child` can obtain its worker connection,
/// chosen once from `spec.placement`.
///
/// Only the acquire/connect and retire steps differ per variant; the shared
/// middle (Run dispatch, live registration, drive, close) is identical.
enum PlacementKind {
    /// Local subprocess + warm pool (the pre-#193 path).
    Local,
    /// Fixed remote resident worker (#194 path).
    Remote,
    /// Registry-scheduled worker (#181, P2b).
    Schedulable,
}
96
97/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
98pub struct ActorChildRunner {
99    agent_id: String,
100    worker_bin: PathBuf,
101    worker_args: Vec<String>,
102    fabric_dir: PathBuf,
103    executor: ExecutorSpec,
104    /// Per-provider credentials snapshotted from the parent config at build
105    /// time; the spec carries only the ONE the child's provider needs.
106    credentials: Vec<ScopedCredential>,
107    /// Parent's default provider (used when the child has no explicit one).
108    default_provider: String,
109    /// Backpressure: bounds the number of concurrently *running* actors; further
110    /// runs wait for a slot instead of exploding the process table. (Idle pooled
111    /// workers do not hold a slot.)
112    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
113    spawn_timeout: Duration,
114    /// Warm-worker pool keyed by a reuse fingerprint
115    /// (role/provider/model/workspace/disabled-tools). A finished run parks its
116    /// worker here so the next matching child reuses it instead of spawning a
117    /// fresh process — collapsing N sibling sub-agents onto a few processes.
118    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
119    max_idle_per_key: usize,
120    /// Host-side decision for a child's gated-tool approval request (Phase 2).
121    /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
122    /// human-routing bridge) returns approve/deny over the actor WS.
123    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
124    /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
125    /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
126    /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
127    /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
128    /// to RE-PROXY a child's approval request UP to the parent run — chaining up
129    /// every level until a bypass level (model-review) or the top orchestrator
130    /// (human) decides, then relaying the reply back down. Was a process-global
131    /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
132    /// that spawned it keeps that run's bridge for its whole lifetime instead of
133    /// reading a stale/overwritten global at approval time (→ fail-closed deny).
134    escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
135    /// Roles pinned to a REMOTE resident worker (#193), keyed by sub-agent role
136    /// (the child's `subagent_type`). A role present here routes through the
137    /// dedicated remote branch in `execute_external_child` (Bearer-authenticated
138    /// `wss://` connect, no spawn, no pool, no kill) instead of the local
139    /// subprocess + warm-pool path. Empty (the default) = all-local behavior.
140    remote_placements: HashMap<String, ResolvedRemotePlacement>,
141    /// Roles routed to a REGISTRY-SCHEDULED worker (#181, P2b), keyed by sub-agent
142    /// role. A role present here (AND not already in `remote_placements`, which
143    /// wins) routes through the dedicated SCHEDULABLE branch in
144    /// `execute_external_child`: query the registry for live workers in the pool,
145    /// pick one (round-robin), connect over `wss://` — no spawn, no pool, no kill,
146    /// and NO local-subprocess fallback (no live worker ⇒ a clear error). Empty
147    /// (the default) = all-local behavior.
148    schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
149    /// Per-pool round-robin cursor for schedulable scheduling (#181, P2b). Bumped
150    /// once per pick so successive sibling spawns SPREAD across a pool's live
151    /// workers instead of all landing on the first candidate. Best-effort spread,
152    /// not a load balancer — the registry's live set can change between picks.
153    schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
154    /// Per-`(registry_url, token)` cache of built [`RegistryFabric`]s (#202). A fabric
155    /// holds a reqwest `Client` (already connection-pooled), so under sibling
156    /// fan-out we construct ONE fabric per registry and reuse it across schedules
157    /// instead of rebuilding a fresh client every spawn. Construct-once-then-reuse;
158    /// a benign duplicate build under a startup race is harmless (last writer wins,
159    /// the loser's fabric drops). The bearer lives INSIDE the fabric's sensitive
160    /// `Authorization` header — never logged, never re-stringified here.
161    // Keyed by (registry_url, token) — NOT url alone — so two placements that
162    // share a registry but present DIFFERENT bearers never reuse each other's
163    // fabric (which would issue a discover query with the wrong token). #202.
164    fabric_cache: Arc<
165        std::sync::Mutex<HashMap<(String, Option<String>), Arc<bamboo_subagent::RegistryFabric>>>,
166    >,
167}
168
169/// Decides how the host answers a child worker's gated-tool approval request
170/// (Phase 2: child → parent approval delegation). Async so an implementation
171/// can consult a policy or defer to a human. With no decider wired the host
172/// replies with a fail-closed DENY.
173///
174/// NOTE: `decide` is awaited inside the per-child frame pump, so an
175/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
176/// loop decision that may block indefinitely should instead be delivered
177/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
178/// (`super::live`), which `drive()` already forwards to the worker without
179/// stalling the pump.
180#[async_trait]
181pub trait ChildApprovalDecider: Send + Sync {
182    /// Decide whether `child_session_id` may perform the gated action described
183    /// by `request` (`{tool_name, permission, resource}`).
184    async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
185}
186
187/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
188/// decider is wired — the single, testable seam for the host-side decision.
189async fn decide_child_approval(
190    decider: Option<&Arc<dyn ChildApprovalDecider>>,
191    child_session_id: &str,
192    request: &serde_json::Value,
193) -> bool {
194    match decider {
195        Some(decider) => decider.decide(child_session_id, request).await,
196        None => false,
197    }
198}
199
/// Upper bound on how long the host waits for a human approval decision.
/// Once it elapses the child's gated tool is failed closed (DENY), so an
/// unanswered request cannot hang the worker indefinitely.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
204
205/// Extract `(tool_name, permission, resource)` from a worker's approval request
206/// body (`{tool_name, permission, resource}`); missing fields default to empty.
207fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
208    let field = |k: &str| {
209        body.get(k)
210            .and_then(|v| v.as_str())
211            .unwrap_or("")
212            .to_string()
213    };
214    (field("tool_name"), field("permission"), field("resource"))
215}
216
217/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
218///
219/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
220/// children's forced-ask (dangerous) gated actions — which still raise
221/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
222/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
223/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
224/// live channel, so the agent loop is never blocked.
225#[async_trait]
226pub trait ChildApprovalReviewer: Send + Sync {
227    /// Judge whether the gated action `request` (`{tool_name, permission,
228    /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
229    async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
230}
231
232fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
233    static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
234    &SLOT
235}
236
237/// Install the process-global child-approval reviewer (idempotent; first wins).
238pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
239    let _ = child_approval_reviewer_slot().set(reviewer);
240}
241
242/// The process-global child-approval reviewer, if installed.
243pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
244    child_approval_reviewer_slot().get().cloned()
245}
246
247impl ActorChildRunner {
248    #[allow(clippy::too_many_arguments)]
249    pub fn new(
250        agent_id: String,
251        worker_bin: PathBuf,
252        worker_args: Vec<String>,
253        fabric_dir: PathBuf,
254        executor: ExecutorSpec,
255        credentials: Vec<ScopedCredential>,
256        default_provider: String,
257        max_concurrent: usize,
258    ) -> Self {
259        Self {
260            agent_id,
261            worker_bin,
262            worker_args,
263            fabric_dir,
264            executor,
265            credentials,
266            default_provider,
267            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
268            spawn_timeout: Duration::from_secs(30),
269            pool: Arc::new(Mutex::new(HashMap::new())),
270            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
271            approval_decider: None,
272            escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
273            remote_placements: HashMap::new(),
274            schedulable_placements: HashMap::new(),
275            schedule_cursor: Arc::new(std::sync::Mutex::new(HashMap::new())),
276            fabric_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
277        }
278    }
279
280    /// Wire the host-side decider for child gated-tool approval requests
281    /// (Phase 2). Without this the host fail-closed DENYs every request.
282    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
283        self.approval_decider = Some(decider);
284        self
285    }
286
287    /// Pin specific sub-agent roles to remote resident workers (#193). The map
288    /// is keyed by role (`subagent_type`); a child whose role is present connects
289    /// over `wss://` to the resolved endpoint instead of spawning a local
290    /// subprocess. Default (empty) keeps every role on the local path — exactly
291    /// today's behavior.
292    pub fn with_remote_placements(
293        mut self,
294        placements: HashMap<String, ResolvedRemotePlacement>,
295    ) -> Self {
296        self.remote_placements = placements;
297        self
298    }
299
300    /// Route specific sub-agent roles to a registry-SCHEDULED worker (#181, P2b).
301    /// The map is keyed by role (`subagent_type`); a child whose role is present
302    /// (and NOT already pinned by `remote_placements`, which takes precedence) is
303    /// run on a live worker discovered from the registry instead of a local
304    /// subprocess. Default (empty) keeps every role on the local path.
305    pub fn with_schedulable_placements(
306        mut self,
307        placements: HashMap<String, ResolvedSchedulablePlacement>,
308    ) -> Self {
309        self.schedulable_placements = placements;
310        self
311    }
312
313    /// Reuse fingerprint: two children are interchangeable on one warm worker iff
314    /// they share role, provider, model, workspace, disabled-tool set, AND every
315    /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
316    /// stamps these once and reuses them across runs): nesting depth, nested-spawn
317    /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
318    /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
319    /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
320    /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
321    /// the recursion bound; or a bypass worker reused for a non-bypass child. So
322    /// these MUST split the pool bucket. Everything else (assignment, history) is
323    /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
324    fn fingerprint(spec: &ProvisionSpec) -> String {
325        let role = spec.identity.role.as_str();
326        let (provider, model) = spec
327            .model
328            .as_ref()
329            .map(|m| (m.provider.as_str(), m.model.as_str()))
330            .unwrap_or(("", ""));
331        let workspace = spec.workspace.as_deref().unwrap_or("");
332        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
333        tools.sort();
334        let caps = &spec.capabilities;
335        format!(
336            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
337            tools.join(","),
338            spec.identity.depth,
339            caps.nested_spawn,
340            caps.bypass,
341            caps.enforce_permissions,
342            caps.max_spawn_depth.unwrap_or(0),
343            // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
344            // at build() and never re-reads it per run, so the pool MUST NOT hand a
345            // worker baked for one approval posture to a run of the opposite one —
346            // else a scheduled-root worker reused for an interactive child would
347            // silently model-review instead of asking the human (and vice-versa,
348            // reintroducing the 300s-deny). Split the bucket on it.
349            caps.no_human_approver,
350            // #71: the read-only Bash checker is baked once at build() from this
351            // flag, so a guardian-reviewer worker must NOT be reused for an
352            // ordinary child (which expects unrestricted Bash), and vice-versa.
353            caps.guardian_read_only,
354        )
355    }
356
357    /// Check out a worker for this assignment: reuse a live pooled one matching
358    /// `key`, else spawn a fresh reusable worker.
359    async fn acquire_worker(
360        &self,
361        key: &str,
362        spec: &ProvisionSpec,
363    ) -> crate::runtime::runner::Result<PooledActor> {
364        // Drain the pool bucket, validating liveness; a worker that hit its idle
365        // timeout has exited and withdrawn its fabric record — skip and reap it.
366        loop {
367            let candidate = {
368                let mut pool = self.pool.lock().await;
369                pool.get_mut(key).and_then(|bucket| bucket.pop())
370            };
371            let Some(candidate) = candidate else { break };
372            let alive = Fabric::at(&self.fabric_dir)
373                .resolve(&candidate.agent_id)
374                .await
375                .ok()
376                .flatten()
377                .is_some();
378            if alive {
379                return Ok(candidate);
380            }
381            candidate.worker.kill().await;
382        }
383
384        let spawned = spawn_worker(
385            &self.worker_bin,
386            &self.worker_args,
387            spec,
388            self.spawn_timeout,
389        )
390        .await
391        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
392        let endpoint = spawned.record.endpoint.clone();
393        let agent_id = spawned.record.agent_id.clone();
394        Ok(PooledActor {
395            worker: spawned,
396            endpoint,
397            agent_id,
398        })
399    }
400
401    /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
402    async fn release_worker(&self, key: &str, actor: PooledActor) {
403        let mut pool = self.pool.lock().await;
404        let bucket = pool.entry(key.to_string()).or_default();
405        if bucket.len() >= self.max_idle_per_key {
406            drop(pool);
407            self.retire_worker(actor).await;
408            return;
409        }
410        bucket.push(actor);
411    }
412
413    /// Forcefully stop a worker and clean its discovery record.
414    async fn retire_worker(&self, actor: PooledActor) {
415        let agent_id = actor.agent_id.clone();
416        actor.worker.kill().await;
417        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
418    }
419
420    /// Assemble the parent-resolved provisioning document for this child.
421    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
422        let mut spec = ProvisionSpec::new(
423            ChildIdentity {
424                child_id: job.child_session_id.clone(),
425                parent_id: Some(job.parent_session_id.clone()),
426                project_key: None,
427                role: session
428                    .metadata
429                    .get("subagent_type")
430                    .cloned()
431                    .unwrap_or_else(|| "worker".to_string()),
432                // The child session already carries the correct depth
433                // (create_child_action's new_child_of did parent.spawn_depth+1);
434                // stamp it so the worker can re-establish it on its run session
435                // and enforce the max-depth cap across the actor boundary.
436                depth: session.spawn_depth,
437            },
438            self.executor.clone(),
439            self.fabric_dir.to_string_lossy().into_owned(),
440        );
441        spec.workspace = session.workspace.clone();
442        // Final model: the session's pinned model_ref (create.model / routing already applied),
443        // falling back to the job's bare model on the parent's default provider.
444        spec.model = session
445            .model_ref
446            .as_ref()
447            .map(|r| ModelRefSpec {
448                provider: r.provider.clone(),
449                model: r.model.clone(),
450            })
451            .or_else(|| {
452                let m = job.model.trim();
453                (!m.is_empty()).then(|| ModelRefSpec {
454                    provider: self.default_provider.clone(),
455                    model: m.to_string(),
456                })
457            });
458        spec.disabled_tools = job.disabled_tools.clone();
459        // Least-privilege secrets: only the credential for the child's provider.
460        let provider = spec
461            .model
462            .as_ref()
463            .map(|m| m.provider.as_str())
464            .filter(|p| !p.trim().is_empty())
465            .unwrap_or(&self.default_provider);
466        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
467            spec.secrets.provider_credentials.push(cred.clone());
468        } else {
469            tracing::warn!(
470                "actor child {}: no credential found for provider '{}'",
471                job.child_session_id,
472                provider
473            );
474        }
475        // Phase 6 (direct nested execution): a worker BELOW the depth cap may
476        // orchestrate its OWN children — on startup it builds its own spawn
477        // stack and runs the real SubAgent tool (no host proxy). The cap (the
478        // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
479        // recursion. Driven purely by the child's depth, so it auto-propagates
480        // down the tree without any extra config threading.
481        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
482        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
483        // #69: activate child-approval review. Sub-agents enforce permissions so
484        // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
485        // parent for review — escalated to the human, or model-reviewed off-loop
486        // when the parent is in bypass. The worker installs no checker without
487        // this, so the whole review chain would otherwise stay dormant.
488        spec.capabilities.enforce_permissions = true;
489        // Propagate "bypass permissions" so a self-orchestrating worker knows it
490        // is a bypassed parent and installs the off-loop model-reviewer for its
491        // children's forced-ask actions (Phase 6, Part B). The child session
492        // already carries the inherited flag (create_child_action seeds it).
493        spec.capabilities.bypass = session
494            .agent_runtime_state
495            .as_ref()
496            .is_some_and(|s| s.bypass_permissions);
497        // #73: propagate "no interactive human approver" (headless / scheduled /
498        // deployed root, inherited by the child session). When set, the worker's
499        // per-run approval proxy model-reviews a gated action locally instead of
500        // escalating to a human who will never answer (which would 300s-deny).
501        spec.capabilities.no_human_approver = session
502            .agent_runtime_state
503            .as_ref()
504            .is_some_and(|s| s.no_human_approver);
505        // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
506        // read-only Bash allowlist checker. The reviewer is spawned by
507        // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
508        // marker the completion coordinator branches on to parse the verdict) AND
509        // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
510        // marker (already read above to set `identity.role`), so it rides the same
511        // session-metadata path the denylist/subagent_type use — no new wire seam.
512        // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
513        // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
514        spec.capabilities.guardian_read_only =
515            session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
516        // #193: route this role to a REMOTE resident worker when one is pinned.
517        // `spec.identity.role` was just computed from `subagent_type` above; a
518        // match flips the placement to Remote and rides the worker's bearer on the
519        // scoped secrets envelope (TLS handshake / Authorization header only — the
520        // token is never logged). No match leaves the default `Placement::Local`,
521        // so the local path is byte-for-byte unchanged for every non-pinned role.
522        if let Some(placement) = self.remote_placements.get(spec.identity.role.as_str()) {
523            spec.placement = Placement::Remote {
524                endpoint: placement.endpoint.clone(),
525            };
526            spec.secrets.worker_auth_token = placement.token.clone();
527        } else if let Some(placement) = self.schedulable_placements.get(spec.identity.role.as_str())
528        {
529            // #181 (P2b): route this role to a registry-SCHEDULED worker — ONLY
530            // when it is NOT already pinned to a fixed remote endpoint (the
531            // `else if` makes remote_placements take precedence for a role in
532            // both). The concrete endpoint is resolved at run time in
533            // `execute_external_child`; here we only flip the placement to
534            // Schedulable{pool} and ride the bearer on the scoped secrets
535            // envelope (used for the registry query AND the worker connect). No
536            // match in either map leaves the default `Placement::Local`, so the
537            // local path is byte-for-byte unchanged for every non-routed role.
538            spec.placement = Placement::Schedulable {
539                pool: placement.pool.clone(),
540            };
541            spec.secrets.worker_auth_token = placement.token.clone();
542        }
543        spec
544    }
545
546    /// Max DISTINCT live candidates a single schedulable resolve will try to
547    /// connect to before giving up (#202). Bounds the connect-fail failover so a
548    /// pool full of stale-but-leased workers can't make one spawn walk the whole
549    /// fleet; the effective cap is `min(candidates.len(), MAX_SCHEDULE_CONNECT_ATTEMPTS)`.
550    const MAX_SCHEDULE_CONNECT_ATTEMPTS: usize = 3;
551
552    /// Get-or-build the [`RegistryFabric`] for `placement.registry_url`, caching
553    /// it on the runner (#202). A fabric wraps a connection-pooled reqwest
554    /// `Client`; under sibling fan-out this reuses ONE client per registry instead
555    /// of constructing a fresh one every schedule. Construct-once-then-reuse: a
556    /// cache miss builds (Bearer if configured) and stores it. The bearer stays
557    /// inside the fabric's sensitive `Authorization` header — never logged here.
558    ///
559    /// Concurrency: a brief race can build two fabrics for the same url; that is
560    /// harmless (both are valid, last-insert wins, the loser drops). We never hold
561    /// the cache lock across the build, so an `await`-free critical section can't
562    /// deadlock or stall siblings.
563    fn fabric_for(
564        &self,
565        placement: &ResolvedSchedulablePlacement,
566        role: &str,
567    ) -> std::result::Result<Arc<bamboo_subagent::RegistryFabric>, AgentError> {
568        // (registry_url, token) identity: a wrong-token fabric must never be
569        // reused for a discover query (#202).
570        let key = (placement.registry_url.clone(), placement.token.clone());
571        if let Some(fabric) = self.fabric_cache.lock().unwrap().get(&key).cloned() {
572            return Ok(fabric);
573        }
574
575        // Cache miss: build outside the lock (construction is sync + cheap but we
576        // keep the critical section minimal and await-free regardless).
577        let built = match placement.token.as_deref() {
578            Some(token) => {
579                bamboo_subagent::RegistryFabric::with_token(placement.registry_url.clone(), token)
580            }
581            None => bamboo_subagent::RegistryFabric::new(placement.registry_url.clone()),
582        }
583        .map_err(|e| {
584            AgentError::LLM(format!(
585                "schedulable role '{role}': registry client for '{}' failed: {e}",
586                placement.registry_url
587            ))
588        })?;
589        let arc = Arc::new(built);
590
591        // Insert-or-reuse: if a concurrent caller already populated the slot, take
592        // theirs (the duplicate we built drops) so all siblings share one fabric.
593        let mut cache = self.fabric_cache.lock().unwrap();
594        Ok(cache.entry(key).or_insert(arc).clone())
595    }
596
597    /// Pick a live worker for a SCHEDULABLE role from the agent registry (#181,
598    /// P2b) and CONNECT to it, with connect-fail failover (#202).
599    ///
600    /// Uses a cached [`RegistryFabric`] (per `registry_url`) to list live records
601    /// (the registry already excludes expired leases — health == a live lease),
602    /// filters to those whose `role` == the configured `pool`, then picks a
603    /// starting candidate via per-pool ROUND-ROBIN. Because a live lease is only a
604    /// COARSE health signal (a leased worker can have a dead process / a network
605    /// blip), it does not connect blindly to that one pick: it attempts
606    /// `connect_with_auth_tls`, and on failure WALKS FORWARD to the next DISTINCT
607    /// live candidate and retries — up to `min(candidates.len(),
608    /// MAX_SCHEDULE_CONNECT_ATTEMPTS)` attempts. A single stale-but-leased worker
609    /// therefore no longer fails the whole schedule.
610    ///
611    /// Returns the connected [`ChildClient`] plus the chosen [`AgentRecord`] and
612    /// the placement. No live candidate (empty pool) ⇒ a terminal `AgentError`;
613    /// ALL attempted candidates failing to connect ⇒ a terminal `AgentError` —
614    /// the caller NEVER falls back to a local subprocess (that would silently
615    /// defeat the placement). A registry-query failure is likewise terminal. The
616    /// error logs HOW MANY candidates were tried, never the token.
617    async fn resolve_schedulable_worker(
618        &self,
619        role: &str,
620    ) -> std::result::Result<(ChildClient, AgentRecord, ResolvedSchedulablePlacement), AgentError>
621    {
622        let placement = self
623            .schedulable_placements
624            .get(role)
625            .ok_or_else(|| {
626                AgentError::LLM(format!(
627                    "schedulable placement for role '{role}' vanished before scheduling"
628                ))
629            })?
630            .clone();
631
632        // Query the registry through the cached, connection-pooled fabric (#202).
633        let fabric = self.fabric_for(&placement, role)?;
634        let records = fabric.discover().await.map_err(|e| {
635            AgentError::LLM(format!(
636                "schedulable role '{role}': registry '{}' query failed: {e}",
637                placement.registry_url
638            ))
639        })?;
640
641        // Live candidates = records published under the pool's role. The registry
642        // already dropped expired leases, so presence == health.
643        let candidates: Vec<AgentRecord> = records
644            .into_iter()
645            .filter(|r| r.role == placement.pool)
646            .collect();
647
648        if candidates.is_empty() {
649            return Err(AgentError::LLM(format!(
650                "schedulable role '{role}': no live worker in pool '{}' at registry '{}' \
651                 (NOT spawning a local subprocess — a schedulable role has no local fallback)",
652                placement.pool, placement.registry_url
653            )));
654        }
655
656        // Round-robin START: advance a per-pool cursor ONCE to pick the starting
657        // index, then walk forward through the remaining candidates on connect
658        // failure. Bumping the cursor only once per resolve keeps the spread
659        // across sibling spawns (each resolve starts one slot further on); the
660        // failover walk is local to this resolve and does not perturb the cursor.
661        let start = {
662            let mut cursors = self.schedule_cursor.lock().unwrap();
663            let cursor = cursors.entry(placement.pool.clone()).or_insert(0);
664            let i = *cursor % candidates.len();
665            *cursor = cursor.wrapping_add(1);
666            i
667        };
668
669        // Derive the TLS trust ONCE (pinned CA or default roots / plaintext ws://).
670        let trust_cfg = match placement.ca_cert_file.as_deref() {
671            Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
672                AgentError::LLM(format!(
673                    "scheduled worker CA cert '{}': {e}",
674                    path.display()
675                ))
676            })?),
677            None => None,
678        };
679
680        // Connect-fail failover: try the starting candidate, then walk forward to
681        // the NEXT DISTINCT live candidate on failure, up to the bounded cap. Each
682        // attempt hits a different candidate (modulo wrap from the start index).
683        let max_attempts = candidates.len().min(Self::MAX_SCHEDULE_CONNECT_ATTEMPTS);
684        let mut last_err: Option<String> = None;
685        for attempt in 0..max_attempts {
686            let idx = (start + attempt) % candidates.len();
687            let record = &candidates[idx];
688            let endpoint = record.endpoint.clone();
689            match ChildClient::connect_with_auth_tls(
690                &endpoint,
691                placement.token.as_deref(),
692                trust_cfg.clone(),
693            )
694            .await
695            {
696                Ok(client) => {
697                    if attempt > 0 {
698                        // We skipped one or more stale-but-leased workers — record
699                        // how many (never the endpoint contents beyond the host we
700                        // connected to, never the token).
701                        tracing::info!(
702                            "schedulable role '{role}': connected to pool '{}' worker after \
703                             {attempt} stale candidate(s) skipped",
704                            placement.pool
705                        );
706                    }
707                    return Ok((client, candidates[idx].clone(), placement));
708                }
709                Err(e) => {
710                    tracing::warn!(
711                        "schedulable role '{role}': pool '{}' candidate connect failed \
712                         (attempt {}/{max_attempts}): {e}",
713                        placement.pool,
714                        attempt + 1
715                    );
716                    last_err = Some(e.to_string());
717                }
718            }
719        }
720
721        // All attempted candidates failed to connect ⇒ terminal error. NO spawn
722        // fallback (a schedulable role has none). Log how many were tried, not the
723        // token.
724        Err(AgentError::LLM(format!(
725            "schedulable role '{role}': all {max_attempts} live candidate(s) in pool '{}' at \
726             registry '{}' failed to connect (last error: {}) — NOT spawning a local subprocess",
727            placement.pool,
728            placement.registry_url,
729            last_err.as_deref().unwrap_or("unknown")
730        )))
731    }
732}
733
734#[async_trait]
735impl ExternalChildRunner for ActorChildRunner {
736    async fn should_handle(&self, session: &Session) -> bool {
737        session.metadata.get("runtime.kind") == Some(&"external".to_string())
738            && session.metadata.get("external.protocol") == Some(&"actor".to_string())
739            && session.metadata.get("external.agent_id") == Some(&self.agent_id)
740    }
741
742    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
743        *self.escalation_bridge.lock().unwrap() = bridge;
744    }
745
746    async fn execute_external_child(
747        &self,
748        session: &mut Session,
749        job: &SpawnJob,
750        event_tx: mpsc::Sender<AgentEvent>,
751        cancel_token: CancellationToken,
752    ) -> crate::runtime::runner::Result<()> {
753        // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
754        // moment this grandchild is spawned — while the parent run's bridge is
755        // still in our slot — into an owned local handed to `drive()` for this
756        // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
757        // the run that spawned it must NOT re-read `self.escalation_bridge` at
758        // approval time: by then `run()` may have cleared/overwritten it (a worker
759        // serves runs sequentially), and re-proxying through a closed bridge
760        // fail-closed denies. Capturing at spawn pins the right bridge per run.
761        let escalation = self.escalation_bridge.lock().unwrap().clone();
762        let assignment = extract_assignment(session);
763        let mut spec = self.build_spec(session, job);
764        // Make every actor a warm, reusable worker so the pool can recycle it for
765        // the next sibling with a matching fingerprint.
766        spec.reusable = true;
767        if spec.limits.idle_timeout_secs.is_none() {
768            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
769        }
770        let pool_key = Self::fingerprint(&spec);
771        // Rehydration: the child session in the parent's store is the actor's
772        // durable state. Ship the full conversation so a reactivation
773        // (send_message / update / rerun) carries its history. A reused worker is
774        // stateless between runs, so this is also what isolates each child's
775        // context on a shared process.
776        let messages: Vec<serde_json::Value> = session
777            .messages
778            .iter()
779            .filter_map(|m| serde_json::to_value(m).ok())
780            .collect();
781
782        // Backpressure: hold a concurrency slot for the lifetime of the *run*
783        // (cancellation still proceeds — the cancel branch in drive() runs while
784        // we hold the permit). Released when this fn returns, i.e. once the worker
785        // is parked back into the pool, so idle workers don't pin slots.
786        let _slot = self
787            .concurrency
788            .acquire()
789            .await
790            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
791
792        // Split LOCAL (spawn + warm-pool) from the two process-less remote paths
793        // ONLY at the divergent spots — acquire/connect here and the park/retire at
794        // the end. Everything between (Run dispatch, live-actor registration,
795        // drive, the close) is identical for all three. `kind` is the single guard.
796        //   - Local       (#0):  byte-for-byte the pre-#193 reuse-or-spawn path.
797        //   - Remote       (#194): connect to a FIXED resident endpoint, no spawn.
798        //   - Schedulable  (#181): resolve a live worker from the registry, connect.
799        let kind = match spec.placement {
800            Placement::Remote { .. } => PlacementKind::Remote,
801            Placement::Schedulable { .. } => PlacementKind::Schedulable,
802            Placement::Local => PlacementKind::Local,
803        };
804        let remote = !matches!(kind, PlacementKind::Local);
805
806        let (actor, mut client) = match kind {
807            PlacementKind::Remote => {
808                // REMOTE branch: connect to a resident worker. No spawn, no pool
809                // touch, no drain. We do not own the worker, so a connect failure
810                // has NO respawn fallback — it is a clear, terminal error.
811                let placement = self
812                    .remote_placements
813                    .get(spec.identity.role.as_str())
814                    .ok_or_else(|| {
815                        AgentError::LLM(format!(
816                            "remote placement for role '{}' vanished before connect",
817                            spec.identity.role
818                        ))
819                    })?;
820                let endpoint = placement.endpoint.clone();
821                // Build the TLS trust: a pinned CA pins a self-signed worker cert;
822                // otherwise default webpki roots (or plaintext for `ws://`).
823                let trust_cfg = match placement.ca_cert_file.as_deref() {
824                    Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
825                        AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
826                    })?),
827                    None => None,
828                };
829                let client = ChildClient::connect_with_auth_tls(
830                    &endpoint,
831                    placement.token.as_deref(),
832                    trust_cfg,
833                )
834                .await
835                .map_err(|e| {
836                    AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
837                })?;
838                // Process-less handle so live-actor registration (in-band steering)
839                // works exactly as for a local worker; `kill()` is a no-op.
840                let record = AgentRecord {
841                    agent_id: job.child_session_id.clone(),
842                    role: spec.identity.role.clone(),
843                    labels: Vec::new(),
844                    endpoint: endpoint.clone(),
845                    pid: 0,
846                    version: String::new(),
847                    started_at: chrono::Utc::now(),
848                    lease_expires_at: chrono::Utc::now(),
849                };
850                let actor = PooledActor {
851                    worker: SpawnedChild::remote(record),
852                    endpoint,
853                    agent_id: job.child_session_id.clone(),
854                };
855                (actor, client)
856            }
857            PlacementKind::Schedulable => {
858                // SCHEDULABLE branch (#181, P2b + #202): resolve a LIVE worker from
859                // the registry (round-robin over the pool's live records) AND
860                // connect to it, with connect-fail failover — on a stale-but-leased
861                // worker, resolve walks to the next live candidate (bounded). No
862                // spawn, no pool, no kill, NO local fallback — no live worker, or
863                // ALL candidates dead ⇒ a terminal error (raised inside
864                // resolve_schedulable_worker, which owns the connect now).
865                let (client, record, _placement) = self
866                    .resolve_schedulable_worker(spec.identity.role.as_str())
867                    .await?;
868                let endpoint = record.endpoint.clone();
869                // The chosen registry record IS the AgentRecord — synthesize the
870                // process-less handle straight from it (registry-managed worker).
871                let actor = PooledActor {
872                    worker: SpawnedChild::remote(record),
873                    endpoint,
874                    agent_id: job.child_session_id.clone(),
875                };
876                (actor, client)
877            }
878            PlacementKind::Local => {
879                // LOCAL branch — the EXACT pre-#193 path: reuse-or-spawn + the
880                // respawn-on-connect-miss fallback. Unchanged in behavior, ordering,
881                // and error text.
882                let mut actor = self.acquire_worker(&pool_key, &spec).await?;
883                let client = match ChildClient::connect(&actor.endpoint).await {
884                    Ok(client) => client,
885                    Err(e) => {
886                        // The pooled worker may have died between checkout and connect;
887                        // retire it and spawn one fresh, once.
888                        self.retire_worker(actor).await;
889                        let spawned = spawn_worker(
890                            &self.worker_bin,
891                            &self.worker_args,
892                            &spec,
893                            self.spawn_timeout,
894                        )
895                        .await
896                        .map_err(|e2| {
897                            AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
898                        })?;
899                        let endpoint = spawned.record.endpoint.clone();
900                        let agent_id = spawned.record.agent_id.clone();
901                        let client = ChildClient::connect(&endpoint)
902                            .await
903                            .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
904                        actor = PooledActor {
905                            worker: spawned,
906                            endpoint,
907                            agent_id,
908                        };
909                        client
910                    }
911                };
912                (actor, client)
913            }
914        };
915
916        client
917            .send(ParentFrame::Run(RunSpec {
918                assignment,
919                reasoning_effort: None,
920                messages,
921            }))
922            .await
923            .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
924
925        // Register as a live actor so send_message (running, no interrupt) can
926        // steer this child in-band over the existing WS connection. The guard
927        // unregisters on every exit path.
928        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
929        let live_guard = super::live::register(&job.child_session_id, live_tx);
930
931        let result = drive(
932            &mut client,
933            &job.child_session_id,
934            self.approval_decider.as_ref(),
935            escalation,
936            &event_tx,
937            &cancel_token,
938            &mut live_rx,
939        )
940        .await;
941        // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
942        // so a send_message landing in the close/park window below must see
943        // "not live" and take the durable-queue fallback instead of vanishing.
944        // (Even if one slipped in earlier, send_message also appends it to the
945        // durable transcript, so the next activation still rehydrates it.)
946        drop(live_guard);
947
948        // Close the connection: the worker's serve loop then accepts the next
949        // assignment (reuse) or idles out. Park the worker on a clean run; retire
950        // it on error/cancel (a wedged worker must not be reused).
951        let _ = client.close().await;
952        if remote {
953            // #193 / #181: we do NOT own a remote or scheduled worker — never park
954            // it into the local pool (its endpoint/agent_id are not ours to
955            // recycle) and never kill it (`SpawnedChild::remote.kill()` is a no-op
956            // anyway). Just let the connection drop above; the resident /
957            // registry-managed worker self-manages via its own idle timeout, ready
958            // for the next parent. `actor` is dropped here. (Schedulable shares this
959            // arm — release is a no-op for both registry-managed cases.)
960            drop(actor);
961        } else {
962            match &result {
963                Ok(_) => self.release_worker(&pool_key, actor).await,
964                Err(_) => self.retire_worker(actor).await,
965            }
966        }
967
968        // Write-back: persist the actor's final reply onto the child session so
969        // the transcript survives and the NEXT activation sees it as history.
970        // (run_child_spawn saves the session right after we return.)
971        match result {
972            Ok(Some(text)) => {
973                if !text.is_empty() {
974                    session.add_message(bamboo_agent_core::Message::assistant(text, None));
975                }
976                Ok(())
977            }
978            Ok(None) => Ok(()),
979            Err(e) => Err(e),
980        }
981    }
982}
983
984/// Pump child frames -> parent events until a terminal frame (or cancellation).
985/// On success, yields the actor's final result text (for session write-back).
986/// `live_rx` carries in-band frames (steering messages) from the live registry.
987///
988/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
989/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
990/// non-bypass child re-proxies an approval request, this owned bridge routes it
991/// UP to the parent run. Owning it for the call's lifetime is what lets a
992/// fire-and-forget grandchild that outlives its spawning run still escalate to
993/// the correct (then-current) parent bridge rather than a stale/overwritten one.
994async fn drive(
995    client: &mut ChildClient,
996    child_session_id: &str,
997    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
998    escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
999    event_tx: &mpsc::Sender<AgentEvent>,
1000    cancel_token: &CancellationToken,
1001    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
1002) -> crate::runtime::runner::Result<Option<String>> {
1003    loop {
1004        tokio::select! {
1005            _ = cancel_token.cancelled() => {
1006                // fall through to the cancel handling below
1007                break;
1008            }
1009            Some(frame) = live_rx.recv() => {
1010                // Forward in-band steering to the worker over the existing WS.
1011                if client.send(frame).await.is_err() {
1012                    tracing::warn!("live steering frame could not be sent; connection failing");
1013                }
1014            }
1015            frame = client.next_frame() => {
1016                match frame {
1017                    Ok(Some(ChildFrame::Event { event })) => {
1018                        // AgentEvent is serialized verbatim on the wire (zero mapping).
1019                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
1020                            let _ = event_tx.send(ev).await;
1021                        }
1022                    }
1023                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
1024                        // Phase 2: a worker proxied a gated-tool approval back to
1025                        // the host. The WORKER side is live — its executor installs
1026                        // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
1027                        // that calls `host.approval_call`, so this frame arrives
1028                        // when a child hits `ConfirmationRequired`.
1029                        if let Some(reviewer) = child_approval_reviewer() {
1030                            // Phase 6, Part B: a BYPASSED parent worker
1031                            // model-reviews its children's forced-ask (dangerous)
1032                            // actions. The review is an LLM call, so run it OFF
1033                            // the frame pump in a spawned task and deliver the
1034                            // verdict async via the live channel — the pump keeps
1035                            // forwarding events and the agent loop never blocks. A
1036                            // timeout denies a hung review so the child can't hang.
1037                            let child = child_session_id.to_string();
1038                            let req_id = id.clone();
1039                            let body = body.clone();
1040                            tokio::spawn(async move {
1041                                let approved = tokio::time::timeout(
1042                                    CHILD_APPROVAL_TIMEOUT,
1043                                    reviewer.review(&child, &body),
1044                                )
1045                                .await
1046                                .unwrap_or(false);
1047                                super::live::deliver_approval(&child, &req_id, approved);
1048                            });
1049                        } else if approval_decider.is_some() {
1050                            // A decider is wired (policy / auto): decide promptly
1051                            // and reply inline. (Must not block the pump — see the
1052                            // `ChildApprovalDecider` doc.)
1053                            let approved =
1054                                decide_child_approval(approval_decider, child_session_id, &body)
1055                                    .await;
1056                            if client
1057                                .send(ParentFrame::ApprovalReply { id, approved })
1058                                .await
1059                                .is_err()
1060                            {
1061                                tracing::warn!(
1062                                    "failed to answer approval_request; connection failing"
1063                                );
1064                            }
1065                        } else if let Some(host) = escalation_bridge.clone() {
1066                            // Non-bypass WORKER: ESCALATE up our own actor link
1067                            // (re-proxy) so the request chains to our parent — and
1068                            // up every level until a bypass level (model-review) or
1069                            // the top orchestrator (human) decides. Off-loop so the
1070                            // pump never blocks; relay the reply down to the child.
1071                            let child = child_session_id.to_string();
1072                            let req_id = id.clone();
1073                            let body = body.clone();
1074                            tokio::spawn(async move {
1075                                let approved = match tokio::time::timeout(
1076                                    CHILD_APPROVAL_TIMEOUT,
1077                                    host.approval_call(body),
1078                                )
1079                                .await
1080                                {
1081                                    Ok(Ok(reply)) => reply
1082                                        .get("approved")
1083                                        .and_then(|v| v.as_bool())
1084                                        .unwrap_or(false),
1085                                    // Transport error or timeout ⇒ fail closed.
1086                                    _ => false,
1087                                };
1088                                super::live::deliver_approval(&child, &req_id, approved);
1089                            });
1090                        } else {
1091                            // Top orchestrator (no escalation bridge): human-in-the-
1092                            // loop. Surface the request on the parent's event stream
1093                            // and DEFER — the decision arrives out-of-band via
1094                            // `live::deliver_approval(child, request_id, approved)`
1095                            // (→ this child's `live_rx` → forwarded to the worker
1096                            // above). A timeout denies a never-answered request so
1097                            // it can't hang the child forever.
1098                            let (tool_name, permission, resource) =
1099                                approval_request_fields(&body);
1100                            // Register the pending request BEFORE surfacing it so
1101                            // the external handler's `deliver_approval_checked` can
1102                            // correlate an out-of-band POST against a genuine
1103                            // human-loop request (and consume it one-shot).
1104                            super::live::register_pending_approval(child_session_id, &id);
1105                            let _ = event_tx
1106                                .send(AgentEvent::ChildApprovalRequested {
1107                                    child_session_id: child_session_id.to_string(),
1108                                    request_id: id.clone(),
1109                                    tool_name,
1110                                    permission,
1111                                    resource,
1112                                })
1113                                .await;
1114                            let child = child_session_id.to_string();
1115                            tokio::spawn(async move {
1116                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
1117                                // Deny only if still pending: a one-shot consume so
1118                                // we don't double-deliver if the human already
1119                                // answered (the POST took it), and so a late POST
1120                                // after this fires finds nothing pending.
1121                                if super::live::take_pending_approval(&child, &id) {
1122                                    super::live::deliver_approval(&child, &id, false);
1123                                }
1124                            });
1125                        }
1126                    }
1127                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
1128                        return match status {
1129                            TerminalStatus::Completed => Ok(result),
1130                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
1131                            TerminalStatus::Error => Err(AgentError::LLM(
1132                                error.unwrap_or_else(|| "actor child errored".to_string()),
1133                            )),
1134                            // The suspend/resume round-trip (host re-dispatch of a
1135                            // nested parent) is not wired here yet; a worker in
1136                            // this build never emits Suspended, so this is
1137                            // unreachable in practice.
1138                            TerminalStatus::Suspended => Err(AgentError::LLM(
1139                                "nested sub-agent suspend received but resume transport is not wired"
1140                                    .to_string(),
1141                            )),
1142                        };
1143                    }
1144                    Ok(None) => {
1145                        return Err(AgentError::LLM(
1146                            "actor child closed before terminal".to_string(),
1147                        ));
1148                    }
1149                    Err(e) => {
1150                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
1151                    }
1152                }
1153            }
1154        }
1155    }
1156
1157    // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
1158    let _ = client.send(ParentFrame::Cancel).await;
1159    Err(AgentError::Cancelled)
1160}
1161
1162/// The assignment text = the child session's latest user message (falls back to its title).
1163fn extract_assignment(session: &Session) -> String {
1164    session
1165        .messages
1166        .iter()
1167        .rev()
1168        .find(|m| matches!(m.role, Role::User))
1169        .map(|m| m.content.clone())
1170        .unwrap_or_else(|| {
1171            session
1172                .metadata
1173                .get("title")
1174                .cloned()
1175                .unwrap_or_else(|| "Execute task".to_string())
1176        })
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181    use super::*;
1182
1183    fn spec_with(
1184        role: &str,
1185        provider: &str,
1186        model: &str,
1187        workspace: Option<&str>,
1188        disabled: Option<Vec<&str>>,
1189    ) -> ProvisionSpec {
1190        let mut spec = ProvisionSpec::new(
1191            ChildIdentity {
1192                child_id: "c".into(),
1193                parent_id: None,
1194                project_key: None,
1195                role: role.into(),
1196                depth: 0,
1197            },
1198            ExecutorSpec::Echo,
1199            "/tmp/fab".into(),
1200        );
1201        spec.workspace = workspace.map(|w| w.to_string());
1202        spec.model = Some(ModelRefSpec {
1203            provider: provider.into(),
1204            model: model.into(),
1205        });
1206        spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
1207        spec
1208    }
1209
1210    #[test]
1211    fn fingerprint_matches_interchangeable_children() {
1212        // Same role/provider/model/workspace and equal tool sets (order-insensitive)
1213        // are interchangeable on one warm worker — and differ only in child_id.
1214        let a = spec_with(
1215            "explorer",
1216            "p",
1217            "m",
1218            Some("/ws"),
1219            Some(vec!["Bash", "Edit"]),
1220        );
1221        let mut b = spec_with(
1222            "explorer",
1223            "p",
1224            "m",
1225            Some("/ws"),
1226            Some(vec!["Edit", "Bash"]),
1227        );
1228        b.identity.child_id = "other".into();
1229        assert_eq!(
1230            ActorChildRunner::fingerprint(&a),
1231            ActorChildRunner::fingerprint(&b)
1232        );
1233    }
1234
1235    #[test]
1236    fn fingerprint_separates_distinct_runtimes() {
1237        let base = spec_with("explorer", "p", "m", Some("/ws"), None);
1238        let base_fp = ActorChildRunner::fingerprint(&base);
1239        // Each axis that is baked into the worker must split the pool bucket.
1240        assert_ne!(
1241            base_fp,
1242            ActorChildRunner::fingerprint(&spec_with("writer", "p", "m", Some("/ws"), None))
1243        );
1244        assert_ne!(
1245            base_fp,
1246            ActorChildRunner::fingerprint(&spec_with("explorer", "p2", "m", Some("/ws"), None))
1247        );
1248        assert_ne!(
1249            base_fp,
1250            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m2", Some("/ws"), None))
1251        );
1252        assert_ne!(
1253            base_fp,
1254            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws2"), None))
1255        );
1256        assert_ne!(
1257            base_fp,
1258            ActorChildRunner::fingerprint(&spec_with(
1259                "explorer",
1260                "p",
1261                "m",
1262                Some("/ws"),
1263                Some(vec!["Bash"])
1264            ))
1265        );
1266    }
1267
1268    #[test]
1269    fn fingerprint_splits_on_baked_capabilities() {
1270        // Every capability baked once at provision time must split the pool
1271        // bucket, else a worker baked for one posture gets reused for another
1272        // (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child,
1273        // breaking the depth cap; or a bypass worker reused for a non-bypass one).
1274        let base_fp =
1275            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
1276
1277        let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
1278        depth.identity.depth = 2;
1279        assert_ne!(
1280            base_fp,
1281            ActorChildRunner::fingerprint(&depth),
1282            "depth must split"
1283        );
1284
1285        let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
1286        nested.capabilities.nested_spawn = true;
1287        assert_ne!(
1288            base_fp,
1289            ActorChildRunner::fingerprint(&nested),
1290            "nested_spawn must split"
1291        );
1292
1293        let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
1294        bypass.capabilities.bypass = true;
1295        assert_ne!(
1296            base_fp,
1297            ActorChildRunner::fingerprint(&bypass),
1298            "bypass must split"
1299        );
1300
1301        let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
1302        enforce.capabilities.enforce_permissions = true;
1303        assert_ne!(
1304            base_fp,
1305            ActorChildRunner::fingerprint(&enforce),
1306            "enforce_permissions must split"
1307        );
1308
1309        let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
1310        cap.capabilities.max_spawn_depth = Some(8);
1311        assert_ne!(
1312            base_fp,
1313            ActorChildRunner::fingerprint(&cap),
1314            "max_spawn_depth must split"
1315        );
1316
1317        // #73 (P1): the worker bakes `no_human_review` from this flag once at
1318        // build(), so it MUST split the pool or a worker baked for one approval
1319        // posture is reused for the opposite one.
1320        let mut nha = spec_with("explorer", "p", "m", Some("/ws"), None);
1321        nha.capabilities.no_human_approver = true;
1322        assert_ne!(
1323            base_fp,
1324            ActorChildRunner::fingerprint(&nha),
1325            "no_human_approver must split"
1326        );
1327
1328        // #71: the read-only Bash checker is baked once at build() from this flag,
1329        // so a guardian reviewer worker must not be reused for an ordinary child.
1330        let mut gro = spec_with("explorer", "p", "m", Some("/ws"), None);
1331        gro.capabilities.guardian_read_only = true;
1332        assert_ne!(
1333            base_fp,
1334            ActorChildRunner::fingerprint(&gro),
1335            "guardian_read_only must split"
1336        );
1337    }
1338
1339    struct StaticDecider(bool);
1340
1341    #[async_trait]
1342    impl ChildApprovalDecider for StaticDecider {
1343        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
1344            self.0
1345        }
1346    }
1347
1348    #[tokio::test]
1349    async fn child_approval_fails_closed_without_decider() {
1350        // No decider wired ⇒ the host denies (safe default), unchanged behavior.
1351        let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
1352        assert!(!decide_child_approval(None, "child-1", &body).await);
1353    }
1354
1355    #[tokio::test]
1356    async fn child_approval_honors_wired_decider() {
1357        let body =
1358            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
1359        let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
1360        let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
1361        assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
1362        assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
1363    }
1364
1365    #[test]
1366    fn approval_request_fields_extracts_and_defaults() {
1367        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
1368        assert_eq!(
1369            approval_request_fields(&full),
1370            ("Bash".to_string(), "run".to_string(), "ls".to_string())
1371        );
1372        // Missing fields default to empty strings.
1373        let partial = serde_json::json!({"tool_name":"Write"});
1374        assert_eq!(
1375            approval_request_fields(&partial),
1376            ("Write".to_string(), String::new(), String::new())
1377        );
1378    }
1379
1380    // ---- #193: remote placement routing -------------------------------------
1381
1382    use crate::runtime::execution::SpawnJob;
1383    use bamboo_agent_core::Session;
1384
1385    /// A runner with a BOGUS worker_bin (`/bin/false`): a local spawn here would
1386    /// FAIL, so a passing remote test proves the remote path never spawns.
1387    fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
1388        ActorChildRunner::new(
1389            "test-actor".into(),
1390            PathBuf::from("/bin/false"),
1391            vec![],
1392            std::env::temp_dir().join("bamboo-test-fab-193"),
1393            ExecutorSpec::Echo,
1394            vec![],
1395            "anthropic".into(),
1396            4,
1397        )
1398        .with_remote_placements(placements)
1399    }
1400
1401    /// A child session of the given role (the role rides `subagent_type`, the
1402    /// path build_spec + the remote lookup both read).
1403    fn session_of_role(role: &str, assignment: &str) -> Session {
1404        let mut s = Session::new("child-1", "test-model");
1405        s.metadata
1406            .insert("subagent_type".to_string(), role.to_string());
1407        s.add_message(bamboo_agent_core::Message::user(assignment));
1408        s
1409    }
1410
1411    fn job_for(child: &str) -> SpawnJob {
1412        SpawnJob {
1413            parent_session_id: "parent-1".into(),
1414            child_session_id: child.into(),
1415            model: String::new(),
1416            disabled_tools: None,
1417        }
1418    }
1419
1420    #[test]
1421    fn build_spec_sets_remote_placement_for_matching_role() {
1422        let mut placements = HashMap::new();
1423        placements.insert(
1424            "explorer".to_string(),
1425            ResolvedRemotePlacement {
1426                endpoint: "wss://gpu-host:8443".into(),
1427                token: Some("T-secret".into()),
1428                ca_cert_file: None,
1429            },
1430        );
1431        let runner = bogus_runner(placements);
1432
1433        // Matching role -> Placement::Remote + the bearer on the secrets envelope.
1434        let s = session_of_role("explorer", "do the thing");
1435        let spec = runner.build_spec(&s, &job_for("child-1"));
1436        match &spec.placement {
1437            Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://gpu-host:8443"),
1438            other => panic!("expected Remote, got {other:?}"),
1439        }
1440        assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
1441    }
1442
1443    #[test]
1444    fn build_spec_leaves_local_for_unmatched_role() {
1445        let mut placements = HashMap::new();
1446        placements.insert(
1447            "explorer".to_string(),
1448            ResolvedRemotePlacement {
1449                endpoint: "wss://gpu-host:8443".into(),
1450                token: Some("T".into()),
1451                ca_cert_file: None,
1452            },
1453        );
1454        let runner = bogus_runner(placements);
1455
1456        // A DIFFERENT role keeps the default Local placement + no bearer.
1457        let s = session_of_role("writer", "do the thing");
1458        let spec = runner.build_spec(&s, &job_for("child-1"));
1459        assert_eq!(spec.placement, Placement::Local);
1460        assert!(spec.secrets.worker_auth_token.is_none());
1461    }
1462
1463    #[test]
1464    fn build_spec_local_when_no_placements() {
1465        let runner = bogus_runner(HashMap::new());
1466        let s = session_of_role("explorer", "do the thing");
1467        let spec = runner.build_spec(&s, &job_for("child-1"));
1468        assert_eq!(spec.placement, Placement::Local);
1469        assert!(spec.secrets.worker_auth_token.is_none());
1470    }
1471
1472    /// End-to-end remote run through `execute_external_child`: a resident worker
1473    /// (Bearer-gated `WsServer` + `EchoExecutor`) serves the role; the runner is
1474    /// built with a `remote_placements` entry pointing at it AND a BOGUS
1475    /// worker_bin (`/bin/false`). A passing test proves the remote path CONNECTS
1476    /// to the resident worker and NEVER spawns (a spawn would fail on /bin/false),
1477    /// and that a terminal/echo result flows back.
1478    #[tokio::test]
1479    async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
1480        // 1. Stand up the resident worker on loopback with a required bearer.
1481        let token = "remote-test-token";
1482        let server = bamboo_subagent::transport::WsServer::bind_with_token(
1483            (std::net::Ipv4Addr::LOCALHOST, 0).into(),
1484            Some(token.to_string()),
1485        )
1486        .await
1487        .expect("bind resident worker");
1488        let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1489        let srv = tokio::spawn(async move {
1490            // serve() loops connection-after-connection; the test exits, dropping it.
1491            let _ = server
1492                .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1493                .await;
1494        });
1495
1496        // 2. Build the runner: role "explorer" pinned remote, bogus worker_bin.
1497        let mut placements = HashMap::new();
1498        placements.insert(
1499            "explorer".to_string(),
1500            ResolvedRemotePlacement {
1501                endpoint: endpoint.clone(),
1502                token: Some(token.to_string()),
1503                ca_cert_file: None, // plaintext ws:// loopback, default trust
1504            },
1505        );
1506        let runner = bogus_runner(placements);
1507
1508        // 3. Drive a real run for that role.
1509        let mut session = session_of_role("explorer", "hello remote");
1510        let job = job_for("child-1");
1511        let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1512        let cancel = CancellationToken::new();
1513
1514        let result = tokio::time::timeout(
1515            Duration::from_secs(10),
1516            runner.execute_external_child(&mut session, &job, event_tx, cancel),
1517        )
1518        .await
1519        .expect("run did not hang")
1520        .expect("remote run succeeded (connected to resident worker, did not spawn)");
1521
1522        let _ = result;
1523        // The EchoExecutor's reply is written back onto the child session as an
1524        // assistant message — proof a terminal result flowed back over the link.
1525        let last = session
1526            .messages
1527            .iter()
1528            .rev()
1529            .find(|m| matches!(m.role, Role::Assistant))
1530            .expect("an assistant reply was written back");
1531        assert!(
1532            last.content.contains("echo:"),
1533            "expected echo reply, got {:?}",
1534            last.content
1535        );
1536
1537        // Drain a couple of streamed events to confirm the event pipe carried the
1538        // worker's tokens too (best-effort; the reply assertion above is primary).
1539        let mut saw_event = false;
1540        while let Ok(Some(_ev)) =
1541            tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
1542        {
1543            saw_event = true;
1544        }
1545        let _ = saw_event;
1546
1547        srv.abort();
1548    }
1549
1550    // ---- #181 (P2b): schedulable placement routing --------------------------
1551
1552    use wiremock::matchers::{method as wm_method, path as wm_path};
1553    use wiremock::{Mock, MockServer, ResponseTemplate};
1554
1555    /// A bogus-worker_bin runner carrying SCHEDULABLE placements (and optionally
1556    /// remote ones, to test precedence). A local spawn here would fail on
1557    /// `/bin/false`, so a passing schedulable test proves no subprocess spawned.
1558    fn bogus_sched_runner(
1559        remote: HashMap<String, ResolvedRemotePlacement>,
1560        sched: HashMap<String, ResolvedSchedulablePlacement>,
1561    ) -> ActorChildRunner {
1562        ActorChildRunner::new(
1563            "test-actor".into(),
1564            PathBuf::from("/bin/false"),
1565            vec![],
1566            std::env::temp_dir().join("bamboo-test-fab-181"),
1567            ExecutorSpec::Echo,
1568            vec![],
1569            "anthropic".into(),
1570            4,
1571        )
1572        .with_remote_placements(remote)
1573        .with_schedulable_placements(sched)
1574    }
1575
1576    fn sched_placement(
1577        pool: &str,
1578        registry_url: impl Into<String>,
1579    ) -> ResolvedSchedulablePlacement {
1580        ResolvedSchedulablePlacement {
1581            pool: pool.into(),
1582            registry_url: registry_url.into(),
1583            token: None,
1584            ca_cert_file: None,
1585        }
1586    }
1587
1588    fn live_record(agent_id: &str, role: &str, endpoint: &str) -> AgentRecord {
1589        AgentRecord {
1590            agent_id: agent_id.into(),
1591            role: role.into(),
1592            labels: Vec::new(),
1593            endpoint: endpoint.into(),
1594            pid: 0,
1595            version: String::new(),
1596            started_at: chrono::Utc::now(),
1597            lease_expires_at: chrono::Utc::now() + chrono::Duration::seconds(60),
1598        }
1599    }
1600
1601    #[test]
1602    fn build_spec_sets_schedulable_placement_for_matching_role() {
1603        let mut sched = HashMap::new();
1604        sched.insert("explorer".to_string(), {
1605            let mut p = sched_placement("gpu-pool", "https://control-plane:9562");
1606            p.token = Some("T-sched".into());
1607            p
1608        });
1609        let runner = bogus_sched_runner(HashMap::new(), sched);
1610
1611        let s = session_of_role("explorer", "do the thing");
1612        let spec = runner.build_spec(&s, &job_for("child-1"));
1613        match &spec.placement {
1614            Placement::Schedulable { pool } => assert_eq!(pool, "gpu-pool"),
1615            other => panic!("expected Schedulable, got {other:?}"),
1616        }
1617        // The bearer rides the scoped secrets envelope (registry + worker connect).
1618        assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-sched"));
1619    }
1620
1621    #[test]
1622    fn build_spec_remote_wins_when_role_in_both_maps() {
1623        // A role present in BOTH remote_placements and schedulable_placements must
1624        // resolve to the FIXED remote placement (documented precedence).
1625        let mut remote = HashMap::new();
1626        remote.insert(
1627            "explorer".to_string(),
1628            ResolvedRemotePlacement {
1629                endpoint: "wss://fixed-host:8443".into(),
1630                token: Some("T-remote".into()),
1631                ca_cert_file: None,
1632            },
1633        );
1634        let mut sched = HashMap::new();
1635        sched.insert(
1636            "explorer".to_string(),
1637            sched_placement("gpu-pool", "https://control-plane:9562"),
1638        );
1639        let runner = bogus_sched_runner(remote, sched);
1640
1641        let s = session_of_role("explorer", "do the thing");
1642        let spec = runner.build_spec(&s, &job_for("child-1"));
1643        match &spec.placement {
1644            Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://fixed-host:8443"),
1645            other => panic!("expected Remote (precedence), got {other:?}"),
1646        }
1647        assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
1648    }
1649
1650    #[test]
1651    fn build_spec_local_for_unmatched_schedulable_role() {
1652        let mut sched = HashMap::new();
1653        sched.insert(
1654            "explorer".to_string(),
1655            sched_placement("gpu-pool", "https://control-plane:9562"),
1656        );
1657        let runner = bogus_sched_runner(HashMap::new(), sched);
1658        let s = session_of_role("writer", "do the thing");
1659        let spec = runner.build_spec(&s, &job_for("child-1"));
1660        assert_eq!(spec.placement, Placement::Local);
1661        assert!(spec.secrets.worker_auth_token.is_none());
1662    }
1663
1664    /// Spin up `n` loopback Echo `WsServer`s; returns their endpoints and the
1665    /// JoinHandles (abort on test end). Each is a REAL connectable worker.
1666    async fn spawn_echo_workers(n: usize) -> (Vec<String>, Vec<tokio::task::JoinHandle<()>>) {
1667        let mut endpoints = Vec::new();
1668        let mut handles = Vec::new();
1669        for _ in 0..n {
1670            let server = bamboo_subagent::transport::WsServer::bind_loopback()
1671                .await
1672                .expect("bind echo worker");
1673            endpoints.push(server.ws_endpoint());
1674            handles.push(tokio::spawn(async move {
1675                let _ = server
1676                    .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1677                    .await;
1678            }));
1679        }
1680        (endpoints, handles)
1681    }
1682
1683    #[tokio::test]
1684    async fn resolve_schedulable_worker_round_robin_spreads_over_candidates() {
1685        // Registry returns three live, CONNECTABLE workers in the pool; successive
1686        // picks must advance the per-pool cursor and cover all three (round-robin
1687        // spread). resolve now also connects, so the candidates are real servers.
1688        let (eps, handles) = spawn_echo_workers(3).await;
1689        let registry = MockServer::start().await;
1690        let recs = vec![
1691            live_record("w-0", "gpu-pool", &eps[0]),
1692            live_record("w-1", "gpu-pool", &eps[1]),
1693            live_record("w-2", "gpu-pool", &eps[2]),
1694            // A worker in a DIFFERENT pool must be filtered out.
1695            live_record("other", "cpu-pool", "ws://127.0.0.1:9"),
1696        ];
1697        Mock::given(wm_method("GET"))
1698            .and(wm_path("/v1/agents"))
1699            .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1700            .mount(&registry)
1701            .await;
1702
1703        let mut sched = HashMap::new();
1704        sched.insert(
1705            "explorer".to_string(),
1706            sched_placement("gpu-pool", registry.uri()),
1707        );
1708        let runner = bogus_sched_runner(HashMap::new(), sched);
1709
1710        // Three picks → cursor 0,1,2 → agent_ids w-0, w-1, w-2 in order.
1711        let mut picked = Vec::new();
1712        for _ in 0..3 {
1713            let (client, rec, placement) = match runner.resolve_schedulable_worker("explorer").await
1714            {
1715                Ok(v) => v,
1716                Err(e) => panic!("a live worker is picked: {e}"),
1717            };
1718            assert_eq!(placement.pool, "gpu-pool");
1719            assert_eq!(rec.role, "gpu-pool", "only pool workers are candidates");
1720            picked.push(rec.agent_id);
1721            let _ = client.close().await;
1722        }
1723        picked.sort();
1724        assert_eq!(
1725            picked,
1726            vec!["w-0".to_string(), "w-1".to_string(), "w-2".to_string()],
1727            "round-robin covered every candidate over three picks"
1728        );
1729        for h in handles {
1730            h.abort();
1731        }
1732    }
1733
1734    #[tokio::test]
1735    async fn resolve_schedulable_worker_fails_over_dead_first_candidate() {
1736        // The FIRST candidate (by cursor order: cursor starts at 0) points at a
1737        // DEAD endpoint (no listener on a closed port); a LATER candidate is a real
1738        // Echo worker. resolve must SKIP the dead one and connect to the live one,
1739        // returning the live record — connect-fail failover (#202).
1740        let (eps, handles) = spawn_echo_workers(1).await;
1741        // A port with no listener: bind then drop to free it (best-effort closed).
1742        let dead = {
1743            let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
1744                .await
1745                .unwrap();
1746            let port = l.local_addr().unwrap().port();
1747            drop(l);
1748            format!("ws://127.0.0.1:{port}")
1749        };
1750        let registry = MockServer::start().await;
1751        let recs = vec![
1752            // idx 0 = cursor start = DEAD
1753            live_record("w-dead", "gpu-pool", &dead),
1754            // idx 1 = LIVE echo worker
1755            live_record("w-live", "gpu-pool", &eps[0]),
1756        ];
1757        Mock::given(wm_method("GET"))
1758            .and(wm_path("/v1/agents"))
1759            .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1760            .mount(&registry)
1761            .await;
1762
1763        let mut sched = HashMap::new();
1764        sched.insert(
1765            "explorer".to_string(),
1766            sched_placement("gpu-pool", registry.uri()),
1767        );
1768        let runner = bogus_sched_runner(HashMap::new(), sched);
1769
1770        let (client, rec, _placement) = match runner.resolve_schedulable_worker("explorer").await {
1771            Ok(v) => v,
1772            Err(e) => panic!("failover skips the dead candidate, connects to the live one: {e}"),
1773        };
1774        assert_eq!(
1775            rec.agent_id, "w-live",
1776            "the dead first candidate was skipped; the live one was chosen"
1777        );
1778        let _ = client.close().await;
1779        for h in handles {
1780            h.abort();
1781        }
1782    }
1783
1784    #[tokio::test]
1785    async fn resolve_schedulable_worker_errors_when_all_candidates_dead() {
1786        // ALL candidates point at dead endpoints: resolve must error (no panic),
1787        // and CRUCIALLY no spawn (a schedulable role has no local fallback). The
1788        // error names how many were tried.
1789        let mut dead = Vec::new();
1790        for _ in 0..2 {
1791            let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
1792                .await
1793                .unwrap();
1794            let port = l.local_addr().unwrap().port();
1795            drop(l);
1796            dead.push(format!("ws://127.0.0.1:{port}"));
1797        }
1798        let registry = MockServer::start().await;
1799        let recs = vec![
1800            live_record("d-0", "gpu-pool", &dead[0]),
1801            live_record("d-1", "gpu-pool", &dead[1]),
1802        ];
1803        Mock::given(wm_method("GET"))
1804            .and(wm_path("/v1/agents"))
1805            .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1806            .mount(&registry)
1807            .await;
1808
1809        let mut sched = HashMap::new();
1810        sched.insert(
1811            "explorer".to_string(),
1812            sched_placement("gpu-pool", registry.uri()),
1813        );
1814        let runner = bogus_sched_runner(HashMap::new(), sched);
1815
1816        let msg = match runner.resolve_schedulable_worker("explorer").await {
1817            Ok(_) => panic!("all candidates dead must error, not connect"),
1818            Err(e) => e.to_string(),
1819        };
1820        assert!(
1821            msg.contains("failed to connect"),
1822            "names the connect failure: {msg}"
1823        );
1824        assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
1825        assert!(
1826            msg.contains("NOT spawning"),
1827            "confirms no local-subprocess fallback: {msg}"
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn fabric_cache_reuses_one_fabric_per_registry_url() {
1833        // Two resolves to the SAME registry_url must REUSE one cached fabric (#202):
1834        // after N calls the cache has exactly one entry for that url.
1835        let (eps, handles) = spawn_echo_workers(1).await;
1836        let registry = MockServer::start().await;
1837        Mock::given(wm_method("GET"))
1838            .and(wm_path("/v1/agents"))
1839            .respond_with(
1840                ResponseTemplate::new(200)
1841                    .set_body_json(vec![live_record("w-0", "gpu-pool", &eps[0])]),
1842            )
1843            .mount(&registry)
1844            .await;
1845
1846        let mut sched = HashMap::new();
1847        sched.insert(
1848            "explorer".to_string(),
1849            sched_placement("gpu-pool", registry.uri()),
1850        );
1851        let runner = bogus_sched_runner(HashMap::new(), sched);
1852
1853        for _ in 0..3 {
1854            let (client, _rec, _p) = match runner.resolve_schedulable_worker("explorer").await {
1855                Ok(v) => v,
1856                Err(e) => panic!("resolve succeeds: {e}"),
1857            };
1858            let _ = client.close().await;
1859        }
1860
1861        let cache = runner.fabric_cache.lock().unwrap();
1862        assert_eq!(
1863            cache.len(),
1864            1,
1865            "three resolves to the same (registry_url, token) reused ONE cached fabric"
1866        );
1867        assert!(cache.contains_key(&(registry.uri(), None)));
1868        drop(cache);
1869        for h in handles {
1870            h.abort();
1871        }
1872    }
1873
1874    #[test]
1875    fn fabric_cache_keys_on_registry_url_and_token() {
1876        // Two placements sharing a registry_url but presenting DIFFERENT tokens
1877        // must NOT share a fabric — a reused wrong-bearer fabric would issue the
1878        // discover query with the other role's token (#202 review F1).
1879        let runner = bogus_sched_runner(HashMap::new(), HashMap::new());
1880        let url = "http://127.0.0.1:9/".to_string();
1881        let mut p = sched_placement("pool", url.clone());
1882
1883        p.token = Some("token-a".into());
1884        let fa = runner.fabric_for(&p, "role-a").expect("build a");
1885        p.token = Some("token-b".into());
1886        let fb = runner.fabric_for(&p, "role-b").expect("build b");
1887        // Same url, different token → two distinct cache entries (not aliased).
1888        assert!(
1889            !Arc::ptr_eq(&fa, &fb),
1890            "different tokens must not share a fabric"
1891        );
1892        assert_eq!(runner.fabric_cache.lock().unwrap().len(), 2);
1893        // Re-requesting token-a reuses the first fabric.
1894        p.token = Some("token-a".into());
1895        let fa2 = runner.fabric_for(&p, "role-a").expect("reuse a");
1896        assert!(Arc::ptr_eq(&fa, &fa2), "same (url, token) must reuse");
1897        assert_eq!(runner.fabric_cache.lock().unwrap().len(), 2);
1898    }
1899
1900    #[tokio::test]
1901    async fn resolve_schedulable_worker_errors_with_no_live_worker() {
1902        // The pool has no live workers (registry returns an empty / off-pool set):
1903        // a clear error, and CRUCIALLY no spawn (a schedulable role has no local
1904        // fallback).
1905        let registry = MockServer::start().await;
1906        Mock::given(wm_method("GET"))
1907            .and(wm_path("/v1/agents"))
1908            .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
1909                "x",
1910                "cpu-pool",
1911                "ws://127.0.0.1:9",
1912            )]))
1913            .mount(&registry)
1914            .await;
1915        let mut sched = HashMap::new();
1916        sched.insert(
1917            "explorer".to_string(),
1918            sched_placement("gpu-pool", registry.uri()),
1919        );
1920        let runner = bogus_sched_runner(HashMap::new(), sched);
1921
1922        let msg = match runner.resolve_schedulable_worker("explorer").await {
1923            Ok(_) => panic!("no live worker in pool must error, not connect"),
1924            Err(e) => e.to_string(),
1925        };
1926        assert!(msg.contains("no live worker"), "clear error: {msg}");
1927        assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
1928    }
1929
1930    /// Full schedulable e2e: a resident `WsServer` Echo worker is registered (via a
1931    /// wiremock registry) into pool "gpu-pool"; the runner is configured with a
1932    /// schedulable placement for role "explorer" → that pool, plus a BOGUS
1933    /// worker_bin (`/bin/false`). A passing run proves the SCHEDULABLE path RESOLVES
1934    /// the worker from the registry and connects to it WITHOUT ever spawning a local
1935    /// subprocess (a spawn would fail on /bin/false), and that an echo result flows
1936    /// back.
1937    #[tokio::test]
1938    async fn execute_external_child_schedules_role_from_registry_without_spawning() {
1939        // 1. Stand up the resident Echo worker on loopback (no bearer needed here).
1940        let server = bamboo_subagent::transport::WsServer::bind_loopback()
1941            .await
1942            .expect("bind resident worker");
1943        let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1944        let srv = tokio::spawn(async move {
1945            let _ = server
1946                .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1947                .await;
1948        });
1949
1950        // 2. Registry: returns the live worker registered into pool "gpu-pool"
1951        //    pointing at the real worker endpoint.
1952        let registry = MockServer::start().await;
1953        Mock::given(wm_method("GET"))
1954            .and(wm_path("/v1/agents"))
1955            .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
1956                "live-explorer",
1957                "gpu-pool",
1958                &endpoint,
1959            )]))
1960            .mount(&registry)
1961            .await;
1962
1963        // 3. Runner: role "explorer" → schedulable on "gpu-pool"; bogus worker_bin.
1964        let mut sched = HashMap::new();
1965        sched.insert(
1966            "explorer".to_string(),
1967            sched_placement("gpu-pool", registry.uri()),
1968        );
1969        let runner = bogus_sched_runner(HashMap::new(), sched);
1970
1971        // 4. Drive a real run for that role.
1972        let mut session = session_of_role("explorer", "hello scheduled");
1973        let job = job_for("child-1");
1974        let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1975        let cancel = CancellationToken::new();
1976
1977        tokio::time::timeout(
1978            Duration::from_secs(10),
1979            runner.execute_external_child(&mut session, &job, event_tx, cancel),
1980        )
1981        .await
1982        .expect("run did not hang")
1983        .expect("scheduled run succeeded (resolved from registry, did not spawn)");
1984
1985        // The EchoExecutor's reply is written back — proof a terminal flowed back.
1986        let last = session
1987            .messages
1988            .iter()
1989            .rev()
1990            .find(|m| matches!(m.role, Role::Assistant))
1991            .expect("an assistant reply was written back");
1992        assert!(
1993            last.content.contains("echo:"),
1994            "expected echo reply, got {:?}",
1995            last.content
1996        );
1997
1998        // Best-effort: confirm streamed events also flowed.
1999        let mut saw_event = false;
2000        while let Ok(Some(_ev)) =
2001            tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
2002        {
2003            saw_event = true;
2004        }
2005        let _ = saw_event;
2006
2007        srv.abort();
2008    }
2009}