bamboo_engine/external_agents/actor_adapter.rs
1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::{Discovery, Fabric};
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27 ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default upper bound on how many actor worker processes may be running at once.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Maximum nesting depth for direct nested execution (Phase 6).
///
/// A worker whose session `spawn_depth` is strictly below this value gets its
/// own spawn stack plus the real SubAgent tool; at or above it, it gets neither
/// (and the tool itself refuses to spawn). Mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` and must be kept in sync by
/// hand, because the engine cannot depend on server-tools. With the root
/// orchestrator at depth 0, this allows four levels of sub-agents.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default number of idle pooled (warm, reusable) workers kept per reuse
/// fingerprint; a worker released into a full bucket is retired instead.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Seconds (five minutes) a pooled worker waits for its next assignment before
/// reclaiming itself. Must comfortably exceed the gap between sibling spawns.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
49
50/// A warm worker parked for reuse: its process handle (killed on drop), the WS
51/// endpoint to reconnect to, and the id it registered under in the fabric.
52struct PooledActor {
53 worker: SpawnedChild,
54 endpoint: String,
55 agent_id: String,
56}
57
/// A role pinned to a remote resident worker (remote-actor-plan §3.4 / P1.5,
/// #193), resolved at runner-build time from `SubagentsConfig.remote_placements`.
///
/// The env-named bearer has already been READ into `token` here (the raw token
/// never rides the config). `ca_cert_file` is the path to a PEM pinning a
/// self-signed worker cert (`None` ⇒ default webpki roots / plaintext `ws://`).
///
/// `Debug` is implemented by hand so that the bearer is REDACTED: a derived
/// `Debug` would print the secret verbatim into any `{:?}` log line.
#[derive(Clone)]
pub struct ResolvedRemotePlacement {
    pub endpoint: String,
    pub token: Option<String>,
    pub ca_cert_file: Option<PathBuf>,
}

impl std::fmt::Debug for ResolvedRemotePlacement {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedRemotePlacement")
            .field("endpoint", &self.endpoint)
            // Reveal only whether a bearer is configured, never its value.
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("ca_cert_file", &self.ca_cert_file)
            .finish()
    }
}
69
/// A role routed to a registry-SCHEDULED worker (remote-actor-plan §3.4 / P2b,
/// #181), resolved at runner-build time from `SubagentsConfig.schedulable_placements`.
///
/// Unlike a fixed `ResolvedRemotePlacement` (one endpoint), this names the agent
/// `registry_url` to query and the logical `pool` (= the registry `role`) whose
/// LIVE workers are scheduling candidates. The env-named bearer has already been
/// READ into `token` here (the raw token never rides the config), and it is used
/// for BOTH the registry query and the chosen worker's `wss://` connect.
/// `ca_cert_file` pins a self-signed worker/registry cert (`None` ⇒ default
/// webpki roots).
///
/// `Debug` is implemented by hand so that the bearer is REDACTED: a derived
/// `Debug` would print the secret verbatim into any `{:?}` log line.
#[derive(Clone)]
pub struct ResolvedSchedulablePlacement {
    pub pool: String,
    pub registry_url: String,
    pub token: Option<String>,
    pub ca_cert_file: Option<PathBuf>,
}

impl std::fmt::Debug for ResolvedSchedulablePlacement {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedSchedulablePlacement")
            .field("pool", &self.pool)
            .field("registry_url", &self.registry_url)
            // Reveal only whether a bearer is configured, never its value.
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("ca_cert_file", &self.ca_cert_file)
            .finish()
    }
}
85
/// Strategy `execute_external_child` uses to obtain its worker connection,
/// chosen once from `spec.placement`.
///
/// Only the acquire/connect and retire steps differ between the variants. The
/// shared middle (Run dispatch, live registration, drive, close) is identical
/// for all three.
enum PlacementKind {
    /// Local subprocess from the warm pool: the unchanged pre-#193 path.
    Local,
    /// Fixed remote resident worker: the unchanged #194 path.
    Remote,
    /// Registry-scheduled worker picked from a pool (#181, P2b).
    Schedulable,
}
96
97/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
98pub struct ActorChildRunner {
99 agent_id: String,
100 worker_bin: PathBuf,
101 worker_args: Vec<String>,
102 fabric_dir: PathBuf,
103 executor: ExecutorSpec,
104 /// Per-provider credentials snapshotted from the parent config at build
105 /// time; the spec carries only the ONE the child's provider needs.
106 credentials: Vec<ScopedCredential>,
107 /// Parent's default provider (used when the child has no explicit one).
108 default_provider: String,
109 /// Backpressure: bounds the number of concurrently *running* actors; further
110 /// runs wait for a slot instead of exploding the process table. (Idle pooled
111 /// workers do not hold a slot.)
112 concurrency: std::sync::Arc<tokio::sync::Semaphore>,
113 spawn_timeout: Duration,
114 /// Warm-worker pool keyed by a reuse fingerprint
115 /// (role/provider/model/workspace/disabled-tools). A finished run parks its
116 /// worker here so the next matching child reuses it instead of spawning a
117 /// fresh process — collapsing N sibling sub-agents onto a few processes.
118 pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
119 max_idle_per_key: usize,
120 /// Host-side decision for a child's gated-tool approval request (Phase 2).
121 /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
122 /// human-routing bridge) returns approve/deny over the actor WS.
123 approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
124 /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
125 /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
126 /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
127 /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
128 /// to RE-PROXY a child's approval request UP to the parent run — chaining up
129 /// every level until a bypass level (model-review) or the top orchestrator
130 /// (human) decides, then relaying the reply back down. Was a process-global
131 /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
132 /// that spawned it keeps that run's bridge for its whole lifetime instead of
133 /// reading a stale/overwritten global at approval time (→ fail-closed deny).
134 escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
135 /// Roles pinned to a REMOTE resident worker (#193), keyed by sub-agent role
136 /// (the child's `subagent_type`). A role present here routes through the
137 /// dedicated remote branch in `execute_external_child` (Bearer-authenticated
138 /// `wss://` connect, no spawn, no pool, no kill) instead of the local
139 /// subprocess + warm-pool path. Empty (the default) = all-local behavior.
140 remote_placements: HashMap<String, ResolvedRemotePlacement>,
141 /// Roles routed to a REGISTRY-SCHEDULED worker (#181, P2b), keyed by sub-agent
142 /// role. A role present here (AND not already in `remote_placements`, which
143 /// wins) routes through the dedicated SCHEDULABLE branch in
144 /// `execute_external_child`: query the registry for live workers in the pool,
145 /// pick one (round-robin), connect over `wss://` — no spawn, no pool, no kill,
146 /// and NO local-subprocess fallback (no live worker ⇒ a clear error). Empty
147 /// (the default) = all-local behavior.
148 schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
149 /// Per-pool round-robin cursor for schedulable scheduling (#181, P2b). Bumped
150 /// once per pick so successive sibling spawns SPREAD across a pool's live
151 /// workers instead of all landing on the first candidate. Best-effort spread,
152 /// not a load balancer — the registry's live set can change between picks.
153 schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
154 /// Per-`(registry_url, token)` cache of built [`RegistryFabric`]s (#202). A fabric
155 /// holds a reqwest `Client` (already connection-pooled), so under sibling
156 /// fan-out we construct ONE fabric per registry and reuse it across schedules
157 /// instead of rebuilding a fresh client every spawn. Construct-once-then-reuse;
158 /// a benign duplicate build under a startup race is harmless (last writer wins,
159 /// the loser's fabric drops). The bearer lives INSIDE the fabric's sensitive
160 /// `Authorization` header — never logged, never re-stringified here.
161 // Keyed by (registry_url, token) — NOT url alone — so two placements that
162 // share a registry but present DIFFERENT bearers never reuse each other's
163 // fabric (which would issue a discover query with the wrong token). #202.
164 fabric_cache: Arc<
165 std::sync::Mutex<HashMap<(String, Option<String>), Arc<bamboo_subagent::RegistryFabric>>>,
166 >,
167}
168
169/// Decides how the host answers a child worker's gated-tool approval request
170/// (Phase 2: child → parent approval delegation). Async so an implementation
171/// can consult a policy or defer to a human. With no decider wired the host
172/// replies with a fail-closed DENY.
173///
174/// NOTE: `decide` is awaited inside the per-child frame pump, so an
175/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
176/// loop decision that may block indefinitely should instead be delivered
177/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
178/// (`super::live`), which `drive()` already forwards to the worker without
179/// stalling the pump.
180#[async_trait]
181pub trait ChildApprovalDecider: Send + Sync {
182 /// Decide whether `child_session_id` may perform the gated action described
183 /// by `request` (`{tool_name, permission, resource}`).
184 async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
185}
186
187/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
188/// decider is wired — the single, testable seam for the host-side decision.
189async fn decide_child_approval(
190 decider: Option<&Arc<dyn ChildApprovalDecider>>,
191 child_session_id: &str,
192 request: &serde_json::Value,
193) -> bool {
194 match decider {
195 Some(decider) => decider.decide(child_session_id, request).await,
196 None => false,
197 }
198}
199
/// Upper bound (five minutes) on how long the host waits for a human approval
/// decision. Once it elapses, the child's gated tool fails closed (DENY), so an
/// unanswered request cannot hang the worker forever.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
204
205/// Extract `(tool_name, permission, resource)` from a worker's approval request
206/// body (`{tool_name, permission, resource}`); missing fields default to empty.
207fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
208 let field = |k: &str| {
209 body.get(k)
210 .and_then(|v| v.as_str())
211 .unwrap_or("")
212 .to_string()
213 };
214 (field("tool_name"), field("permission"), field("resource"))
215}
216
217/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
218///
219/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
220/// children's forced-ask (dangerous) gated actions — which still raise
221/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
222/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
223/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
224/// live channel, so the agent loop is never blocked.
225#[async_trait]
226pub trait ChildApprovalReviewer: Send + Sync {
227 /// Judge whether the gated action `request` (`{tool_name, permission,
228 /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
229 async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
230}
231
232fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
233 static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
234 &SLOT
235}
236
237/// Install the process-global child-approval reviewer (idempotent; first wins).
238pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
239 let _ = child_approval_reviewer_slot().set(reviewer);
240}
241
242/// The process-global child-approval reviewer, if installed.
243pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
244 child_approval_reviewer_slot().get().cloned()
245}
246
247impl ActorChildRunner {
248 #[allow(clippy::too_many_arguments)]
249 pub fn new(
250 agent_id: String,
251 worker_bin: PathBuf,
252 worker_args: Vec<String>,
253 fabric_dir: PathBuf,
254 executor: ExecutorSpec,
255 credentials: Vec<ScopedCredential>,
256 default_provider: String,
257 max_concurrent: usize,
258 ) -> Self {
259 Self {
260 agent_id,
261 worker_bin,
262 worker_args,
263 fabric_dir,
264 executor,
265 credentials,
266 default_provider,
267 concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
268 spawn_timeout: Duration::from_secs(30),
269 pool: Arc::new(Mutex::new(HashMap::new())),
270 max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
271 approval_decider: None,
272 escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
273 remote_placements: HashMap::new(),
274 schedulable_placements: HashMap::new(),
275 schedule_cursor: Arc::new(std::sync::Mutex::new(HashMap::new())),
276 fabric_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
277 }
278 }
279
280 /// Wire the host-side decider for child gated-tool approval requests
281 /// (Phase 2). Without this the host fail-closed DENYs every request.
282 pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
283 self.approval_decider = Some(decider);
284 self
285 }
286
287 /// Pin specific sub-agent roles to remote resident workers (#193). The map
288 /// is keyed by role (`subagent_type`); a child whose role is present connects
289 /// over `wss://` to the resolved endpoint instead of spawning a local
290 /// subprocess. Default (empty) keeps every role on the local path — exactly
291 /// today's behavior.
292 pub fn with_remote_placements(
293 mut self,
294 placements: HashMap<String, ResolvedRemotePlacement>,
295 ) -> Self {
296 self.remote_placements = placements;
297 self
298 }
299
300 /// Route specific sub-agent roles to a registry-SCHEDULED worker (#181, P2b).
301 /// The map is keyed by role (`subagent_type`); a child whose role is present
302 /// (and NOT already pinned by `remote_placements`, which takes precedence) is
303 /// run on a live worker discovered from the registry instead of a local
304 /// subprocess. Default (empty) keeps every role on the local path.
305 pub fn with_schedulable_placements(
306 mut self,
307 placements: HashMap<String, ResolvedSchedulablePlacement>,
308 ) -> Self {
309 self.schedulable_placements = placements;
310 self
311 }
312
313 /// Reuse fingerprint: two children are interchangeable on one warm worker iff
314 /// they share role, provider, model, workspace, disabled-tool set, AND every
315 /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
316 /// stamps these once and reuses them across runs): nesting depth, nested-spawn
317 /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
318 /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
319 /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
320 /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
321 /// the recursion bound; or a bypass worker reused for a non-bypass child. So
322 /// these MUST split the pool bucket. Everything else (assignment, history) is
323 /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
324 fn fingerprint(spec: &ProvisionSpec) -> String {
325 let role = spec.identity.role.as_str();
326 let (provider, model) = spec
327 .model
328 .as_ref()
329 .map(|m| (m.provider.as_str(), m.model.as_str()))
330 .unwrap_or(("", ""));
331 let workspace = spec.workspace.as_deref().unwrap_or("");
332 let mut tools = spec.disabled_tools.clone().unwrap_or_default();
333 tools.sort();
334 let caps = &spec.capabilities;
335 format!(
336 "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
337 tools.join(","),
338 spec.identity.depth,
339 caps.nested_spawn,
340 caps.bypass,
341 caps.enforce_permissions,
342 caps.max_spawn_depth.unwrap_or(0),
343 // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
344 // at build() and never re-reads it per run, so the pool MUST NOT hand a
345 // worker baked for one approval posture to a run of the opposite one —
346 // else a scheduled-root worker reused for an interactive child would
347 // silently model-review instead of asking the human (and vice-versa,
348 // reintroducing the 300s-deny). Split the bucket on it.
349 caps.no_human_approver,
350 // #71: the read-only Bash checker is baked once at build() from this
351 // flag, so a guardian-reviewer worker must NOT be reused for an
352 // ordinary child (which expects unrestricted Bash), and vice-versa.
353 caps.guardian_read_only,
354 )
355 }
356
357 /// Check out a worker for this assignment: reuse a live pooled one matching
358 /// `key`, else spawn a fresh reusable worker.
359 async fn acquire_worker(
360 &self,
361 key: &str,
362 spec: &ProvisionSpec,
363 ) -> crate::runtime::runner::Result<PooledActor> {
364 // Drain the pool bucket, validating liveness; a worker that hit its idle
365 // timeout has exited and withdrawn its fabric record — skip and reap it.
366 loop {
367 let candidate = {
368 let mut pool = self.pool.lock().await;
369 pool.get_mut(key).and_then(|bucket| bucket.pop())
370 };
371 let Some(candidate) = candidate else { break };
372 let alive = Fabric::at(&self.fabric_dir)
373 .resolve(&candidate.agent_id)
374 .await
375 .ok()
376 .flatten()
377 .is_some();
378 if alive {
379 return Ok(candidate);
380 }
381 candidate.worker.kill().await;
382 }
383
384 let spawned = spawn_worker(
385 &self.worker_bin,
386 &self.worker_args,
387 spec,
388 self.spawn_timeout,
389 )
390 .await
391 .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
392 let endpoint = spawned.record.endpoint.clone();
393 let agent_id = spawned.record.agent_id.clone();
394 Ok(PooledActor {
395 worker: spawned,
396 endpoint,
397 agent_id,
398 })
399 }
400
401 /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
402 async fn release_worker(&self, key: &str, actor: PooledActor) {
403 let mut pool = self.pool.lock().await;
404 let bucket = pool.entry(key.to_string()).or_default();
405 if bucket.len() >= self.max_idle_per_key {
406 drop(pool);
407 self.retire_worker(actor).await;
408 return;
409 }
410 bucket.push(actor);
411 }
412
413 /// Forcefully stop a worker and clean its discovery record.
414 async fn retire_worker(&self, actor: PooledActor) {
415 let agent_id = actor.agent_id.clone();
416 actor.worker.kill().await;
417 let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
418 }
419
420 /// Assemble the parent-resolved provisioning document for this child.
421 fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
422 let mut spec = ProvisionSpec::new(
423 ChildIdentity {
424 child_id: job.child_session_id.clone(),
425 parent_id: Some(job.parent_session_id.clone()),
426 project_key: None,
427 role: session
428 .metadata
429 .get("subagent_type")
430 .cloned()
431 .unwrap_or_else(|| "worker".to_string()),
432 // The child session already carries the correct depth
433 // (create_child_action's new_child_of did parent.spawn_depth+1);
434 // stamp it so the worker can re-establish it on its run session
435 // and enforce the max-depth cap across the actor boundary.
436 depth: session.spawn_depth,
437 },
438 self.executor.clone(),
439 self.fabric_dir.to_string_lossy().into_owned(),
440 );
441 spec.workspace = session.workspace.clone();
442 // Final model: the session's pinned model_ref (create.model / routing already applied),
443 // falling back to the job's bare model on the parent's default provider.
444 spec.model = session
445 .model_ref
446 .as_ref()
447 .map(|r| ModelRefSpec {
448 provider: r.provider.clone(),
449 model: r.model.clone(),
450 })
451 .or_else(|| {
452 let m = job.model.trim();
453 (!m.is_empty()).then(|| ModelRefSpec {
454 provider: self.default_provider.clone(),
455 model: m.to_string(),
456 })
457 });
458 spec.disabled_tools = job.disabled_tools.clone();
459 // Least-privilege secrets: only the credential for the child's provider.
460 let provider = spec
461 .model
462 .as_ref()
463 .map(|m| m.provider.as_str())
464 .filter(|p| !p.trim().is_empty())
465 .unwrap_or(&self.default_provider);
466 if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
467 spec.secrets.provider_credentials.push(cred.clone());
468 } else {
469 tracing::warn!(
470 "actor child {}: no credential found for provider '{}'",
471 job.child_session_id,
472 provider
473 );
474 }
475 // Phase 6 (direct nested execution): a worker BELOW the depth cap may
476 // orchestrate its OWN children — on startup it builds its own spawn
477 // stack and runs the real SubAgent tool (no host proxy). The cap (the
478 // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
479 // recursion. Driven purely by the child's depth, so it auto-propagates
480 // down the tree without any extra config threading.
481 spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
482 spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
483 // #69: activate child-approval review. Sub-agents enforce permissions so
484 // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
485 // parent for review — escalated to the human, or model-reviewed off-loop
486 // when the parent is in bypass. The worker installs no checker without
487 // this, so the whole review chain would otherwise stay dormant.
488 spec.capabilities.enforce_permissions = true;
489 // Propagate "bypass permissions" so a self-orchestrating worker knows it
490 // is a bypassed parent and installs the off-loop model-reviewer for its
491 // children's forced-ask actions (Phase 6, Part B). The child session
492 // already carries the inherited flag (create_child_action seeds it).
493 spec.capabilities.bypass = session
494 .agent_runtime_state
495 .as_ref()
496 .is_some_and(|s| s.bypass_permissions);
497 // #73: propagate "no interactive human approver" (headless / scheduled /
498 // deployed root, inherited by the child session). When set, the worker's
499 // per-run approval proxy model-reviews a gated action locally instead of
500 // escalating to a human who will never answer (which would 300s-deny).
501 spec.capabilities.no_human_approver = session
502 .agent_runtime_state
503 .as_ref()
504 .is_some_and(|s| s.no_human_approver);
505 // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
506 // read-only Bash allowlist checker. The reviewer is spawned by
507 // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
508 // marker the completion coordinator branches on to parse the verdict) AND
509 // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
510 // marker (already read above to set `identity.role`), so it rides the same
511 // session-metadata path the denylist/subagent_type use — no new wire seam.
512 // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
513 // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
514 spec.capabilities.guardian_read_only =
515 session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
516 // #193: route this role to a REMOTE resident worker when one is pinned.
517 // `spec.identity.role` was just computed from `subagent_type` above; a
518 // match flips the placement to Remote and rides the worker's bearer on the
519 // scoped secrets envelope (TLS handshake / Authorization header only — the
520 // token is never logged). No match leaves the default `Placement::Local`,
521 // so the local path is byte-for-byte unchanged for every non-pinned role.
522 if let Some(placement) = self.remote_placements.get(spec.identity.role.as_str()) {
523 spec.placement = Placement::Remote {
524 endpoint: placement.endpoint.clone(),
525 };
526 spec.secrets.worker_auth_token = placement.token.clone();
527 } else if let Some(placement) = self.schedulable_placements.get(spec.identity.role.as_str())
528 {
529 // #181 (P2b): route this role to a registry-SCHEDULED worker — ONLY
530 // when it is NOT already pinned to a fixed remote endpoint (the
531 // `else if` makes remote_placements take precedence for a role in
532 // both). The concrete endpoint is resolved at run time in
533 // `execute_external_child`; here we only flip the placement to
534 // Schedulable{pool} and ride the bearer on the scoped secrets
535 // envelope (used for the registry query AND the worker connect). No
536 // match in either map leaves the default `Placement::Local`, so the
537 // local path is byte-for-byte unchanged for every non-routed role.
538 spec.placement = Placement::Schedulable {
539 pool: placement.pool.clone(),
540 };
541 spec.secrets.worker_auth_token = placement.token.clone();
542 }
543 spec
544 }
545
546 /// Max DISTINCT live candidates a single schedulable resolve will try to
547 /// connect to before giving up (#202). Bounds the connect-fail failover so a
548 /// pool full of stale-but-leased workers can't make one spawn walk the whole
549 /// fleet; the effective cap is `min(candidates.len(), MAX_SCHEDULE_CONNECT_ATTEMPTS)`.
550 const MAX_SCHEDULE_CONNECT_ATTEMPTS: usize = 3;
551
552 /// Get-or-build the [`RegistryFabric`] for `placement.registry_url`, caching
553 /// it on the runner (#202). A fabric wraps a connection-pooled reqwest
554 /// `Client`; under sibling fan-out this reuses ONE client per registry instead
555 /// of constructing a fresh one every schedule. Construct-once-then-reuse: a
556 /// cache miss builds (Bearer if configured) and stores it. The bearer stays
557 /// inside the fabric's sensitive `Authorization` header — never logged here.
558 ///
559 /// Concurrency: a brief race can build two fabrics for the same url; that is
560 /// harmless (both are valid, last-insert wins, the loser drops). We never hold
561 /// the cache lock across the build, so an `await`-free critical section can't
562 /// deadlock or stall siblings.
563 fn fabric_for(
564 &self,
565 placement: &ResolvedSchedulablePlacement,
566 role: &str,
567 ) -> std::result::Result<Arc<bamboo_subagent::RegistryFabric>, AgentError> {
568 // (registry_url, token) identity: a wrong-token fabric must never be
569 // reused for a discover query (#202).
570 let key = (placement.registry_url.clone(), placement.token.clone());
571 if let Some(fabric) = self.fabric_cache.lock().unwrap().get(&key).cloned() {
572 return Ok(fabric);
573 }
574
575 // Cache miss: build outside the lock (construction is sync + cheap but we
576 // keep the critical section minimal and await-free regardless).
577 let built = match placement.token.as_deref() {
578 Some(token) => {
579 bamboo_subagent::RegistryFabric::with_token(placement.registry_url.clone(), token)
580 }
581 None => bamboo_subagent::RegistryFabric::new(placement.registry_url.clone()),
582 }
583 .map_err(|e| {
584 AgentError::LLM(format!(
585 "schedulable role '{role}': registry client for '{}' failed: {e}",
586 placement.registry_url
587 ))
588 })?;
589 let arc = Arc::new(built);
590
591 // Insert-or-reuse: if a concurrent caller already populated the slot, take
592 // theirs (the duplicate we built drops) so all siblings share one fabric.
593 let mut cache = self.fabric_cache.lock().unwrap();
594 Ok(cache.entry(key).or_insert(arc).clone())
595 }
596
597 /// Pick a live worker for a SCHEDULABLE role from the agent registry (#181,
598 /// P2b) and CONNECT to it, with connect-fail failover (#202).
599 ///
600 /// Uses a cached [`RegistryFabric`] (per `registry_url`) to list live records
601 /// (the registry already excludes expired leases — health == a live lease),
602 /// filters to those whose `role` == the configured `pool`, then picks a
603 /// starting candidate via per-pool ROUND-ROBIN. Because a live lease is only a
604 /// COARSE health signal (a leased worker can have a dead process / a network
605 /// blip), it does not connect blindly to that one pick: it attempts
606 /// `connect_with_auth_tls`, and on failure WALKS FORWARD to the next DISTINCT
607 /// live candidate and retries — up to `min(candidates.len(),
608 /// MAX_SCHEDULE_CONNECT_ATTEMPTS)` attempts. A single stale-but-leased worker
609 /// therefore no longer fails the whole schedule.
610 ///
611 /// Returns the connected [`ChildClient`] plus the chosen [`AgentRecord`] and
612 /// the placement. No live candidate (empty pool) ⇒ a terminal `AgentError`;
613 /// ALL attempted candidates failing to connect ⇒ a terminal `AgentError` —
614 /// the caller NEVER falls back to a local subprocess (that would silently
615 /// defeat the placement). A registry-query failure is likewise terminal. The
616 /// error logs HOW MANY candidates were tried, never the token.
617 async fn resolve_schedulable_worker(
618 &self,
619 role: &str,
620 ) -> std::result::Result<(ChildClient, AgentRecord, ResolvedSchedulablePlacement), AgentError>
621 {
622 let placement = self
623 .schedulable_placements
624 .get(role)
625 .ok_or_else(|| {
626 AgentError::LLM(format!(
627 "schedulable placement for role '{role}' vanished before scheduling"
628 ))
629 })?
630 .clone();
631
632 // Query the registry through the cached, connection-pooled fabric (#202).
633 let fabric = self.fabric_for(&placement, role)?;
634 let records = fabric.discover().await.map_err(|e| {
635 AgentError::LLM(format!(
636 "schedulable role '{role}': registry '{}' query failed: {e}",
637 placement.registry_url
638 ))
639 })?;
640
641 // Live candidates = records published under the pool's role. The registry
642 // already dropped expired leases, so presence == health.
643 let candidates: Vec<AgentRecord> = records
644 .into_iter()
645 .filter(|r| r.role == placement.pool)
646 .collect();
647
648 if candidates.is_empty() {
649 return Err(AgentError::LLM(format!(
650 "schedulable role '{role}': no live worker in pool '{}' at registry '{}' \
651 (NOT spawning a local subprocess — a schedulable role has no local fallback)",
652 placement.pool, placement.registry_url
653 )));
654 }
655
656 // Round-robin START: advance a per-pool cursor ONCE to pick the starting
657 // index, then walk forward through the remaining candidates on connect
658 // failure. Bumping the cursor only once per resolve keeps the spread
659 // across sibling spawns (each resolve starts one slot further on); the
660 // failover walk is local to this resolve and does not perturb the cursor.
661 let start = {
662 let mut cursors = self.schedule_cursor.lock().unwrap();
663 let cursor = cursors.entry(placement.pool.clone()).or_insert(0);
664 let i = *cursor % candidates.len();
665 *cursor = cursor.wrapping_add(1);
666 i
667 };
668
669 // Derive the TLS trust ONCE (pinned CA or default roots / plaintext ws://).
670 let trust_cfg = match placement.ca_cert_file.as_deref() {
671 Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
672 AgentError::LLM(format!(
673 "scheduled worker CA cert '{}': {e}",
674 path.display()
675 ))
676 })?),
677 None => None,
678 };
679
680 // Connect-fail failover: try the starting candidate, then walk forward to
681 // the NEXT DISTINCT live candidate on failure, up to the bounded cap. Each
682 // attempt hits a different candidate (modulo wrap from the start index).
683 let max_attempts = candidates.len().min(Self::MAX_SCHEDULE_CONNECT_ATTEMPTS);
684 let mut last_err: Option<String> = None;
685 for attempt in 0..max_attempts {
686 let idx = (start + attempt) % candidates.len();
687 let record = &candidates[idx];
688 let endpoint = record.endpoint.clone();
689 match ChildClient::connect_with_auth_tls(
690 &endpoint,
691 placement.token.as_deref(),
692 trust_cfg.clone(),
693 )
694 .await
695 {
696 Ok(client) => {
697 if attempt > 0 {
698 // We skipped one or more stale-but-leased workers — record
699 // how many (never the endpoint contents beyond the host we
700 // connected to, never the token).
701 tracing::info!(
702 "schedulable role '{role}': connected to pool '{}' worker after \
703 {attempt} stale candidate(s) skipped",
704 placement.pool
705 );
706 }
707 return Ok((client, candidates[idx].clone(), placement));
708 }
709 Err(e) => {
710 tracing::warn!(
711 "schedulable role '{role}': pool '{}' candidate connect failed \
712 (attempt {}/{max_attempts}): {e}",
713 placement.pool,
714 attempt + 1
715 );
716 last_err = Some(e.to_string());
717 }
718 }
719 }
720
721 // All attempted candidates failed to connect ⇒ terminal error. NO spawn
722 // fallback (a schedulable role has none). Log how many were tried, not the
723 // token.
724 Err(AgentError::LLM(format!(
725 "schedulable role '{role}': all {max_attempts} live candidate(s) in pool '{}' at \
726 registry '{}' failed to connect (last error: {}) — NOT spawning a local subprocess",
727 placement.pool,
728 placement.registry_url,
729 last_err.as_deref().unwrap_or("unknown")
730 )))
731 }
732}
733
734#[async_trait]
735impl ExternalChildRunner for ActorChildRunner {
736 async fn should_handle(&self, session: &Session) -> bool {
737 session.metadata.get("runtime.kind") == Some(&"external".to_string())
738 && session.metadata.get("external.protocol") == Some(&"actor".to_string())
739 && session.metadata.get("external.agent_id") == Some(&self.agent_id)
740 }
741
742 fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
743 *self.escalation_bridge.lock().unwrap() = bridge;
744 }
745
746 async fn execute_external_child(
747 &self,
748 session: &mut Session,
749 job: &SpawnJob,
750 event_tx: mpsc::Sender<AgentEvent>,
751 cancel_token: CancellationToken,
752 ) -> crate::runtime::runner::Result<()> {
753 // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
754 // moment this grandchild is spawned — while the parent run's bridge is
755 // still in our slot — into an owned local handed to `drive()` for this
756 // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
757 // the run that spawned it must NOT re-read `self.escalation_bridge` at
758 // approval time: by then `run()` may have cleared/overwritten it (a worker
759 // serves runs sequentially), and re-proxying through a closed bridge
760 // fail-closed denies. Capturing at spawn pins the right bridge per run.
761 let escalation = self.escalation_bridge.lock().unwrap().clone();
762 let assignment = extract_assignment(session);
763 let mut spec = self.build_spec(session, job);
764 // Make every actor a warm, reusable worker so the pool can recycle it for
765 // the next sibling with a matching fingerprint.
766 spec.reusable = true;
767 if spec.limits.idle_timeout_secs.is_none() {
768 spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
769 }
770 let pool_key = Self::fingerprint(&spec);
771 // Rehydration: the child session in the parent's store is the actor's
772 // durable state. Ship the full conversation so a reactivation
773 // (send_message / update / rerun) carries its history. A reused worker is
774 // stateless between runs, so this is also what isolates each child's
775 // context on a shared process.
776 let messages: Vec<serde_json::Value> = session
777 .messages
778 .iter()
779 .filter_map(|m| serde_json::to_value(m).ok())
780 .collect();
781
782 // Backpressure: hold a concurrency slot for the lifetime of the *run*
783 // (cancellation still proceeds — the cancel branch in drive() runs while
784 // we hold the permit). Released when this fn returns, i.e. once the worker
785 // is parked back into the pool, so idle workers don't pin slots.
786 let _slot = self
787 .concurrency
788 .acquire()
789 .await
790 .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
791
792 // Split LOCAL (spawn + warm-pool) from the two process-less remote paths
793 // ONLY at the divergent spots — acquire/connect here and the park/retire at
794 // the end. Everything between (Run dispatch, live-actor registration,
795 // drive, the close) is identical for all three. `kind` is the single guard.
796 // - Local (#0): byte-for-byte the pre-#193 reuse-or-spawn path.
797 // - Remote (#194): connect to a FIXED resident endpoint, no spawn.
798 // - Schedulable (#181): resolve a live worker from the registry, connect.
799 let kind = match spec.placement {
800 Placement::Remote { .. } => PlacementKind::Remote,
801 Placement::Schedulable { .. } => PlacementKind::Schedulable,
802 Placement::Local => PlacementKind::Local,
803 };
804 let remote = !matches!(kind, PlacementKind::Local);
805
806 let (actor, mut client) = match kind {
807 PlacementKind::Remote => {
808 // REMOTE branch: connect to a resident worker. No spawn, no pool
809 // touch, no drain. We do not own the worker, so a connect failure
810 // has NO respawn fallback — it is a clear, terminal error.
811 let placement = self
812 .remote_placements
813 .get(spec.identity.role.as_str())
814 .ok_or_else(|| {
815 AgentError::LLM(format!(
816 "remote placement for role '{}' vanished before connect",
817 spec.identity.role
818 ))
819 })?;
820 let endpoint = placement.endpoint.clone();
821 // Build the TLS trust: a pinned CA pins a self-signed worker cert;
822 // otherwise default webpki roots (or plaintext for `ws://`).
823 let trust_cfg = match placement.ca_cert_file.as_deref() {
824 Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
825 AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
826 })?),
827 None => None,
828 };
829 let client = ChildClient::connect_with_auth_tls(
830 &endpoint,
831 placement.token.as_deref(),
832 trust_cfg,
833 )
834 .await
835 .map_err(|e| {
836 AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
837 })?;
838 // Process-less handle so live-actor registration (in-band steering)
839 // works exactly as for a local worker; `kill()` is a no-op.
840 let record = AgentRecord {
841 agent_id: job.child_session_id.clone(),
842 role: spec.identity.role.clone(),
843 labels: Vec::new(),
844 endpoint: endpoint.clone(),
845 pid: 0,
846 version: String::new(),
847 started_at: chrono::Utc::now(),
848 lease_expires_at: chrono::Utc::now(),
849 };
850 let actor = PooledActor {
851 worker: SpawnedChild::remote(record),
852 endpoint,
853 agent_id: job.child_session_id.clone(),
854 };
855 (actor, client)
856 }
857 PlacementKind::Schedulable => {
858 // SCHEDULABLE branch (#181, P2b + #202): resolve a LIVE worker from
859 // the registry (round-robin over the pool's live records) AND
860 // connect to it, with connect-fail failover — on a stale-but-leased
861 // worker, resolve walks to the next live candidate (bounded). No
862 // spawn, no pool, no kill, NO local fallback — no live worker, or
863 // ALL candidates dead ⇒ a terminal error (raised inside
864 // resolve_schedulable_worker, which owns the connect now).
865 let (client, record, _placement) = self
866 .resolve_schedulable_worker(spec.identity.role.as_str())
867 .await?;
868 let endpoint = record.endpoint.clone();
869 // The chosen registry record IS the AgentRecord — synthesize the
870 // process-less handle straight from it (registry-managed worker).
871 let actor = PooledActor {
872 worker: SpawnedChild::remote(record),
873 endpoint,
874 agent_id: job.child_session_id.clone(),
875 };
876 (actor, client)
877 }
878 PlacementKind::Local => {
879 // LOCAL branch — the EXACT pre-#193 path: reuse-or-spawn + the
880 // respawn-on-connect-miss fallback. Unchanged in behavior, ordering,
881 // and error text.
882 let mut actor = self.acquire_worker(&pool_key, &spec).await?;
883 let client = match ChildClient::connect(&actor.endpoint).await {
884 Ok(client) => client,
885 Err(e) => {
886 // The pooled worker may have died between checkout and connect;
887 // retire it and spawn one fresh, once.
888 self.retire_worker(actor).await;
889 let spawned = spawn_worker(
890 &self.worker_bin,
891 &self.worker_args,
892 &spec,
893 self.spawn_timeout,
894 )
895 .await
896 .map_err(|e2| {
897 AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
898 })?;
899 let endpoint = spawned.record.endpoint.clone();
900 let agent_id = spawned.record.agent_id.clone();
901 let client = ChildClient::connect(&endpoint)
902 .await
903 .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
904 actor = PooledActor {
905 worker: spawned,
906 endpoint,
907 agent_id,
908 };
909 client
910 }
911 };
912 (actor, client)
913 }
914 };
915
916 client
917 .send(ParentFrame::Run(RunSpec {
918 assignment,
919 reasoning_effort: None,
920 messages,
921 }))
922 .await
923 .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
924
925 // Register as a live actor so send_message (running, no interrupt) can
926 // steer this child in-band over the existing WS connection. The guard
927 // unregisters on every exit path.
928 let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
929 let live_guard = super::live::register(&job.child_session_id, live_tx);
930
931 let result = drive(
932 &mut client,
933 &job.child_session_id,
934 self.approval_decider.as_ref(),
935 escalation,
936 &event_tx,
937 &cancel_token,
938 &mut live_rx,
939 )
940 .await;
941 // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
942 // so a send_message landing in the close/park window below must see
943 // "not live" and take the durable-queue fallback instead of vanishing.
944 // (Even if one slipped in earlier, send_message also appends it to the
945 // durable transcript, so the next activation still rehydrates it.)
946 drop(live_guard);
947
948 // Close the connection: the worker's serve loop then accepts the next
949 // assignment (reuse) or idles out. Park the worker on a clean run; retire
950 // it on error/cancel (a wedged worker must not be reused).
951 let _ = client.close().await;
952 if remote {
953 // #193 / #181: we do NOT own a remote or scheduled worker — never park
954 // it into the local pool (its endpoint/agent_id are not ours to
955 // recycle) and never kill it (`SpawnedChild::remote.kill()` is a no-op
956 // anyway). Just let the connection drop above; the resident /
957 // registry-managed worker self-manages via its own idle timeout, ready
958 // for the next parent. `actor` is dropped here. (Schedulable shares this
959 // arm — release is a no-op for both registry-managed cases.)
960 drop(actor);
961 } else {
962 match &result {
963 Ok(_) => self.release_worker(&pool_key, actor).await,
964 Err(_) => self.retire_worker(actor).await,
965 }
966 }
967
968 // Write-back: persist the actor's final reply onto the child session so
969 // the transcript survives and the NEXT activation sees it as history.
970 // (run_child_spawn saves the session right after we return.)
971 match result {
972 Ok(Some(text)) => {
973 if !text.is_empty() {
974 session.add_message(bamboo_agent_core::Message::assistant(text, None));
975 }
976 Ok(())
977 }
978 Ok(None) => Ok(()),
979 Err(e) => Err(e),
980 }
981 }
982}
983
984/// Pump child frames -> parent events until a terminal frame (or cancellation).
985/// On success, yields the actor's final result text (for session write-back).
986/// `live_rx` carries in-band frames (steering messages) from the live registry.
987///
988/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
989/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
990/// non-bypass child re-proxies an approval request, this owned bridge routes it
991/// UP to the parent run. Owning it for the call's lifetime is what lets a
992/// fire-and-forget grandchild that outlives its spawning run still escalate to
993/// the correct (then-current) parent bridge rather than a stale/overwritten one.
994async fn drive(
995 client: &mut ChildClient,
996 child_session_id: &str,
997 approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
998 escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
999 event_tx: &mpsc::Sender<AgentEvent>,
1000 cancel_token: &CancellationToken,
1001 live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
1002) -> crate::runtime::runner::Result<Option<String>> {
1003 loop {
1004 tokio::select! {
1005 _ = cancel_token.cancelled() => {
1006 // fall through to the cancel handling below
1007 break;
1008 }
1009 Some(frame) = live_rx.recv() => {
1010 // Forward in-band steering to the worker over the existing WS.
1011 if client.send(frame).await.is_err() {
1012 tracing::warn!("live steering frame could not be sent; connection failing");
1013 }
1014 }
1015 frame = client.next_frame() => {
1016 match frame {
1017 Ok(Some(ChildFrame::Event { event })) => {
1018 // AgentEvent is serialized verbatim on the wire (zero mapping).
1019 if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
1020 let _ = event_tx.send(ev).await;
1021 }
1022 }
1023 Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
1024 // Phase 2: a worker proxied a gated-tool approval back to
1025 // the host. The WORKER side is live — its executor installs
1026 // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
1027 // that calls `host.approval_call`, so this frame arrives
1028 // when a child hits `ConfirmationRequired`.
1029 if let Some(reviewer) = child_approval_reviewer() {
1030 // Phase 6, Part B: a BYPASSED parent worker
1031 // model-reviews its children's forced-ask (dangerous)
1032 // actions. The review is an LLM call, so run it OFF
1033 // the frame pump in a spawned task and deliver the
1034 // verdict async via the live channel — the pump keeps
1035 // forwarding events and the agent loop never blocks. A
1036 // timeout denies a hung review so the child can't hang.
1037 let child = child_session_id.to_string();
1038 let req_id = id.clone();
1039 let body = body.clone();
1040 tokio::spawn(async move {
1041 let approved = tokio::time::timeout(
1042 CHILD_APPROVAL_TIMEOUT,
1043 reviewer.review(&child, &body),
1044 )
1045 .await
1046 .unwrap_or(false);
1047 super::live::deliver_approval(&child, &req_id, approved);
1048 });
1049 } else if approval_decider.is_some() {
1050 // A decider is wired (policy / auto): decide promptly
1051 // and reply inline. (Must not block the pump — see the
1052 // `ChildApprovalDecider` doc.)
1053 let approved =
1054 decide_child_approval(approval_decider, child_session_id, &body)
1055 .await;
1056 if client
1057 .send(ParentFrame::ApprovalReply { id, approved })
1058 .await
1059 .is_err()
1060 {
1061 tracing::warn!(
1062 "failed to answer approval_request; connection failing"
1063 );
1064 }
1065 } else if let Some(host) = escalation_bridge.clone() {
1066 // Non-bypass WORKER: ESCALATE up our own actor link
1067 // (re-proxy) so the request chains to our parent — and
1068 // up every level until a bypass level (model-review) or
1069 // the top orchestrator (human) decides. Off-loop so the
1070 // pump never blocks; relay the reply down to the child.
1071 let child = child_session_id.to_string();
1072 let req_id = id.clone();
1073 let body = body.clone();
1074 tokio::spawn(async move {
1075 let approved = match tokio::time::timeout(
1076 CHILD_APPROVAL_TIMEOUT,
1077 host.approval_call(body),
1078 )
1079 .await
1080 {
1081 Ok(Ok(reply)) => reply
1082 .get("approved")
1083 .and_then(|v| v.as_bool())
1084 .unwrap_or(false),
1085 // Transport error or timeout ⇒ fail closed.
1086 _ => false,
1087 };
1088 super::live::deliver_approval(&child, &req_id, approved);
1089 });
1090 } else {
1091 // Top orchestrator (no escalation bridge): human-in-the-
1092 // loop. Surface the request on the parent's event stream
1093 // and DEFER — the decision arrives out-of-band via
1094 // `live::deliver_approval(child, request_id, approved)`
1095 // (→ this child's `live_rx` → forwarded to the worker
1096 // above). A timeout denies a never-answered request so
1097 // it can't hang the child forever.
1098 let (tool_name, permission, resource) =
1099 approval_request_fields(&body);
1100 // Register the pending request BEFORE surfacing it so
1101 // the external handler's `deliver_approval_checked` can
1102 // correlate an out-of-band POST against a genuine
1103 // human-loop request (and consume it one-shot).
1104 super::live::register_pending_approval(child_session_id, &id);
1105 let _ = event_tx
1106 .send(AgentEvent::ChildApprovalRequested {
1107 child_session_id: child_session_id.to_string(),
1108 request_id: id.clone(),
1109 tool_name,
1110 permission,
1111 resource,
1112 })
1113 .await;
1114 let child = child_session_id.to_string();
1115 tokio::spawn(async move {
1116 tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
1117 // Deny only if still pending: a one-shot consume so
1118 // we don't double-deliver if the human already
1119 // answered (the POST took it), and so a late POST
1120 // after this fires finds nothing pending.
1121 if super::live::take_pending_approval(&child, &id) {
1122 super::live::deliver_approval(&child, &id, false);
1123 }
1124 });
1125 }
1126 }
1127 Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
1128 return match status {
1129 TerminalStatus::Completed => Ok(result),
1130 TerminalStatus::Cancelled => Err(AgentError::Cancelled),
1131 TerminalStatus::Error => Err(AgentError::LLM(
1132 error.unwrap_or_else(|| "actor child errored".to_string()),
1133 )),
1134 // The suspend/resume round-trip (host re-dispatch of a
1135 // nested parent) is not wired here yet; a worker in
1136 // this build never emits Suspended, so this is
1137 // unreachable in practice.
1138 TerminalStatus::Suspended => Err(AgentError::LLM(
1139 "nested sub-agent suspend received but resume transport is not wired"
1140 .to_string(),
1141 )),
1142 };
1143 }
1144 Ok(None) => {
1145 return Err(AgentError::LLM(
1146 "actor child closed before terminal".to_string(),
1147 ));
1148 }
1149 Err(e) => {
1150 return Err(AgentError::LLM(format!("actor transport error: {e}")));
1151 }
1152 }
1153 }
1154 }
1155 }
1156
1157 // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
1158 let _ = client.send(ParentFrame::Cancel).await;
1159 Err(AgentError::Cancelled)
1160}
1161
1162/// The assignment text = the child session's latest user message (falls back to its title).
1163fn extract_assignment(session: &Session) -> String {
1164 session
1165 .messages
1166 .iter()
1167 .rev()
1168 .find(|m| matches!(m.role, Role::User))
1169 .map(|m| m.content.clone())
1170 .unwrap_or_else(|| {
1171 session
1172 .metadata
1173 .get("title")
1174 .cloned()
1175 .unwrap_or_else(|| "Execute task".to_string())
1176 })
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181 use super::*;
1182
1183 fn spec_with(
1184 role: &str,
1185 provider: &str,
1186 model: &str,
1187 workspace: Option<&str>,
1188 disabled: Option<Vec<&str>>,
1189 ) -> ProvisionSpec {
1190 let mut spec = ProvisionSpec::new(
1191 ChildIdentity {
1192 child_id: "c".into(),
1193 parent_id: None,
1194 project_key: None,
1195 role: role.into(),
1196 depth: 0,
1197 },
1198 ExecutorSpec::Echo,
1199 "/tmp/fab".into(),
1200 );
1201 spec.workspace = workspace.map(|w| w.to_string());
1202 spec.model = Some(ModelRefSpec {
1203 provider: provider.into(),
1204 model: model.into(),
1205 });
1206 spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
1207 spec
1208 }
1209
1210 #[test]
1211 fn fingerprint_matches_interchangeable_children() {
1212 // Same role/provider/model/workspace and equal tool sets (order-insensitive)
1213 // are interchangeable on one warm worker — and differ only in child_id.
1214 let a = spec_with(
1215 "explorer",
1216 "p",
1217 "m",
1218 Some("/ws"),
1219 Some(vec!["Bash", "Edit"]),
1220 );
1221 let mut b = spec_with(
1222 "explorer",
1223 "p",
1224 "m",
1225 Some("/ws"),
1226 Some(vec!["Edit", "Bash"]),
1227 );
1228 b.identity.child_id = "other".into();
1229 assert_eq!(
1230 ActorChildRunner::fingerprint(&a),
1231 ActorChildRunner::fingerprint(&b)
1232 );
1233 }
1234
1235 #[test]
1236 fn fingerprint_separates_distinct_runtimes() {
1237 let base = spec_with("explorer", "p", "m", Some("/ws"), None);
1238 let base_fp = ActorChildRunner::fingerprint(&base);
1239 // Each axis that is baked into the worker must split the pool bucket.
1240 assert_ne!(
1241 base_fp,
1242 ActorChildRunner::fingerprint(&spec_with("writer", "p", "m", Some("/ws"), None))
1243 );
1244 assert_ne!(
1245 base_fp,
1246 ActorChildRunner::fingerprint(&spec_with("explorer", "p2", "m", Some("/ws"), None))
1247 );
1248 assert_ne!(
1249 base_fp,
1250 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m2", Some("/ws"), None))
1251 );
1252 assert_ne!(
1253 base_fp,
1254 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws2"), None))
1255 );
1256 assert_ne!(
1257 base_fp,
1258 ActorChildRunner::fingerprint(&spec_with(
1259 "explorer",
1260 "p",
1261 "m",
1262 Some("/ws"),
1263 Some(vec!["Bash"])
1264 ))
1265 );
1266 }
1267
1268 #[test]
1269 fn fingerprint_splits_on_baked_capabilities() {
1270 // Every capability baked once at provision time must split the pool
1271 // bucket, else a worker baked for one posture gets reused for another
1272 // (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child,
1273 // breaking the depth cap; or a bypass worker reused for a non-bypass one).
1274 let base_fp =
1275 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
1276
1277 let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
1278 depth.identity.depth = 2;
1279 assert_ne!(
1280 base_fp,
1281 ActorChildRunner::fingerprint(&depth),
1282 "depth must split"
1283 );
1284
1285 let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
1286 nested.capabilities.nested_spawn = true;
1287 assert_ne!(
1288 base_fp,
1289 ActorChildRunner::fingerprint(&nested),
1290 "nested_spawn must split"
1291 );
1292
1293 let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
1294 bypass.capabilities.bypass = true;
1295 assert_ne!(
1296 base_fp,
1297 ActorChildRunner::fingerprint(&bypass),
1298 "bypass must split"
1299 );
1300
1301 let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
1302 enforce.capabilities.enforce_permissions = true;
1303 assert_ne!(
1304 base_fp,
1305 ActorChildRunner::fingerprint(&enforce),
1306 "enforce_permissions must split"
1307 );
1308
1309 let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
1310 cap.capabilities.max_spawn_depth = Some(8);
1311 assert_ne!(
1312 base_fp,
1313 ActorChildRunner::fingerprint(&cap),
1314 "max_spawn_depth must split"
1315 );
1316
1317 // #73 (P1): the worker bakes `no_human_review` from this flag once at
1318 // build(), so it MUST split the pool or a worker baked for one approval
1319 // posture is reused for the opposite one.
1320 let mut nha = spec_with("explorer", "p", "m", Some("/ws"), None);
1321 nha.capabilities.no_human_approver = true;
1322 assert_ne!(
1323 base_fp,
1324 ActorChildRunner::fingerprint(&nha),
1325 "no_human_approver must split"
1326 );
1327
1328 // #71: the read-only Bash checker is baked once at build() from this flag,
1329 // so a guardian reviewer worker must not be reused for an ordinary child.
1330 let mut gro = spec_with("explorer", "p", "m", Some("/ws"), None);
1331 gro.capabilities.guardian_read_only = true;
1332 assert_ne!(
1333 base_fp,
1334 ActorChildRunner::fingerprint(&gro),
1335 "guardian_read_only must split"
1336 );
1337 }
1338
1339 struct StaticDecider(bool);
1340
1341 #[async_trait]
1342 impl ChildApprovalDecider for StaticDecider {
1343 async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
1344 self.0
1345 }
1346 }
1347
1348 #[tokio::test]
1349 async fn child_approval_fails_closed_without_decider() {
1350 // No decider wired ⇒ the host denies (safe default), unchanged behavior.
1351 let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
1352 assert!(!decide_child_approval(None, "child-1", &body).await);
1353 }
1354
1355 #[tokio::test]
1356 async fn child_approval_honors_wired_decider() {
1357 let body =
1358 serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
1359 let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
1360 let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
1361 assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
1362 assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
1363 }
1364
1365 #[test]
1366 fn approval_request_fields_extracts_and_defaults() {
1367 let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
1368 assert_eq!(
1369 approval_request_fields(&full),
1370 ("Bash".to_string(), "run".to_string(), "ls".to_string())
1371 );
1372 // Missing fields default to empty strings.
1373 let partial = serde_json::json!({"tool_name":"Write"});
1374 assert_eq!(
1375 approval_request_fields(&partial),
1376 ("Write".to_string(), String::new(), String::new())
1377 );
1378 }
1379
1380 // ---- #193: remote placement routing -------------------------------------
1381
1382 use crate::runtime::execution::SpawnJob;
1383 use bamboo_agent_core::Session;
1384
1385 /// A runner with a BOGUS worker_bin (`/bin/false`): a local spawn here would
1386 /// FAIL, so a passing remote test proves the remote path never spawns.
1387 fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
1388 ActorChildRunner::new(
1389 "test-actor".into(),
1390 PathBuf::from("/bin/false"),
1391 vec![],
1392 std::env::temp_dir().join("bamboo-test-fab-193"),
1393 ExecutorSpec::Echo,
1394 vec![],
1395 "anthropic".into(),
1396 4,
1397 )
1398 .with_remote_placements(placements)
1399 }
1400
1401 /// A child session of the given role (the role rides `subagent_type`, the
1402 /// path build_spec + the remote lookup both read).
1403 fn session_of_role(role: &str, assignment: &str) -> Session {
1404 let mut s = Session::new("child-1", "test-model");
1405 s.metadata
1406 .insert("subagent_type".to_string(), role.to_string());
1407 s.add_message(bamboo_agent_core::Message::user(assignment));
1408 s
1409 }
1410
1411 fn job_for(child: &str) -> SpawnJob {
1412 SpawnJob {
1413 parent_session_id: "parent-1".into(),
1414 child_session_id: child.into(),
1415 model: String::new(),
1416 disabled_tools: None,
1417 }
1418 }
1419
1420 #[test]
1421 fn build_spec_sets_remote_placement_for_matching_role() {
1422 let mut placements = HashMap::new();
1423 placements.insert(
1424 "explorer".to_string(),
1425 ResolvedRemotePlacement {
1426 endpoint: "wss://gpu-host:8443".into(),
1427 token: Some("T-secret".into()),
1428 ca_cert_file: None,
1429 },
1430 );
1431 let runner = bogus_runner(placements);
1432
1433 // Matching role -> Placement::Remote + the bearer on the secrets envelope.
1434 let s = session_of_role("explorer", "do the thing");
1435 let spec = runner.build_spec(&s, &job_for("child-1"));
1436 match &spec.placement {
1437 Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://gpu-host:8443"),
1438 other => panic!("expected Remote, got {other:?}"),
1439 }
1440 assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
1441 }
1442
1443 #[test]
1444 fn build_spec_leaves_local_for_unmatched_role() {
1445 let mut placements = HashMap::new();
1446 placements.insert(
1447 "explorer".to_string(),
1448 ResolvedRemotePlacement {
1449 endpoint: "wss://gpu-host:8443".into(),
1450 token: Some("T".into()),
1451 ca_cert_file: None,
1452 },
1453 );
1454 let runner = bogus_runner(placements);
1455
1456 // A DIFFERENT role keeps the default Local placement + no bearer.
1457 let s = session_of_role("writer", "do the thing");
1458 let spec = runner.build_spec(&s, &job_for("child-1"));
1459 assert_eq!(spec.placement, Placement::Local);
1460 assert!(spec.secrets.worker_auth_token.is_none());
1461 }
1462
1463 #[test]
1464 fn build_spec_local_when_no_placements() {
1465 let runner = bogus_runner(HashMap::new());
1466 let s = session_of_role("explorer", "do the thing");
1467 let spec = runner.build_spec(&s, &job_for("child-1"));
1468 assert_eq!(spec.placement, Placement::Local);
1469 assert!(spec.secrets.worker_auth_token.is_none());
1470 }
1471
1472 /// End-to-end remote run through `execute_external_child`: a resident worker
1473 /// (Bearer-gated `WsServer` + `EchoExecutor`) serves the role; the runner is
1474 /// built with a `remote_placements` entry pointing at it AND a BOGUS
1475 /// worker_bin (`/bin/false`). A passing test proves the remote path CONNECTS
1476 /// to the resident worker and NEVER spawns (a spawn would fail on /bin/false),
1477 /// and that a terminal/echo result flows back.
1478 #[tokio::test]
1479 async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
1480 // 1. Stand up the resident worker on loopback with a required bearer.
1481 let token = "remote-test-token";
1482 let server = bamboo_subagent::transport::WsServer::bind_with_token(
1483 (std::net::Ipv4Addr::LOCALHOST, 0).into(),
1484 Some(token.to_string()),
1485 )
1486 .await
1487 .expect("bind resident worker");
1488 let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1489 let srv = tokio::spawn(async move {
1490 // serve() loops connection-after-connection; the test exits, dropping it.
1491 let _ = server
1492 .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1493 .await;
1494 });
1495
1496 // 2. Build the runner: role "explorer" pinned remote, bogus worker_bin.
1497 let mut placements = HashMap::new();
1498 placements.insert(
1499 "explorer".to_string(),
1500 ResolvedRemotePlacement {
1501 endpoint: endpoint.clone(),
1502 token: Some(token.to_string()),
1503 ca_cert_file: None, // plaintext ws:// loopback, default trust
1504 },
1505 );
1506 let runner = bogus_runner(placements);
1507
1508 // 3. Drive a real run for that role.
1509 let mut session = session_of_role("explorer", "hello remote");
1510 let job = job_for("child-1");
1511 let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1512 let cancel = CancellationToken::new();
1513
1514 let result = tokio::time::timeout(
1515 Duration::from_secs(10),
1516 runner.execute_external_child(&mut session, &job, event_tx, cancel),
1517 )
1518 .await
1519 .expect("run did not hang")
1520 .expect("remote run succeeded (connected to resident worker, did not spawn)");
1521
1522 let _ = result;
1523 // The EchoExecutor's reply is written back onto the child session as an
1524 // assistant message — proof a terminal result flowed back over the link.
1525 let last = session
1526 .messages
1527 .iter()
1528 .rev()
1529 .find(|m| matches!(m.role, Role::Assistant))
1530 .expect("an assistant reply was written back");
1531 assert!(
1532 last.content.contains("echo:"),
1533 "expected echo reply, got {:?}",
1534 last.content
1535 );
1536
1537 // Drain a couple of streamed events to confirm the event pipe carried the
1538 // worker's tokens too (best-effort; the reply assertion above is primary).
1539 let mut saw_event = false;
1540 while let Ok(Some(_ev)) =
1541 tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
1542 {
1543 saw_event = true;
1544 }
1545 let _ = saw_event;
1546
1547 srv.abort();
1548 }
1549
1550 // ---- #181 (P2b): schedulable placement routing --------------------------
1551
1552 use wiremock::matchers::{method as wm_method, path as wm_path};
1553 use wiremock::{Mock, MockServer, ResponseTemplate};
1554
1555 /// A bogus-worker_bin runner carrying SCHEDULABLE placements (and optionally
1556 /// remote ones, to test precedence). A local spawn here would fail on
1557 /// `/bin/false`, so a passing schedulable test proves no subprocess spawned.
1558 fn bogus_sched_runner(
1559 remote: HashMap<String, ResolvedRemotePlacement>,
1560 sched: HashMap<String, ResolvedSchedulablePlacement>,
1561 ) -> ActorChildRunner {
1562 ActorChildRunner::new(
1563 "test-actor".into(),
1564 PathBuf::from("/bin/false"),
1565 vec![],
1566 std::env::temp_dir().join("bamboo-test-fab-181"),
1567 ExecutorSpec::Echo,
1568 vec![],
1569 "anthropic".into(),
1570 4,
1571 )
1572 .with_remote_placements(remote)
1573 .with_schedulable_placements(sched)
1574 }
1575
1576 fn sched_placement(
1577 pool: &str,
1578 registry_url: impl Into<String>,
1579 ) -> ResolvedSchedulablePlacement {
1580 ResolvedSchedulablePlacement {
1581 pool: pool.into(),
1582 registry_url: registry_url.into(),
1583 token: None,
1584 ca_cert_file: None,
1585 }
1586 }
1587
1588 fn live_record(agent_id: &str, role: &str, endpoint: &str) -> AgentRecord {
1589 AgentRecord {
1590 agent_id: agent_id.into(),
1591 role: role.into(),
1592 labels: Vec::new(),
1593 endpoint: endpoint.into(),
1594 pid: 0,
1595 version: String::new(),
1596 started_at: chrono::Utc::now(),
1597 lease_expires_at: chrono::Utc::now() + chrono::Duration::seconds(60),
1598 }
1599 }
1600
#[test]
fn build_spec_sets_schedulable_placement_for_matching_role() {
    // "explorer" is schedulable on "gpu-pool" with its own bearer token.
    let mut placement = sched_placement("gpu-pool", "https://control-plane:9562");
    placement.token = Some("T-sched".into());
    let mut sched = HashMap::new();
    sched.insert("explorer".to_string(), placement);
    let runner = bogus_sched_runner(HashMap::new(), sched);

    let session = session_of_role("explorer", "do the thing");
    let spec = runner.build_spec(&session, &job_for("child-1"));
    if let Placement::Schedulable { pool } = &spec.placement {
        assert_eq!(pool, "gpu-pool");
    } else {
        let other = &spec.placement;
        panic!("expected Schedulable, got {other:?}");
    }
    // The scoped secrets envelope carries the placement's bearer, which is
    // used for both the registry and the worker connection.
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-sched"));
}
1620
#[test]
fn build_spec_remote_wins_when_role_in_both_maps() {
    // "explorer" appears in BOTH the remote and the schedulable maps. The
    // documented precedence says the FIXED remote placement must win.
    let fixed = ResolvedRemotePlacement {
        endpoint: "wss://fixed-host:8443".into(),
        token: Some("T-remote".into()),
        ca_cert_file: None,
    };
    let pooled = sched_placement("gpu-pool", "https://control-plane:9562");

    let mut remote = HashMap::new();
    remote.insert("explorer".to_string(), fixed);
    let mut sched = HashMap::new();
    sched.insert("explorer".to_string(), pooled);
    let runner = bogus_sched_runner(remote, sched);

    let session = session_of_role("explorer", "do the thing");
    let spec = runner.build_spec(&session, &job_for("child-1"));
    if let Placement::Remote { endpoint } = &spec.placement {
        assert_eq!(endpoint, "wss://fixed-host:8443");
    } else {
        let other = &spec.placement;
        panic!("expected Remote (precedence), got {other:?}");
    }
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
}
1649
#[test]
fn build_spec_local_for_unmatched_schedulable_role() {
    // Only "explorer" is schedulable. A "writer" child must stay Local and
    // carry no worker bearer.
    let explorer_only = {
        let mut map = HashMap::new();
        map.insert(
            "explorer".to_string(),
            sched_placement("gpu-pool", "https://control-plane:9562"),
        );
        map
    };
    let runner = bogus_sched_runner(HashMap::new(), explorer_only);

    let writer_session = session_of_role("writer", "do the thing");
    let spec = runner.build_spec(&writer_session, &job_for("child-1"));

    assert_eq!(spec.placement, Placement::Local);
    assert!(spec.secrets.worker_auth_token.is_none());
}
1663
1664 /// Spin up `n` loopback Echo `WsServer`s; returns their endpoints and the
1665 /// JoinHandles (abort on test end). Each is a REAL connectable worker.
1666 async fn spawn_echo_workers(n: usize) -> (Vec<String>, Vec<tokio::task::JoinHandle<()>>) {
1667 let mut endpoints = Vec::new();
1668 let mut handles = Vec::new();
1669 for _ in 0..n {
1670 let server = bamboo_subagent::transport::WsServer::bind_loopback()
1671 .await
1672 .expect("bind echo worker");
1673 endpoints.push(server.ws_endpoint());
1674 handles.push(tokio::spawn(async move {
1675 let _ = server
1676 .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1677 .await;
1678 }));
1679 }
1680 (endpoints, handles)
1681 }
1682
1683 #[tokio::test]
1684 async fn resolve_schedulable_worker_round_robin_spreads_over_candidates() {
1685 // Registry returns three live, CONNECTABLE workers in the pool; successive
1686 // picks must advance the per-pool cursor and cover all three (round-robin
1687 // spread). resolve now also connects, so the candidates are real servers.
1688 let (eps, handles) = spawn_echo_workers(3).await;
1689 let registry = MockServer::start().await;
1690 let recs = vec![
1691 live_record("w-0", "gpu-pool", &eps[0]),
1692 live_record("w-1", "gpu-pool", &eps[1]),
1693 live_record("w-2", "gpu-pool", &eps[2]),
1694 // A worker in a DIFFERENT pool must be filtered out.
1695 live_record("other", "cpu-pool", "ws://127.0.0.1:9"),
1696 ];
1697 Mock::given(wm_method("GET"))
1698 .and(wm_path("/v1/agents"))
1699 .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1700 .mount(®istry)
1701 .await;
1702
1703 let mut sched = HashMap::new();
1704 sched.insert(
1705 "explorer".to_string(),
1706 sched_placement("gpu-pool", registry.uri()),
1707 );
1708 let runner = bogus_sched_runner(HashMap::new(), sched);
1709
1710 // Three picks → cursor 0,1,2 → agent_ids w-0, w-1, w-2 in order.
1711 let mut picked = Vec::new();
1712 for _ in 0..3 {
1713 let (client, rec, placement) = match runner.resolve_schedulable_worker("explorer").await
1714 {
1715 Ok(v) => v,
1716 Err(e) => panic!("a live worker is picked: {e}"),
1717 };
1718 assert_eq!(placement.pool, "gpu-pool");
1719 assert_eq!(rec.role, "gpu-pool", "only pool workers are candidates");
1720 picked.push(rec.agent_id);
1721 let _ = client.close().await;
1722 }
1723 picked.sort();
1724 assert_eq!(
1725 picked,
1726 vec!["w-0".to_string(), "w-1".to_string(), "w-2".to_string()],
1727 "round-robin covered every candidate over three picks"
1728 );
1729 for h in handles {
1730 h.abort();
1731 }
1732 }
1733
1734 #[tokio::test]
1735 async fn resolve_schedulable_worker_fails_over_dead_first_candidate() {
1736 // The FIRST candidate (by cursor order: cursor starts at 0) points at a
1737 // DEAD endpoint (no listener on a closed port); a LATER candidate is a real
1738 // Echo worker. resolve must SKIP the dead one and connect to the live one,
1739 // returning the live record — connect-fail failover (#202).
1740 let (eps, handles) = spawn_echo_workers(1).await;
1741 // A port with no listener: bind then drop to free it (best-effort closed).
1742 let dead = {
1743 let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
1744 .await
1745 .unwrap();
1746 let port = l.local_addr().unwrap().port();
1747 drop(l);
1748 format!("ws://127.0.0.1:{port}")
1749 };
1750 let registry = MockServer::start().await;
1751 let recs = vec![
1752 // idx 0 = cursor start = DEAD
1753 live_record("w-dead", "gpu-pool", &dead),
1754 // idx 1 = LIVE echo worker
1755 live_record("w-live", "gpu-pool", &eps[0]),
1756 ];
1757 Mock::given(wm_method("GET"))
1758 .and(wm_path("/v1/agents"))
1759 .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1760 .mount(®istry)
1761 .await;
1762
1763 let mut sched = HashMap::new();
1764 sched.insert(
1765 "explorer".to_string(),
1766 sched_placement("gpu-pool", registry.uri()),
1767 );
1768 let runner = bogus_sched_runner(HashMap::new(), sched);
1769
1770 let (client, rec, _placement) = match runner.resolve_schedulable_worker("explorer").await {
1771 Ok(v) => v,
1772 Err(e) => panic!("failover skips the dead candidate, connects to the live one: {e}"),
1773 };
1774 assert_eq!(
1775 rec.agent_id, "w-live",
1776 "the dead first candidate was skipped; the live one was chosen"
1777 );
1778 let _ = client.close().await;
1779 for h in handles {
1780 h.abort();
1781 }
1782 }
1783
1784 #[tokio::test]
1785 async fn resolve_schedulable_worker_errors_when_all_candidates_dead() {
1786 // ALL candidates point at dead endpoints: resolve must error (no panic),
1787 // and CRUCIALLY no spawn (a schedulable role has no local fallback). The
1788 // error names how many were tried.
1789 let mut dead = Vec::new();
1790 for _ in 0..2 {
1791 let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
1792 .await
1793 .unwrap();
1794 let port = l.local_addr().unwrap().port();
1795 drop(l);
1796 dead.push(format!("ws://127.0.0.1:{port}"));
1797 }
1798 let registry = MockServer::start().await;
1799 let recs = vec![
1800 live_record("d-0", "gpu-pool", &dead[0]),
1801 live_record("d-1", "gpu-pool", &dead[1]),
1802 ];
1803 Mock::given(wm_method("GET"))
1804 .and(wm_path("/v1/agents"))
1805 .respond_with(ResponseTemplate::new(200).set_body_json(recs))
1806 .mount(®istry)
1807 .await;
1808
1809 let mut sched = HashMap::new();
1810 sched.insert(
1811 "explorer".to_string(),
1812 sched_placement("gpu-pool", registry.uri()),
1813 );
1814 let runner = bogus_sched_runner(HashMap::new(), sched);
1815
1816 let msg = match runner.resolve_schedulable_worker("explorer").await {
1817 Ok(_) => panic!("all candidates dead must error, not connect"),
1818 Err(e) => e.to_string(),
1819 };
1820 assert!(
1821 msg.contains("failed to connect"),
1822 "names the connect failure: {msg}"
1823 );
1824 assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
1825 assert!(
1826 msg.contains("NOT spawning"),
1827 "confirms no local-subprocess fallback: {msg}"
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn fabric_cache_reuses_one_fabric_per_registry_url() {
1833 // Two resolves to the SAME registry_url must REUSE one cached fabric (#202):
1834 // after N calls the cache has exactly one entry for that url.
1835 let (eps, handles) = spawn_echo_workers(1).await;
1836 let registry = MockServer::start().await;
1837 Mock::given(wm_method("GET"))
1838 .and(wm_path("/v1/agents"))
1839 .respond_with(
1840 ResponseTemplate::new(200)
1841 .set_body_json(vec![live_record("w-0", "gpu-pool", &eps[0])]),
1842 )
1843 .mount(®istry)
1844 .await;
1845
1846 let mut sched = HashMap::new();
1847 sched.insert(
1848 "explorer".to_string(),
1849 sched_placement("gpu-pool", registry.uri()),
1850 );
1851 let runner = bogus_sched_runner(HashMap::new(), sched);
1852
1853 for _ in 0..3 {
1854 let (client, _rec, _p) = match runner.resolve_schedulable_worker("explorer").await {
1855 Ok(v) => v,
1856 Err(e) => panic!("resolve succeeds: {e}"),
1857 };
1858 let _ = client.close().await;
1859 }
1860
1861 let cache = runner.fabric_cache.lock().unwrap();
1862 assert_eq!(
1863 cache.len(),
1864 1,
1865 "three resolves to the same (registry_url, token) reused ONE cached fabric"
1866 );
1867 assert!(cache.contains_key(&(registry.uri(), None)));
1868 drop(cache);
1869 for h in handles {
1870 h.abort();
1871 }
1872 }
1873
#[test]
fn fabric_cache_keys_on_registry_url_and_token() {
    // Placements that share a registry_url but present DIFFERENT tokens must get
    // separate fabrics. Reusing a fabric built with the wrong bearer would send
    // the discover query with the other role's token (#202 review F1).
    let runner = bogus_sched_runner(HashMap::new(), HashMap::new());
    let mut placement = sched_placement("pool", "http://127.0.0.1:9/");
    let cached_fabrics = || runner.fabric_cache.lock().unwrap().len();

    placement.token = Some("token-a".into());
    let fabric_a = runner.fabric_for(&placement, "role-a").expect("build a");
    placement.token = Some("token-b".into());
    let fabric_b = runner.fabric_for(&placement, "role-b").expect("build b");

    // Same url with a different token yields two distinct, non-aliased entries.
    assert!(
        !Arc::ptr_eq(&fabric_a, &fabric_b),
        "different tokens must not share a fabric"
    );
    assert_eq!(cached_fabrics(), 2);

    // Asking again with token-a hands back the very first fabric.
    placement.token = Some("token-a".into());
    let fabric_a_again = runner.fabric_for(&placement, "role-a").expect("reuse a");
    assert!(
        Arc::ptr_eq(&fabric_a, &fabric_a_again),
        "same (url, token) must reuse"
    );
    assert_eq!(cached_fabrics(), 2);
}
1899
1900 #[tokio::test]
1901 async fn resolve_schedulable_worker_errors_with_no_live_worker() {
1902 // The pool has no live workers (registry returns an empty / off-pool set):
1903 // a clear error, and CRUCIALLY no spawn (a schedulable role has no local
1904 // fallback).
1905 let registry = MockServer::start().await;
1906 Mock::given(wm_method("GET"))
1907 .and(wm_path("/v1/agents"))
1908 .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
1909 "x",
1910 "cpu-pool",
1911 "ws://127.0.0.1:9",
1912 )]))
1913 .mount(®istry)
1914 .await;
1915 let mut sched = HashMap::new();
1916 sched.insert(
1917 "explorer".to_string(),
1918 sched_placement("gpu-pool", registry.uri()),
1919 );
1920 let runner = bogus_sched_runner(HashMap::new(), sched);
1921
1922 let msg = match runner.resolve_schedulable_worker("explorer").await {
1923 Ok(_) => panic!("no live worker in pool must error, not connect"),
1924 Err(e) => e.to_string(),
1925 };
1926 assert!(msg.contains("no live worker"), "clear error: {msg}");
1927 assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
1928 }
1929
1930 /// Full schedulable e2e: a resident `WsServer` Echo worker is registered (via a
1931 /// wiremock registry) into pool "gpu-pool"; the runner is configured with a
1932 /// schedulable placement for role "explorer" → that pool, plus a BOGUS
1933 /// worker_bin (`/bin/false`). A passing run proves the SCHEDULABLE path RESOLVES
1934 /// the worker from the registry and connects to it WITHOUT ever spawning a local
1935 /// subprocess (a spawn would fail on /bin/false), and that an echo result flows
1936 /// back.
1937 #[tokio::test]
1938 async fn execute_external_child_schedules_role_from_registry_without_spawning() {
1939 // 1. Stand up the resident Echo worker on loopback (no bearer needed here).
1940 let server = bamboo_subagent::transport::WsServer::bind_loopback()
1941 .await
1942 .expect("bind resident worker");
1943 let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1944 let srv = tokio::spawn(async move {
1945 let _ = server
1946 .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1947 .await;
1948 });
1949
1950 // 2. Registry: returns the live worker registered into pool "gpu-pool"
1951 // pointing at the real worker endpoint.
1952 let registry = MockServer::start().await;
1953 Mock::given(wm_method("GET"))
1954 .and(wm_path("/v1/agents"))
1955 .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
1956 "live-explorer",
1957 "gpu-pool",
1958 &endpoint,
1959 )]))
1960 .mount(®istry)
1961 .await;
1962
1963 // 3. Runner: role "explorer" → schedulable on "gpu-pool"; bogus worker_bin.
1964 let mut sched = HashMap::new();
1965 sched.insert(
1966 "explorer".to_string(),
1967 sched_placement("gpu-pool", registry.uri()),
1968 );
1969 let runner = bogus_sched_runner(HashMap::new(), sched);
1970
1971 // 4. Drive a real run for that role.
1972 let mut session = session_of_role("explorer", "hello scheduled");
1973 let job = job_for("child-1");
1974 let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1975 let cancel = CancellationToken::new();
1976
1977 tokio::time::timeout(
1978 Duration::from_secs(10),
1979 runner.execute_external_child(&mut session, &job, event_tx, cancel),
1980 )
1981 .await
1982 .expect("run did not hang")
1983 .expect("scheduled run succeeded (resolved from registry, did not spawn)");
1984
1985 // The EchoExecutor's reply is written back — proof a terminal flowed back.
1986 let last = session
1987 .messages
1988 .iter()
1989 .rev()
1990 .find(|m| matches!(m.role, Role::Assistant))
1991 .expect("an assistant reply was written back");
1992 assert!(
1993 last.content.contains("echo:"),
1994 "expected echo reply, got {:?}",
1995 last.content
1996 );
1997
1998 // Best-effort: confirm streamed events also flowed.
1999 let mut saw_event = false;
2000 while let Ok(Some(_ev)) =
2001 tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
2002 {
2003 saw_event = true;
2004 }
2005 let _ = saw_event;
2006
2007 srv.abort();
2008 }
2009}