bamboo_engine/external_agents/actor_adapter.rs
1//! Actor external child runner.
2//!
3//! Runs a child session as an independent **actor**: a separate OS process with its own
4//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
5//! engine-side adapter on the `wants_external` seam: it spawns the worker binary, waits for
6//! it to self-register into the Tier-1 file fabric, connects, sends the assignment, and
7//! forwards the child's `AgentEvent`s back onto the parent's `event_tx`.
8//!
9//! The built-in **local actor** instance of this runner is the default runtime for
10//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
11//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::fleet::{spawn_worker_on_bus, SpawnedChild};
24use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
25use bamboo_subagent::provision::{
26 ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
27};
28use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
29
30use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
31
/// Default upper bound on actor processes that may be running at the same time.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Depth cap for direct nested execution (Phase 6).
///
/// A worker whose session `spawn_depth` is strictly below this value gets its
/// own spawn stack plus the real SubAgent tool; at or above it, it gets neither
/// (and the tool itself refuses). This mirrors
/// `bamboo_server_tools::DEFAULT_MAX_SPAWN_DEPTH` and must be kept in sync by
/// hand, because the engine cannot depend on server-tools. With the root
/// orchestrator at depth 0, this allows 4 levels of sub-agents.
pub const MAX_SPAWN_DEPTH: u32 = 4;

/// Default number of idle pooled (warm, reusable) workers retained per fingerprint.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Seconds a pooled worker waits for its next assignment before reclaiming
/// itself. This must comfortably exceed the gap between sibling spawns.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;

/// Deadline for a local worker's FIRST frame after a `Run` is dispatched.
///
/// A warm worker answers within seconds and a cold spawn within tens of
/// seconds. Complete silence beyond this means the worker is dead: for example,
/// a pooled worker that exited right after its liveness check, leaving its
/// `Run` queued with nobody to serve it. Tripping the deadline lets the runner
/// respawn once instead of hanging forever. The value is deliberately generous
/// so a slow-but-healthy cold start is never cut off.
const WORKER_FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(60);
56
57/// A warm worker on the mailbox bus, parked for reuse between runs. It stays
58/// dialed-in + subscribed to `mailbox_id`; the next interchangeable child
59/// delivers its `Run` there instead of spawning a fresh process. Dropping it
60/// kills a local kill-on-drop subprocess; a remote / schedulable handle is
61/// process-less (`kill()` is a no-op — it self-manages via its idle timeout).
62struct PooledWorker {
63 worker: SpawnedChild,
64 /// The bus mailbox this worker subscribes to (where its `Run`s are delivered).
65 mailbox_id: String,
66}
67
/// A role pinned to a remote resident worker (remote-actor-plan §3.4 / P1.5,
/// #193), resolved when the runner is built from
/// `SubagentsConfig.remote_placements`.
///
/// The env-named bearer has already been READ into `token`; the raw token
/// never rides the config. `ca_cert_file` is the path to a PEM file pinning a
/// self-signed worker cert (`None` ⇒ default webpki roots / plaintext `ws://`).
///
/// `Debug` is implemented by hand so the bearer `token` is redacted. A derived
/// impl would print the raw secret wherever a placement is `{:?}`-formatted
/// (logs, tracing fields, panic messages).
#[derive(Clone)]
pub struct ResolvedRemotePlacement {
    /// Endpoint of the resident worker to connect to.
    pub endpoint: String,
    /// Bearer token presented to the worker, if any. Never printed by `Debug`.
    pub token: Option<String>,
    /// PEM file pinning a self-signed worker certificate, if any.
    pub ca_cert_file: Option<PathBuf>,
    /// Display name for the machine this role runs on: the matching cluster
    /// node's `label`/host, surfaced on the UI placement badge. `None` ⇒ derive
    /// from the endpoint host.
    pub host_label: Option<String>,
}

impl std::fmt::Debug for ResolvedRemotePlacement {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedRemotePlacement")
            .field("endpoint", &self.endpoint)
            // Reveal only whether a token is configured, never its value.
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("ca_cert_file", &self.ca_cert_file)
            .field("host_label", &self.host_label)
            .finish()
    }
}
83
/// Routing target for a role served by a SCHEDULED worker (remote-actor-plan
/// §3.4 / P2b, #181), resolved when the runner is built from
/// `SubagentsConfig.schedulable_placements`.
///
/// `pool` is a bus role. The workers currently connected under that role are
/// the scheduling candidates, and the runner picks one via the bus presence
/// query (`BrokerClient::list_connected`). Phase 3 retired the old HTTP
/// registry, so a pool is nothing more than a role on the bus.
#[derive(Clone, Debug)]
pub struct ResolvedSchedulablePlacement {
    /// Bus role whose live connected workers may run this role's children.
    pub pool: String,
    /// Machine name shown on the UI placement badge: the matching cluster
    /// node's `label`/host. When `None`, the pool name is shown instead.
    pub host_label: Option<String>,
}
98
/// The worker-acquisition strategy `execute_external_child` uses for one child,
/// derived once from `spec.placement`.
///
/// Only acquire/connect and park/retire differ between the three strategies.
/// Run dispatch, live registration, drive and close are shared.
enum PlacementKind {
    /// Reuse a warm pooled worker or spawn a local subprocess (the pre-#193 path).
    Local,
    /// Connect to a fixed remote resident endpoint without spawning (the #194 path).
    Remote,
    /// Connect to a live worker picked from a schedulable pool (#181, P2b).
    Schedulable,
}
109
110/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
111pub struct ActorChildRunner {
112 agent_id: String,
113 worker_bin: PathBuf,
114 worker_args: Vec<String>,
115 fabric_dir: PathBuf,
116 executor: ExecutorSpec,
117 /// Per-provider credentials snapshotted from the parent config at build
118 /// time; the spec carries only the ONE the child's provider needs.
119 credentials: Vec<ScopedCredential>,
120 /// Parent's default provider (used when the child has no explicit one).
121 default_provider: String,
122 /// The mailbox bus to run local children over (the unified transport). Local
123 /// sub-agents require it; `None` only when no broker could be embedded.
124 bus: Option<bamboo_subagent::BusEndpoint>,
125 /// Backpressure: bounds the number of concurrently *running* actors; further
126 /// runs wait for a slot instead of exploding the process table. (Idle pooled
127 /// workers do not hold a slot.)
128 concurrency: std::sync::Arc<tokio::sync::Semaphore>,
129 /// Warm-worker pool keyed by a reuse fingerprint
130 /// (role/provider/model/workspace/disabled-tools/baked-caps). A finished run
131 /// parks its bus worker here so the next interchangeable child reuses it
132 /// (delivers its `Run` to the same mailbox) instead of spawning a fresh
133 /// process — collapsing N sibling sub-agents onto a few warm workers.
134 pool: Arc<tokio::sync::Mutex<HashMap<String, Vec<PooledWorker>>>>,
135 max_idle_per_key: usize,
136 /// Host-side decision for a child's gated-tool approval request (Phase 2).
137 /// `None` ⇒ fail-closed DENY (the safe default). A wired decider (policy or
138 /// human-routing bridge) returns approve/deny over the actor WS.
139 approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
140 /// Per-run escalation host bridge for non-bypass child-approval routing (#68;
141 /// Phase 6, Part B). The owning worker's `run()` installs its OWN host bridge
142 /// here via `set_escalation_bridge`; `execute_external_child` CAPTURES it at
143 /// grandchild-spawn time and hands the owned value to `drive()`, which uses it
144 /// to RE-PROXY a child's approval request UP to the parent run — chaining up
145 /// every level until a bypass level (model-review) or the top orchestrator
146 /// (human) decides, then relaying the reply back down. Was a process-global
147 /// slot; now per-runner so a fire-and-forget grandchild that OUTLIVES the run
148 /// that spawned it keeps that run's bridge for its whole lifetime instead of
149 /// reading a stale/overwritten global at approval time (→ fail-closed deny).
150 escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
151 /// Roles pinned to a REMOTE resident worker (#193), keyed by sub-agent role
152 /// (the child's `subagent_type`). A role present here routes through the
153 /// dedicated remote branch in `execute_external_child` (Bearer-authenticated
154 /// `wss://` connect, no spawn, no pool, no kill) instead of the local
155 /// subprocess + warm-pool path. Empty (the default) = all-local behavior.
156 remote_placements: HashMap<String, ResolvedRemotePlacement>,
157 /// Roles routed to a REGISTRY-SCHEDULED worker (#181, P2b), keyed by sub-agent
158 /// role. A role present here (AND not already in `remote_placements`, which
159 /// wins) routes through the dedicated SCHEDULABLE branch in
160 /// `execute_external_child`: query the registry for live workers in the pool,
161 /// pick one (round-robin), connect over `wss://` — no spawn, no pool, no kill,
162 /// and NO local-subprocess fallback (no live worker ⇒ a clear error). Empty
163 /// (the default) = all-local behavior.
164 schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
165 /// Per-pool round-robin cursor for schedulable scheduling (#181, P2b). Bumped
166 /// once per pick so successive sibling spawns SPREAD across a pool's live
167 /// workers instead of all landing on the first candidate. Best-effort spread,
168 /// not a load balancer — the registry's live set can change between picks.
169 schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
170}
171
172/// Decides how the host answers a child worker's gated-tool approval request
173/// (Phase 2: child → parent approval delegation). Async so an implementation
174/// can consult a policy or defer to a human. With no decider wired the host
175/// replies with a fail-closed DENY.
176///
177/// NOTE: `decide` is awaited inside the per-child frame pump, so an
178/// implementation must resolve promptly (e.g. a policy lookup). A human-in-the-
179/// loop decision that may block indefinitely should instead be delivered
180/// out-of-band as a `ParentFrame::ApprovalReply` via the live steering channel
181/// (`super::live`), which `drive()` already forwards to the worker without
182/// stalling the pump.
183#[async_trait]
184pub trait ChildApprovalDecider: Send + Sync {
185 /// Decide whether `child_session_id` may perform the gated action described
186 /// by `request` (`{tool_name, permission, resource}`).
187 async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
188}
189
190/// Resolve a child approval request to approve/deny. Fail-closed (DENY) when no
191/// decider is wired — the single, testable seam for the host-side decision.
192async fn decide_child_approval(
193 decider: Option<&Arc<dyn ChildApprovalDecider>>,
194 child_session_id: &str,
195 request: &serde_json::Value,
196) -> bool {
197 match decider {
198 Some(decider) => decider.decide(child_session_id, request).await,
199 None => false,
200 }
201}
202
/// Maximum time the host waits for a human approval decision before the
/// child's gated tool fails closed (DENY). This keeps an unanswered request
/// from hanging the worker indefinitely.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
207
208/// Extract `(tool_name, permission, resource)` from a worker's approval request
209/// body (`{tool_name, permission, resource}`); missing fields default to empty.
210fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
211 let field = |k: &str| {
212 body.get(k)
213 .and_then(|v| v.as_str())
214 .unwrap_or("")
215 .to_string()
216 };
217 (field("tool_name"), field("permission"), field("resource"))
218}
219
220/// Off-loop reviewer for a child's gated-tool approval request (Phase 6, Part B).
221///
222/// Installed (process-global) by a BYPASSED self-orchestrating worker so its
223/// children's forced-ask (dangerous) gated actions — which still raise
224/// `ConfirmationRequired` even under bypass — get an LLM reasonableness check
225/// rather than a blind pass. `review` is an LLM call: `drive()` invokes it in a
226/// SPAWNED task (NEVER in the frame pump) and delivers the verdict async via the
227/// live channel, so the agent loop is never blocked.
228#[async_trait]
229pub trait ChildApprovalReviewer: Send + Sync {
230 /// Judge whether the gated action `request` (`{tool_name, permission,
231 /// resource}`) is reasonable for `child_session_id`'s task. `true` = approve.
232 async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
233}
234
235fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
236 static SLOT: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> = std::sync::OnceLock::new();
237 &SLOT
238}
239
240/// Install the process-global child-approval reviewer (idempotent; first wins).
241pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
242 let _ = child_approval_reviewer_slot().set(reviewer);
243}
244
245/// The process-global child-approval reviewer, if installed.
246pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
247 child_approval_reviewer_slot().get().cloned()
248}
249
250impl ActorChildRunner {
251 #[allow(clippy::too_many_arguments)]
252 pub fn new(
253 agent_id: String,
254 worker_bin: PathBuf,
255 worker_args: Vec<String>,
256 fabric_dir: PathBuf,
257 executor: ExecutorSpec,
258 credentials: Vec<ScopedCredential>,
259 default_provider: String,
260 max_concurrent: usize,
261 ) -> Self {
262 Self {
263 agent_id,
264 worker_bin,
265 worker_args,
266 fabric_dir,
267 executor,
268 credentials,
269 default_provider,
270 bus: None,
271 concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
272 pool: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
273 max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
274 approval_decider: None,
275 escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
276 remote_placements: HashMap::new(),
277 schedulable_placements: HashMap::new(),
278 schedule_cursor: Arc::new(std::sync::Mutex::new(HashMap::new())),
279 }
280 }
281
282 /// Run children over the mailbox bus (the unified actor+mailbox transport).
283 /// When set, local children dial this bus and are driven by mailbox id; when
284 /// unset they use the legacy direct-WS path. The server passes its in-process
285 /// broker here (`subagents.broker`); tests without a broker leave it unset.
286 pub fn with_bus(mut self, bus: Option<bamboo_subagent::BusEndpoint>) -> Self {
287 self.bus = bus.filter(|b| !b.endpoint.trim().is_empty());
288 self
289 }
290
291 /// Wire the host-side decider for child gated-tool approval requests
292 /// (Phase 2). Without this the host fail-closed DENYs every request.
293 pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
294 self.approval_decider = Some(decider);
295 self
296 }
297
298 /// Pin specific sub-agent roles to remote resident workers (#193). The map
299 /// is keyed by role (`subagent_type`); a child whose role is present connects
300 /// over `wss://` to the resolved endpoint instead of spawning a local
301 /// subprocess. Default (empty) keeps every role on the local path — exactly
302 /// today's behavior.
303 pub fn with_remote_placements(
304 mut self,
305 placements: HashMap<String, ResolvedRemotePlacement>,
306 ) -> Self {
307 self.remote_placements = placements;
308 self
309 }
310
311 /// Route specific sub-agent roles to a registry-SCHEDULED worker (#181, P2b).
312 /// The map is keyed by role (`subagent_type`); a child whose role is present
313 /// (and NOT already pinned by `remote_placements`, which takes precedence) is
314 /// run on a live worker discovered from the registry instead of a local
315 /// subprocess. Default (empty) keeps every role on the local path.
316 pub fn with_schedulable_placements(
317 mut self,
318 placements: HashMap<String, ResolvedSchedulablePlacement>,
319 ) -> Self {
320 self.schedulable_placements = placements;
321 self
322 }
323
324 /// Reuse fingerprint: two children are interchangeable on one warm worker iff
325 /// they share role, provider, model, workspace, disabled-tool set, AND every
326 /// capability the worker BAKES at provision time (`BambooRuntimeExecutor`
327 /// stamps these once and reuses them across runs): nesting depth, nested-spawn
328 /// stack, bypass mode, permission enforcement, and the depth cap. Omitting any
329 /// of these lets the pool hand a run a worker baked for a DIFFERENT posture —
330 /// e.g. a depth-1 worker (with its own spawn stack) reused for a depth-4
331 /// child would re-stamp `spawn_depth=1` and pass the depth-cap check, breaking
332 /// the recursion bound; or a bypass worker reused for a non-bypass child. So
333 /// these MUST split the pool bucket. Everything else (assignment, history) is
334 /// shipped per-run in the `RunSpec` and does not affect the fingerprint.
335 /// Reuse fingerprint (role/provider/model/workspace/disabled-tools/baked
336 /// caps): two children with the same fingerprint are interchangeable on one
337 /// warm worker, so they share a pool bucket. Any axis the worker bakes ONCE
338 /// at provision time MUST be in here, else a worker baked for one posture
339 /// gets reused for another (see the `fingerprint_*` tests).
340 fn fingerprint(spec: &ProvisionSpec) -> String {
341 let role = spec.identity.role.as_str();
342 let (provider, model) = spec
343 .model
344 .as_ref()
345 .map(|m| (m.provider.as_str(), m.model.as_str()))
346 .unwrap_or(("", ""));
347 let workspace = spec.workspace.as_deref().unwrap_or("");
348 let mut tools = spec.disabled_tools.clone().unwrap_or_default();
349 tools.sort();
350 let caps = &spec.capabilities;
351 format!(
352 "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
353 tools.join(","),
354 spec.identity.depth,
355 caps.nested_spawn,
356 caps.bypass,
357 caps.enforce_permissions,
358 caps.max_spawn_depth.unwrap_or(0),
359 // #73 review (P1): a worker bakes `no_human_review` ONCE from this flag
360 // at build() and never re-reads it per run, so the pool MUST NOT hand a
361 // worker baked for one approval posture to a run of the opposite one —
362 // else a scheduled-root worker reused for an interactive child would
363 // silently model-review instead of asking the human (and vice-versa,
364 // reintroducing the 300s-deny). Split the bucket on it.
365 caps.no_human_approver,
366 // #71: the read-only Bash checker is baked once at build() from this
367 // flag, so a guardian-reviewer worker must NOT be reused for an
368 // ordinary child (which expects unrestricted Bash), and vice-versa.
369 caps.guardian_read_only,
370 )
371 }
372
373 /// Check out a warm bus worker for `key`, reusing a live parked one if any,
374 /// else spawning a fresh one that dials the bus. The returned worker is OWNED
375 /// by the caller for the run's duration (checkout removes it from the pool, so
376 /// a concurrent sibling gets a different worker or spawns its own — one run per
377 /// worker at a time, matching the pre-bus pool semantics).
378 async fn acquire_bus_worker(
379 &self,
380 key: &str,
381 spec: &ProvisionSpec,
382 ) -> crate::runtime::runner::Result<PooledWorker> {
383 // Drain the bucket, skipping (and reaping) any worker whose process exited
384 // while parked. A live one is handed straight out for reuse.
385 loop {
386 let candidate = {
387 let mut pool = self.pool.lock().await;
388 pool.get_mut(key).and_then(|bucket| bucket.pop())
389 };
390 let Some(mut candidate) = candidate else { break };
391 if candidate.worker.is_alive() {
392 return Ok(candidate);
393 }
394 candidate.worker.kill().await;
395 }
396
397 let spawned = spawn_worker_on_bus(&self.worker_bin, &self.worker_args, spec)
398 .await
399 .map_err(|e| AgentError::LLM(format!("actor spawn (bus) failed: {e}")))?;
400 let mailbox_id = spawned.record.agent_id.clone();
401 Ok(PooledWorker {
402 worker: spawned,
403 mailbox_id,
404 })
405 }
406
407 /// Park a warm bus worker for reuse after a clean run; if its bucket is full
408 /// (or it died), kill it instead. The worker stays dialed-in + subscribed
409 /// while parked, so a reusing child just delivers a new `Run` to its mailbox.
410 async fn release_bus_worker(&self, key: &str, mut worker: PooledWorker) {
411 if !worker.worker.is_alive() {
412 worker.worker.kill().await;
413 return;
414 }
415 let mut pool = self.pool.lock().await;
416 let bucket = pool.entry(key.to_string()).or_default();
417 if bucket.len() >= self.max_idle_per_key {
418 drop(pool);
419 worker.worker.kill().await;
420 return;
421 }
422 bucket.push(worker);
423 }
424
425 /// Assemble the parent-resolved provisioning document for this child.
426 fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
427 let mut spec = ProvisionSpec::new(
428 ChildIdentity {
429 child_id: job.child_session_id.clone(),
430 parent_id: Some(job.parent_session_id.clone()),
431 project_key: None,
432 role: session
433 .metadata
434 .get("subagent_type")
435 .cloned()
436 .unwrap_or_else(|| "worker".to_string()),
437 // The child session already carries the correct depth
438 // (create_child_action's new_child_of did parent.spawn_depth+1);
439 // stamp it so the worker can re-establish it on its run session
440 // and enforce the max-depth cap across the actor boundary.
441 depth: session.spawn_depth,
442 },
443 self.executor.clone(),
444 self.fabric_dir.to_string_lossy().into_owned(),
445 );
446 spec.workspace = session.workspace.clone();
447 // Unified transport: when a bus is configured, the child dials it (no
448 // listen socket / file discovery) and the parent drives it by mailbox id.
449 spec.bus = self.bus.clone();
450 // Final model: the session's pinned model_ref (create.model / routing already applied),
451 // falling back to the job's bare model on the parent's default provider.
452 spec.model = session
453 .model_ref
454 .as_ref()
455 .map(|r| ModelRefSpec {
456 provider: r.provider.clone(),
457 model: r.model.clone(),
458 })
459 .or_else(|| {
460 let m = job.model.trim();
461 (!m.is_empty()).then(|| ModelRefSpec {
462 provider: self.default_provider.clone(),
463 model: m.to_string(),
464 })
465 });
466 spec.disabled_tools = job.disabled_tools.clone();
467 // Least-privilege secrets: only the credential for the child's provider.
468 let provider = spec
469 .model
470 .as_ref()
471 .map(|m| m.provider.as_str())
472 .filter(|p| !p.trim().is_empty())
473 .unwrap_or(&self.default_provider);
474 if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
475 spec.secrets.provider_credentials.push(cred.clone());
476 } else {
477 tracing::warn!(
478 "actor child {}: no credential found for provider '{}'",
479 job.child_session_id,
480 provider
481 );
482 }
483 // Phase 6 (direct nested execution): a worker BELOW the depth cap may
484 // orchestrate its OWN children — on startup it builds its own spawn
485 // stack and runs the real SubAgent tool (no host proxy). The cap (the
486 // SubAgent tool refuses to spawn at/over `max_spawn_depth`) bounds the
487 // recursion. Driven purely by the child's depth, so it auto-propagates
488 // down the tree without any extra config threading.
489 spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
490 spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
491 // #69: activate child-approval review. Sub-agents enforce permissions so
492 // their DANGEROUS actions (the worker uses a HIGH threshold) reach the
493 // parent for review — escalated to the human, or model-reviewed off-loop
494 // when the parent is in bypass. The worker installs no checker without
495 // this, so the whole review chain would otherwise stay dormant.
496 spec.capabilities.enforce_permissions = true;
497 // Propagate "bypass permissions" so a self-orchestrating worker knows it
498 // is a bypassed parent and installs the off-loop model-reviewer for its
499 // children's forced-ask actions (Phase 6, Part B). The child session
500 // already carries the inherited flag (create_child_action seeds it).
501 spec.capabilities.bypass = session
502 .agent_runtime_state
503 .as_ref()
504 .is_some_and(|s| s.bypass_permissions);
505 // #73: propagate "no interactive human approver" (headless / scheduled /
506 // deployed root, inherited by the child session). When set, the worker's
507 // per-run approval proxy model-reviews a gated action locally instead of
508 // escalating to a human who will never answer (which would 300s-deny).
509 spec.capabilities.no_human_approver = session
510 .agent_runtime_state
511 .as_ref()
512 .is_some_and(|s| s.no_human_approver);
513 // #71: mark a READ-ONLY Guardian reviewer so the worker installs the
514 // read-only Bash allowlist checker. The reviewer is spawned by
515 // `spawn_guardian_review` with `subagent_type == "guardian"` (the SAME
516 // marker the completion coordinator branches on to parse the verdict) AND
517 // the `guardian_read_only_disabled_tools` denylist. Keyed off that role
518 // marker (already read above to set `identity.role`), so it rides the same
519 // session-metadata path the denylist/subagent_type use — no new wire seam.
520 // Without this the worker keeps an UNRESTRICTED Bash, so the reviewer could
521 // still `rm -rf` / `git push` / `curl | sh`, defeating "read-only".
522 spec.capabilities.guardian_read_only =
523 session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
524 // #193: route this role to a REMOTE resident worker when one is pinned.
525 // `spec.identity.role` was just computed from `subagent_type` above; a
526 // match flips the placement to Remote and rides the worker's bearer on the
527 // scoped secrets envelope (TLS handshake / Authorization header only — the
528 // token is never logged). No match leaves the default `Placement::Local`,
529 // so the local path is byte-for-byte unchanged for every non-pinned role.
530 if let Some(placement) = self.remote_placements.get(spec.identity.role.as_str()) {
531 spec.placement = Placement::Remote {
532 endpoint: placement.endpoint.clone(),
533 };
534 spec.secrets.worker_auth_token = placement.token.clone();
535 } else if let Some(placement) = self.schedulable_placements.get(spec.identity.role.as_str())
536 {
537 // #181 (P2b): route this role to a SCHEDULED worker — ONLY when it is
538 // NOT already pinned to a fixed remote endpoint (the `else if` makes
539 // remote_placements take precedence for a role in both). The concrete
540 // worker is picked at run time in `execute_external_child` from the bus
541 // (a live connected worker of the pool role). No per-placement bearer
542 // now — the bus connection uses the bus token. No match in either map
543 // leaves the default `Placement::Local`.
544 spec.placement = Placement::Schedulable {
545 pool: placement.pool.clone(),
546 };
547 }
548 spec
549 }
550
551 /// The `metadata["placement"]` JSON to stamp on a child from its resolved
552 /// placement, preferring the matching cluster node's `host_label` (its
553 /// operator label/host) over the raw endpoint/pool. `None` for a Local child
554 /// (the DTO defaults it to the backend's own host). Split out of
555 /// `execute_external_child` so the role→placement→host resolution is unit-testable.
556 fn placement_stamp_for(&self, spec: &ProvisionSpec) -> Option<String> {
557 let host_label = match &spec.placement {
558 Placement::Remote { .. } => self
559 .remote_placements
560 .get(spec.identity.role.as_str())
561 .and_then(|p| p.host_label.as_deref()),
562 Placement::Schedulable { .. } => self
563 .schedulable_placements
564 .get(spec.identity.role.as_str())
565 .and_then(|p| p.host_label.as_deref()),
566 Placement::Local => None,
567 };
568 placement_metadata(&spec.placement, host_label)
569 }
570
571 /// Pick a live worker for a SCHEDULABLE role from the BUS (#181, Phase 3):
572 /// ask the broker which actors are connected serving the pool role (presence
573 /// is connection-truth — no HTTP registry, no leases, no connect-fail
574 /// failover), then round-robin one per resolve for spread. Returns the chosen
575 /// worker's mailbox id. An empty pool ⇒ a terminal `AgentError` — NEVER a
576 /// local-subprocess fallback (that would silently defeat the placement).
577 async fn resolve_schedulable_worker(
578 &self,
579 role: &str,
580 ) -> std::result::Result<String, AgentError> {
581 let pool = self
582 .schedulable_placements
583 .get(role)
584 .ok_or_else(|| {
585 AgentError::LLM(format!(
586 "schedulable placement for role '{role}' vanished before scheduling"
587 ))
588 })?
589 .pool
590 .clone();
591 let bus = self.bus.as_ref().ok_or_else(|| {
592 AgentError::LLM(format!(
593 "schedulable role '{role}': no mailbox bus configured (subagents.broker)"
594 ))
595 })?;
596
597 // Ask the BUS who is connected serving the pool role — presence is
598 // connection-truth (no HTTP registry, no leases, no stale-record failover).
599 let mut q = bamboo_broker::BrokerClient::connect(
600 &bus.endpoint,
601 bamboo_subagent::AgentRef {
602 session_id: format!("sched-q-{role}"),
603 role: None,
604 },
605 &bus.token,
606 )
607 .await
608 .map_err(|e| AgentError::LLM(format!("schedulable role '{role}': bus connect failed: {e}")))?;
609 let candidates = q.list_connected(&pool).await.map_err(|e| {
610 AgentError::LLM(format!("schedulable role '{role}': bus presence query failed: {e}"))
611 })?;
612
613 if candidates.is_empty() {
614 return Err(AgentError::LLM(format!(
615 "schedulable role '{role}': no live worker in pool '{pool}' on the bus \
616 (NOT spawning a local subprocess — a schedulable role has no local fallback)"
617 )));
618 }
619
620 // Round-robin: advance a per-pool cursor once per resolve so successive
621 // sibling spawns spread across the connected pool workers. No failover
622 // needed — a listed worker is connected NOW (the bus only lists live
623 // subscribers), so there is no stale-but-leased candidate to skip.
624 let idx = {
625 let mut cursors = self.schedule_cursor.lock().unwrap();
626 let cursor = cursors.entry(pool.clone()).or_insert(0);
627 let i = *cursor % candidates.len();
628 *cursor = cursor.wrapping_add(1);
629 i
630 };
631 Ok(candidates[idx].clone())
632 }
633}
634
635#[async_trait]
636impl ExternalChildRunner for ActorChildRunner {
637 async fn should_handle(&self, session: &Session) -> bool {
638 session.metadata.get("runtime.kind") == Some(&"external".to_string())
639 && session.metadata.get("external.protocol") == Some(&"actor".to_string())
640 && session.metadata.get("external.agent_id") == Some(&self.agent_id)
641 }
642
643 fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
644 *self.escalation_bridge.lock().unwrap() = bridge;
645 }
646
647 async fn execute_external_child(
648 &self,
649 session: &mut Session,
650 job: &SpawnJob,
651 event_tx: mpsc::Sender<AgentEvent>,
652 cancel_token: CancellationToken,
653 ) -> crate::runtime::runner::Result<()> {
654 // #68 CORRECTNESS CRUX: capture the per-run escalation bridge HERE, at the
655 // moment this grandchild is spawned — while the parent run's bridge is
656 // still in our slot — into an owned local handed to `drive()` for this
657 // grandchild's whole lifetime. A fire-and-forget grandchild that OUTLIVES
658 // the run that spawned it must NOT re-read `self.escalation_bridge` at
659 // approval time: by then `run()` may have cleared/overwritten it (a worker
660 // serves runs sequentially), and re-proxying through a closed bridge
661 // fail-closed denies. Capturing at spawn pins the right bridge per run.
662 let escalation = self.escalation_bridge.lock().unwrap().clone();
663 let assignment = extract_assignment(session);
664 let mut spec = self.build_spec(session, job);
665 // Mark the worker reusable + give it an idle timeout so it self-reaps if
666 // orphaned. Warm bus workers are pooled per fingerprint and reused.
667 spec.reusable = true;
668 if spec.limits.idle_timeout_secs.is_none() {
669 spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
670 }
671 let pool_key = Self::fingerprint(&spec);
672 // Rehydration: the child session in the parent's store is the actor's
673 // durable state. Ship the full conversation so a reactivation
674 // (send_message / update / rerun) carries its history. A reused worker is
675 // stateless between runs, so this is also what isolates each child's
676 // context on a shared process.
677 let messages: Vec<serde_json::Value> = session
678 .messages
679 .iter()
680 .filter_map(|m| serde_json::to_value(m).ok())
681 .collect();
682
683 // Backpressure: hold a concurrency slot for the lifetime of the *run*
684 // (cancellation still proceeds — the cancel branch in drive() runs while
685 // we hold the permit). Released when this fn returns, i.e. once the worker
686 // is parked back into the pool, so idle workers don't pin slots.
687 let _slot = self
688 .concurrency
689 .acquire()
690 .await
691 .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
692
693 // Split LOCAL (spawn + warm-pool) from the two process-less remote paths
694 // ONLY at the divergent spots — acquire/connect here and the park/retire at
695 // the end. Everything between (Run dispatch, live-actor registration,
696 // drive, the close) is identical for all three. `kind` is the single guard.
697 // - Local (#0): byte-for-byte the pre-#193 reuse-or-spawn path.
698 // - Remote (#194): connect to a FIXED resident endpoint, no spawn.
699 // - Schedulable (#181): resolve a live worker from the registry, connect.
700 let kind = match spec.placement {
701 Placement::Remote { .. } => PlacementKind::Remote,
702 Placement::Schedulable { .. } => PlacementKind::Schedulable,
703 Placement::Local => PlacementKind::Local,
704 };
705 let remote = !matches!(kind, PlacementKind::Local);
706
707 // Stamp WHICH machine this child runs on onto its session metadata, so the
708 // UI can show it (mirrored into the session index → SessionSummary.placement).
709 // Only remote/scheduled placements need a stamp — a Local child falls through
710 // to the DTO default (this backend's own host). Persisted by the caller with
711 // the rest of the child session after we return.
712 if let Some(placement_meta) = self.placement_stamp_for(&spec) {
713 session
714 .metadata
715 .insert("placement".to_string(), placement_meta);
716 }
717
718 // Retry-once loop: a pooled local worker can die between its liveness
719 // check and handling the Run (a tiny TOCTOU window) — its Run then sits
720 // queued with no server. The first-frame watchdog in `drive` surfaces that
721 // as `WorkerUnresponsive`; we reap the dead worker and re-acquire ONCE
722 // (which spawns fresh / reuses the next live one). Remote/schedulable have
723 // no spawn fallback, so they never retry.
724 let mut attempt = 0u8;
725 let (result, actor) = loop {
726 let (actor, mut client) = match kind {
727 PlacementKind::Remote => {
728 // REMOTE branch: connect to a resident worker. No spawn, no pool
729 // touch, no drain. We do not own the worker, so a connect failure
730 // has NO respawn fallback — it is a clear, terminal error.
731 let placement = self
732 .remote_placements
733 .get(spec.identity.role.as_str())
734 .ok_or_else(|| {
735 AgentError::LLM(format!(
736 "remote placement for role '{}' vanished before connect",
737 spec.identity.role
738 ))
739 })?;
740 let endpoint = placement.endpoint.clone();
741 // Build the TLS trust: a pinned CA pins a self-signed worker cert;
742 // otherwise default webpki roots (or plaintext for `ws://`).
743 let trust_cfg = match placement.ca_cert_file.as_deref() {
744 Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
745 AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
746 })?),
747 None => None,
748 };
749 let client = ChildClient::connect_with_auth_tls(
750 &endpoint,
751 placement.token.as_deref(),
752 trust_cfg,
753 )
754 .await
755 .map_err(|e| {
756 AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
757 })?;
758 // Process-less handle so live-actor registration (in-band steering)
759 // works exactly as for a local worker; `kill()` is a no-op.
760 let record = AgentRecord {
761 agent_id: job.child_session_id.clone(),
762 role: spec.identity.role.clone(),
763 labels: Vec::new(),
764 endpoint: endpoint.clone(),
765 pid: 0,
766 version: String::new(),
767 started_at: chrono::Utc::now(),
768 lease_expires_at: chrono::Utc::now(),
769 };
770 let _ = endpoint;
771 let actor = PooledWorker {
772 worker: SpawnedChild::remote(record),
773 mailbox_id: job.child_session_id.clone(),
774 };
775 let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(client);
776 (actor, client)
777 }
778 PlacementKind::Schedulable => {
779 // SCHEDULABLE branch (#181): pick a LIVE worker of the pool role
780 // from the BUS (presence = connection-truth; no HTTP registry, no
781 // leases, no failover) and drive it by mailbox id. The pool worker
782 // stays connected and is reused next time. No spawn, no kill, NO
783 // local fallback — an empty pool is a terminal error (raised in
784 // resolve_schedulable_worker).
785 let bus = self.bus.as_ref().ok_or_else(|| {
786 AgentError::LLM(
787 "schedulable sub-agents require a mailbox bus (subagents.broker)"
788 .to_string(),
789 )
790 })?;
791 let mailbox_id = self
792 .resolve_schedulable_worker(spec.identity.role.as_str())
793 .await?;
794 let parent = bamboo_subagent::AgentRef {
795 session_id: format!("p-{}", job.child_session_id),
796 role: None,
797 };
798 let link = bamboo_broker::BrokerChildLink::connect(
799 &bus.endpoint,
800 parent,
801 &bus.token,
802 mailbox_id.clone(),
803 )
804 .await
805 .map_err(|e| {
806 AgentError::LLM(format!("schedulable link connect to '{mailbox_id}' failed: {e}"))
807 })?;
808 // Process-less handle — a bus-resident pool worker is never ours to
809 // kill (remote ⇒ dropped, not pooled, after the run).
810 let actor = PooledWorker {
811 worker: SpawnedChild::remote(AgentRecord {
812 agent_id: mailbox_id.clone(),
813 role: spec.identity.role.clone(),
814 labels: Vec::new(),
815 endpoint: bus.endpoint.clone(),
816 pid: 0,
817 version: String::new(),
818 started_at: chrono::Utc::now(),
819 lease_expires_at: chrono::Utc::now(),
820 }),
821 mailbox_id,
822 };
823 let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
824 (actor, client)
825 }
826 PlacementKind::Local => {
827 // LOCAL = the mailbox bus (the unified transport): check out a warm
828 // pooled worker (reuse a live parked one, else spawn fresh) and
829 // drive it by mailbox id — no listen socket, no file discovery, no
830 // respawn-on-connect-miss (the broker queues the Run until the
831 // worker handles it). The legacy direct-WS path was retired; the bus
832 // is required.
833 let bus = self.bus.as_ref().ok_or_else(|| {
834 AgentError::LLM(
835 "local sub-agents require a mailbox bus (subagents.broker); none is \
836 configured and the bus could not be embedded"
837 .to_string(),
838 )
839 })?;
840 let actor = self.acquire_bus_worker(&pool_key, &spec).await?;
841 let parent = bamboo_subagent::AgentRef {
842 session_id: format!("p-{}", job.child_session_id),
843 role: None,
844 };
845 let link = bamboo_broker::BrokerChildLink::connect(
846 &bus.endpoint,
847 parent,
848 &bus.token,
849 actor.mailbox_id.clone(),
850 )
851 .await
852 .map_err(|e| AgentError::LLM(format!("broker child link connect failed: {e}")))?;
853 let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
854 (actor, client)
855 }
856 };
857
858 if let Err(e) = client
859 .send(ParentFrame::Run(RunSpec {
860 // Cloned (not moved) so a retry can re-dispatch to a fresh worker.
861 assignment: assignment.clone(),
862 reasoning_effort: None,
863 messages: messages.clone(),
864 }))
865 .await
866 {
867 if !remote {
868 actor.worker.kill().await;
869 }
870 return Err(AgentError::LLM(format!("actor run dispatch failed: {e}")));
871 }
872
873 // Register as a live actor so send_message (running, no interrupt) can
874 // steer this child in-band over the existing WS connection. The guard
875 // unregisters on every exit path.
876 let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
877 let live_guard = super::live::register(&job.child_session_id, live_tx);
878
879 let result = drive(
880 &mut *client,
881 &job.child_session_id,
882 self.approval_decider.as_ref(),
883 escalation.clone(),
884 &event_tx,
885 &cancel_token,
886 &mut live_rx,
887 // First-frame watchdog for EVERY placement: a wedged-but-connected
888 // worker (subscribed ≠ serving — e.g. stuck on a prior LLM call) emits
889 // no first frame; without a deadline drive() blocks forever. Bounding it
890 // turns the "running-but-unresponsive" hang into a recoverable
891 // WorkerUnresponsive (reap+respawn local / re-pick schedulable / error
892 // on a fixed remote endpoint).
893 Some(WORKER_FIRST_FRAME_TIMEOUT),
894 )
895 .await;
896 // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
897 // so a send_message landing in the close/park window below must see
898 // "not live" and take the durable-queue fallback instead of vanishing.
899 // (Even if one slipped in earlier, send_message also appends it to the
900 // durable transcript, so the next activation still rehydrates it.)
901 drop(live_guard);
902 // Close the parent link (dropping it closes our broker connection; the
903 // worker stays dialed-in + subscribed, ready for its next Run).
904 drop(client);
905
906 // No first frame ⇒ the worker is wedged. Recover ONCE before giving up:
907 // - Local: reap the dead pooled worker + respawn.
908 // - Schedulable: not ours to kill — drop it and re-select a live pool
909 // member (a wedged worker must not fail the run when the pool has others).
910 // - Remote: a FIXED endpoint has no alternative — fall through to a bounded
911 // WorkerUnresponsive error (far better than the previous infinite hang).
912 if attempt == 0 && matches!(result, Err(AgentError::WorkerUnresponsive(_))) {
913 match kind {
914 PlacementKind::Local => {
915 tracing::warn!(
916 "actor child {} got no first frame; reaping the worker and respawning once",
917 job.child_session_id
918 );
919 actor.worker.kill().await;
920 attempt += 1;
921 continue;
922 }
923 PlacementKind::Schedulable => {
924 tracing::warn!(
925 "scheduled actor child {} got no first frame; re-selecting a pool worker",
926 job.child_session_id
927 );
928 drop(actor);
929 attempt += 1;
930 continue;
931 }
932 PlacementKind::Remote => {}
933 }
934 }
935 break (result, actor);
936 };
937
938 // Park the warm worker for reuse on a clean run, or kill it on
939 // error/cancel (a wedged worker must not be reused). Remote / schedulable
940 // workers are registry-managed — never ours to pool/kill, just drop.
941 if remote {
942 drop(actor);
943 } else {
944 match &result {
945 Ok(_) => self.release_bus_worker(&pool_key, actor).await,
946 Err(_) => actor.worker.kill().await,
947 }
948 }
949
950 // Write-back: persist the actor's final reply onto the child session so
951 // the transcript survives and the NEXT activation sees it as history.
952 // (run_child_spawn saves the session right after we return.)
953 match result {
954 Ok(Some(text)) => {
955 if !text.is_empty() {
956 session.add_message(bamboo_agent_core::Message::assistant(text, None));
957 }
958 Ok(())
959 }
960 Ok(None) => Ok(()),
961 Err(e) => Err(e),
962 }
963 }
964}
965
966/// The `{kind,host}` placement descriptor stamped onto a child session's metadata
967/// under `"placement"` — read back by the storage index → `SessionSummary.placement`
968/// → the UI's machine badge. `None` for `Local` (those fall through to the DTO's
969/// default of this backend's own host). The value is a JSON string matching
970/// `bamboo_storage::SessionPlacement { kind, host }`.
971fn placement_metadata(placement: &Placement, host_label: Option<&str>) -> Option<String> {
972 // Prefer the cluster node's own label/host (its metadata) when the placement
973 // maps to a node; else fall back to the raw endpoint host / pool name.
974 let value = match placement {
975 Placement::Local => return None,
976 Placement::Remote { endpoint } => serde_json::json!({
977 "kind": "remote",
978 "host": host_label.map(str::to_string).unwrap_or_else(|| host_of_endpoint(endpoint)),
979 }),
980 Placement::Schedulable { pool } => serde_json::json!({
981 "kind": "remote",
982 "host": host_label.unwrap_or(pool),
983 }),
984 };
985 serde_json::to_string(&value).ok()
986}
987
/// Extract the display host from an endpoint such as `ws[s]://host:port[/path]`.
///
/// Steps:
/// - Surrounding whitespace and any `scheme://` prefix are dropped.
/// - `user[:password]@` credentials are dropped, so they are never shown.
/// - The `:port`, path, query and fragment are dropped.
///
/// A bracketed IPv6 literal keeps its brackets (`ws://[::1]:9000` → `[::1]`),
/// since splitting on `:` would otherwise cut it to `[`. If no host can be
/// extracted (e.g. `"ws://"`), the trimmed endpoint is returned unchanged, so
/// the UI never shows an empty label.
fn host_of_endpoint(endpoint: &str) -> String {
    let trimmed = endpoint.trim();

    // RFC 3986 scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ). Checking
    // this keeps a `://` that appears later in a path from being mistaken
    // for the scheme separator.
    let is_scheme = |s: &str| {
        !s.is_empty()
            && s
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    };
    let after_scheme = match trimmed.split_once("://") {
        Some((scheme, rest)) if is_scheme(scheme) => rest,
        _ => trimmed,
    };

    // The authority ends at the first path / query / fragment delimiter.
    let authority = after_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(after_scheme);
    // Drop `user[:password]@` credentials.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, hp)| hp);

    let host = if host_port.starts_with('[') {
        // Bracketed IPv6 literal: keep through the closing `]`, drop `:port`.
        host_port
            .find(']')
            .map_or(host_port, |end| &host_port[..=end])
    } else {
        host_port.split(':').next().unwrap_or(host_port)
    };

    if host.is_empty() {
        trimmed.to_string()
    } else {
        host.to_string()
    }
}
999
1000/// Pump child frames -> parent events until a terminal frame (or cancellation).
1001/// On success, yields the actor's final result text (for session write-back).
1002/// `live_rx` carries in-band frames (steering messages) from the live registry.
1003///
1004/// `escalation_bridge` (#68) is the per-run escalation host bridge CAPTURED BY
1005/// VALUE at spawn time in `execute_external_child` (NOT read live here): when a
1006/// non-bypass child re-proxies an approval request, this owned bridge routes it
1007/// UP to the parent run. Owning it for the call's lifetime is what lets a
1008/// fire-and-forget grandchild that outlives its spawning run still escalate to
1009/// the correct (then-current) parent bridge rather than a stale/overwritten one.
1010async fn drive(
1011 client: &mut dyn bamboo_subagent::ChildLink,
1012 child_session_id: &str,
1013 approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
1014 escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
1015 event_tx: &mpsc::Sender<AgentEvent>,
1016 cancel_token: &CancellationToken,
1017 live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
1018 first_frame_timeout: Option<Duration>,
1019) -> crate::runtime::runner::Result<Option<String>> {
1020 // First-frame watchdog: a live worker emits its first frame (run-started /
1021 // first token) within seconds; total silence past the deadline means the
1022 // worker is dead (e.g. a pooled worker that exited right after checkout), so
1023 // its Run sits queued forever. We trip ONLY before the first frame — once any
1024 // frame arrives the worker is proven live and a legitimately long run (a slow
1025 // tool between tokens) never trips it.
1026 let mut got_first_frame = false;
1027 let mut first_frame_watch = first_frame_timeout.map(|d| Box::pin(tokio::time::sleep(d)));
1028 loop {
1029 tokio::select! {
1030 _ = cancel_token.cancelled() => {
1031 // fall through to the cancel handling below
1032 break;
1033 }
1034 _ = async {
1035 match first_frame_watch.as_mut() {
1036 Some(s) => s.as_mut().await,
1037 None => std::future::pending::<()>().await,
1038 }
1039 }, if !got_first_frame => {
1040 return Err(AgentError::WorkerUnresponsive(format!(
1041 "child {child_session_id} produced no frame within {:?}",
1042 first_frame_timeout.unwrap_or_default()
1043 )));
1044 }
1045 Some(frame) = live_rx.recv() => {
1046 // Forward in-band steering to the worker over the existing WS.
1047 if client.send(frame).await.is_err() {
1048 tracing::warn!("live steering frame could not be sent; connection failing");
1049 }
1050 }
1051 frame = client.next_frame() => {
1052 // Any frame (event / approval / terminal / close / error) proves
1053 // the worker responded — disarm the first-frame watchdog.
1054 got_first_frame = true;
1055 first_frame_watch = None;
1056 match frame {
1057 Ok(Some(ChildFrame::Event { event })) => {
1058 // AgentEvent is serialized verbatim on the wire (zero mapping).
1059 if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
1060 let _ = event_tx.send(ev).await;
1061 }
1062 }
1063 Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
1064 // Phase 2: a worker proxied a gated-tool approval back to
1065 // the host. The WORKER side is live — its executor installs
1066 // a per-run task-local `ApprovalProxy` (subagent_worker.rs)
1067 // that calls `host.approval_call`, so this frame arrives
1068 // when a child hits `ConfirmationRequired`.
1069 if let Some(reviewer) = child_approval_reviewer() {
1070 // Phase 6, Part B: a BYPASSED parent worker
1071 // model-reviews its children's forced-ask (dangerous)
1072 // actions. The review is an LLM call, so run it OFF
1073 // the frame pump in a spawned task and deliver the
1074 // verdict async via the live channel — the pump keeps
1075 // forwarding events and the agent loop never blocks. A
1076 // timeout denies a hung review so the child can't hang.
1077 let child = child_session_id.to_string();
1078 let req_id = id.clone();
1079 let body = body.clone();
1080 tokio::spawn(async move {
1081 let approved = tokio::time::timeout(
1082 CHILD_APPROVAL_TIMEOUT,
1083 reviewer.review(&child, &body),
1084 )
1085 .await
1086 .unwrap_or(false);
1087 super::live::deliver_approval(&child, &req_id, approved);
1088 });
1089 } else if approval_decider.is_some() {
1090 // A decider is wired (policy / auto): decide promptly
1091 // and reply inline. (Must not block the pump — see the
1092 // `ChildApprovalDecider` doc.)
1093 let approved =
1094 decide_child_approval(approval_decider, child_session_id, &body)
1095 .await;
1096 if client
1097 .send(ParentFrame::ApprovalReply { id, approved })
1098 .await
1099 .is_err()
1100 {
1101 tracing::warn!(
1102 "failed to answer approval_request; connection failing"
1103 );
1104 }
1105 } else if let Some(host) = escalation_bridge.clone() {
1106 // Non-bypass WORKER: ESCALATE up our own actor link
1107 // (re-proxy) so the request chains to our parent — and
1108 // up every level until a bypass level (model-review) or
1109 // the top orchestrator (human) decides. Off-loop so the
1110 // pump never blocks; relay the reply down to the child.
1111 let child = child_session_id.to_string();
1112 let req_id = id.clone();
1113 let body = body.clone();
1114 tokio::spawn(async move {
1115 let approved = match tokio::time::timeout(
1116 CHILD_APPROVAL_TIMEOUT,
1117 host.approval_call(body),
1118 )
1119 .await
1120 {
1121 Ok(Ok(reply)) => reply
1122 .get("approved")
1123 .and_then(|v| v.as_bool())
1124 .unwrap_or(false),
1125 // Transport error or timeout ⇒ fail closed.
1126 _ => false,
1127 };
1128 super::live::deliver_approval(&child, &req_id, approved);
1129 });
1130 } else {
1131 // Top orchestrator (no escalation bridge): human-in-the-
1132 // loop. Surface the request on the parent's event stream
1133 // and DEFER — the decision arrives out-of-band via
1134 // `live::deliver_approval(child, request_id, approved)`
1135 // (→ this child's `live_rx` → forwarded to the worker
1136 // above). A timeout denies a never-answered request so
1137 // it can't hang the child forever.
1138 let (tool_name, permission, resource) =
1139 approval_request_fields(&body);
1140 // Register the pending request BEFORE surfacing it so
1141 // the external handler's `deliver_approval_checked` can
1142 // correlate an out-of-band POST against a genuine
1143 // human-loop request (and consume it one-shot).
1144 super::live::register_pending_approval(child_session_id, &id);
1145 let _ = event_tx
1146 .send(AgentEvent::ChildApprovalRequested {
1147 child_session_id: child_session_id.to_string(),
1148 request_id: id.clone(),
1149 tool_name,
1150 permission,
1151 resource,
1152 })
1153 .await;
1154 let child = child_session_id.to_string();
1155 tokio::spawn(async move {
1156 tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
1157 // Deny only if still pending: a one-shot consume so
1158 // we don't double-deliver if the human already
1159 // answered (the POST took it), and so a late POST
1160 // after this fires finds nothing pending.
1161 if super::live::take_pending_approval(&child, &id) {
1162 super::live::deliver_approval(&child, &id, false);
1163 }
1164 });
1165 }
1166 }
1167 Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
1168 return match status {
1169 TerminalStatus::Completed => Ok(result),
1170 TerminalStatus::Cancelled => Err(AgentError::Cancelled),
1171 TerminalStatus::Error => Err(AgentError::LLM(
1172 error.unwrap_or_else(|| "actor child errored".to_string()),
1173 )),
1174 // The suspend/resume round-trip (host re-dispatch of a
1175 // nested parent) is not wired here yet; a worker in
1176 // this build never emits Suspended, so this is
1177 // unreachable in practice.
1178 TerminalStatus::Suspended => Err(AgentError::LLM(
1179 "nested sub-agent suspend received but resume transport is not wired"
1180 .to_string(),
1181 )),
1182 };
1183 }
1184 Ok(None) => {
1185 return Err(AgentError::LLM(
1186 "actor child closed before terminal".to_string(),
1187 ));
1188 }
1189 Err(e) => {
1190 return Err(AgentError::LLM(format!("actor transport error: {e}")));
1191 }
1192 }
1193 }
1194 }
1195 }
1196
1197 // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
1198 let _ = client.send(ParentFrame::Cancel).await;
1199 Err(AgentError::Cancelled)
1200}
1201
1202/// The assignment text = the child session's latest user message (falls back to its title).
1203fn extract_assignment(session: &Session) -> String {
1204 session
1205 .messages
1206 .iter()
1207 .rev()
1208 .find(|m| matches!(m.role, Role::User))
1209 .map(|m| m.content.clone())
1210 .unwrap_or_else(|| {
1211 session
1212 .metadata
1213 .get("title")
1214 .cloned()
1215 .unwrap_or_else(|| "Execute task".to_string())
1216 })
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::*;
1222
1223 fn spec_with(
1224 role: &str,
1225 provider: &str,
1226 model: &str,
1227 workspace: Option<&str>,
1228 disabled: Option<Vec<&str>>,
1229 ) -> ProvisionSpec {
1230 let mut spec = ProvisionSpec::new(
1231 ChildIdentity {
1232 child_id: "c".into(),
1233 parent_id: None,
1234 project_key: None,
1235 role: role.into(),
1236 depth: 0,
1237 },
1238 ExecutorSpec::Echo,
1239 "/tmp/fab".into(),
1240 );
1241 spec.workspace = workspace.map(|w| w.to_string());
1242 spec.model = Some(ModelRefSpec {
1243 provider: provider.into(),
1244 model: model.into(),
1245 });
1246 spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
1247 spec
1248 }
1249
1250 #[test]
1251 fn fingerprint_matches_interchangeable_children() {
1252 // Same role/provider/model/workspace and equal tool sets (order-insensitive)
1253 // are interchangeable on one warm worker — and differ only in child_id.
1254 let a = spec_with(
1255 "explorer",
1256 "p",
1257 "m",
1258 Some("/ws"),
1259 Some(vec!["Bash", "Edit"]),
1260 );
1261 let mut b = spec_with(
1262 "explorer",
1263 "p",
1264 "m",
1265 Some("/ws"),
1266 Some(vec!["Edit", "Bash"]),
1267 );
1268 b.identity.child_id = "other".into();
1269 assert_eq!(
1270 ActorChildRunner::fingerprint(&a),
1271 ActorChildRunner::fingerprint(&b)
1272 );
1273 }
1274
1275 #[test]
1276 fn fingerprint_separates_distinct_runtimes() {
1277 let base = spec_with("explorer", "p", "m", Some("/ws"), None);
1278 let base_fp = ActorChildRunner::fingerprint(&base);
1279 // Each axis that is baked into the worker must split the pool bucket.
1280 assert_ne!(
1281 base_fp,
1282 ActorChildRunner::fingerprint(&spec_with("writer", "p", "m", Some("/ws"), None))
1283 );
1284 assert_ne!(
1285 base_fp,
1286 ActorChildRunner::fingerprint(&spec_with("explorer", "p2", "m", Some("/ws"), None))
1287 );
1288 assert_ne!(
1289 base_fp,
1290 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m2", Some("/ws"), None))
1291 );
1292 assert_ne!(
1293 base_fp,
1294 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws2"), None))
1295 );
1296 assert_ne!(
1297 base_fp,
1298 ActorChildRunner::fingerprint(&spec_with(
1299 "explorer",
1300 "p",
1301 "m",
1302 Some("/ws"),
1303 Some(vec!["Bash"])
1304 ))
1305 );
1306 }
1307
1308 #[test]
1309 fn fingerprint_splits_on_baked_capabilities() {
1310 // Every capability baked once at provision time must split the pool
1311 // bucket, else a worker baked for one posture gets reused for another
1312 // (e.g. a depth-1 worker re-stamping spawn_depth onto a depth-4 child,
1313 // breaking the depth cap; or a bypass worker reused for a non-bypass one).
1314 let base_fp =
1315 ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
1316
1317 let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
1318 depth.identity.depth = 2;
1319 assert_ne!(
1320 base_fp,
1321 ActorChildRunner::fingerprint(&depth),
1322 "depth must split"
1323 );
1324
1325 let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
1326 nested.capabilities.nested_spawn = true;
1327 assert_ne!(
1328 base_fp,
1329 ActorChildRunner::fingerprint(&nested),
1330 "nested_spawn must split"
1331 );
1332
1333 let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
1334 bypass.capabilities.bypass = true;
1335 assert_ne!(
1336 base_fp,
1337 ActorChildRunner::fingerprint(&bypass),
1338 "bypass must split"
1339 );
1340
1341 let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
1342 enforce.capabilities.enforce_permissions = true;
1343 assert_ne!(
1344 base_fp,
1345 ActorChildRunner::fingerprint(&enforce),
1346 "enforce_permissions must split"
1347 );
1348
1349 let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
1350 cap.capabilities.max_spawn_depth = Some(8);
1351 assert_ne!(
1352 base_fp,
1353 ActorChildRunner::fingerprint(&cap),
1354 "max_spawn_depth must split"
1355 );
1356
1357 // #73 (P1): the worker bakes `no_human_review` from this flag once at
1358 // build(), so it MUST split the pool or a worker baked for one approval
1359 // posture is reused for the opposite one.
1360 let mut nha = spec_with("explorer", "p", "m", Some("/ws"), None);
1361 nha.capabilities.no_human_approver = true;
1362 assert_ne!(
1363 base_fp,
1364 ActorChildRunner::fingerprint(&nha),
1365 "no_human_approver must split"
1366 );
1367
1368 // #71: the read-only Bash checker is baked once at build() from this flag,
1369 // so a guardian reviewer worker must not be reused for an ordinary child.
1370 let mut gro = spec_with("explorer", "p", "m", Some("/ws"), None);
1371 gro.capabilities.guardian_read_only = true;
1372 assert_ne!(
1373 base_fp,
1374 ActorChildRunner::fingerprint(&gro),
1375 "guardian_read_only must split"
1376 );
1377 }
1378
1379 struct StaticDecider(bool);
1380
1381 #[async_trait]
1382 impl ChildApprovalDecider for StaticDecider {
1383 async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
1384 self.0
1385 }
1386 }
1387
1388 // ---- first-frame watchdog (dead-pooled-worker recovery) -----------------
1389
1390 /// A link that never yields a frame — models a worker that died (or never
1391 /// subscribed) so its Run sits queued with no server.
1392 struct SilentLink;
1393 #[async_trait]
1394 impl bamboo_subagent::ChildLink for SilentLink {
1395 async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
1396 Ok(())
1397 }
1398 async fn next_frame(
1399 &mut self,
1400 ) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
1401 std::future::pending().await
1402 }
1403 }
1404
1405 /// A link that immediately yields one terminal frame (a healthy fast worker).
1406 struct InstantTerminalLink {
1407 done: bool,
1408 }
1409 #[async_trait]
1410 impl bamboo_subagent::ChildLink for InstantTerminalLink {
1411 async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
1412 Ok(())
1413 }
1414 async fn next_frame(
1415 &mut self,
1416 ) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
1417 if self.done {
1418 std::future::pending().await
1419 } else {
1420 self.done = true;
1421 Ok(Some(ChildFrame::Terminal {
1422 status: TerminalStatus::Completed,
1423 result: Some("done".into()),
1424 error: None,
1425 transcript: vec![],
1426 }))
1427 }
1428 }
1429 }
1430
1431 #[tokio::test]
1432 async fn drive_trips_first_frame_watchdog_on_a_silent_worker() {
1433 let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
1434 let cancel = CancellationToken::new();
1435 let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
1436 let mut link = SilentLink;
1437 let r = drive(
1438 &mut link,
1439 "child-x",
1440 None,
1441 None,
1442 &event_tx,
1443 &cancel,
1444 &mut live_rx,
1445 Some(Duration::from_millis(100)),
1446 )
1447 .await;
1448 assert!(
1449 matches!(r, Err(AgentError::WorkerUnresponsive(_))),
1450 "a silent worker must trip the first-frame watchdog, got {r:?}"
1451 );
1452 }
1453
1454 #[tokio::test]
1455 async fn drive_does_not_trip_when_a_frame_arrives() {
1456 let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
1457 let cancel = CancellationToken::new();
1458 let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
1459 let mut link = InstantTerminalLink { done: false };
1460 // Even a tiny timeout must NOT trip: the terminal frame arrives first and
1461 // disarms the watchdog.
1462 let r = drive(
1463 &mut link,
1464 "child-y",
1465 None,
1466 None,
1467 &event_tx,
1468 &cancel,
1469 &mut live_rx,
1470 Some(Duration::from_millis(50)),
1471 )
1472 .await;
1473 assert_eq!(r.ok().flatten().as_deref(), Some("done"));
1474 }
1475
1476 #[tokio::test]
1477 async fn child_approval_fails_closed_without_decider() {
1478 // No decider wired ⇒ the host denies (safe default), unchanged behavior.
1479 let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
1480 assert!(!decide_child_approval(None, "child-1", &body).await);
1481 }
1482
1483 #[tokio::test]
1484 async fn child_approval_honors_wired_decider() {
1485 let body =
1486 serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
1487 let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
1488 let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
1489 assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
1490 assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
1491 }
1492
1493 #[test]
1494 fn approval_request_fields_extracts_and_defaults() {
1495 let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
1496 assert_eq!(
1497 approval_request_fields(&full),
1498 ("Bash".to_string(), "run".to_string(), "ls".to_string())
1499 );
1500 // Missing fields default to empty strings.
1501 let partial = serde_json::json!({"tool_name":"Write"});
1502 assert_eq!(
1503 approval_request_fields(&partial),
1504 ("Write".to_string(), String::new(), String::new())
1505 );
1506 }
1507
1508 // ---- #193: remote placement routing -------------------------------------
1509
1510 use crate::runtime::execution::SpawnJob;
1511 use bamboo_agent_core::Session;
1512
1513 /// A runner with a BOGUS worker_bin (`/bin/false`): a local spawn here would
1514 /// FAIL, so a passing remote test proves the remote path never spawns.
1515 fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
1516 ActorChildRunner::new(
1517 "test-actor".into(),
1518 PathBuf::from("/bin/false"),
1519 vec![],
1520 std::env::temp_dir().join("bamboo-test-fab-193"),
1521 ExecutorSpec::Echo,
1522 vec![],
1523 "anthropic".into(),
1524 4,
1525 )
1526 .with_remote_placements(placements)
1527 }
1528
1529 /// A child session of the given role (the role rides `subagent_type`, the
1530 /// path build_spec + the remote lookup both read).
1531 fn session_of_role(role: &str, assignment: &str) -> Session {
1532 let mut s = Session::new("child-1", "test-model");
1533 s.metadata
1534 .insert("subagent_type".to_string(), role.to_string());
1535 s.add_message(bamboo_agent_core::Message::user(assignment));
1536 s
1537 }
1538
1539 fn job_for(child: &str) -> SpawnJob {
1540 SpawnJob {
1541 parent_session_id: "parent-1".into(),
1542 child_session_id: child.into(),
1543 model: String::new(),
1544 disabled_tools: None,
1545 }
1546 }
1547
1548 #[test]
1549 fn build_spec_sets_remote_placement_for_matching_role() {
1550 let mut placements = HashMap::new();
1551 placements.insert(
1552 "explorer".to_string(),
1553 ResolvedRemotePlacement {
1554 endpoint: "wss://gpu-host:8443".into(),
1555 token: Some("T-secret".into()),
1556 ca_cert_file: None,
1557 host_label: None,
1558 },
1559 );
1560 let runner = bogus_runner(placements);
1561
1562 // Matching role -> Placement::Remote + the bearer on the secrets envelope.
1563 let s = session_of_role("explorer", "do the thing");
1564 let spec = runner.build_spec(&s, &job_for("child-1"));
1565 match &spec.placement {
1566 Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://gpu-host:8443"),
1567 other => panic!("expected Remote, got {other:?}"),
1568 }
1569 assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
1570 }
1571
1572 #[test]
1573 fn build_spec_leaves_local_for_unmatched_role() {
1574 let mut placements = HashMap::new();
1575 placements.insert(
1576 "explorer".to_string(),
1577 ResolvedRemotePlacement {
1578 endpoint: "wss://gpu-host:8443".into(),
1579 token: Some("T".into()),
1580 ca_cert_file: None,
1581 host_label: None,
1582 },
1583 );
1584 let runner = bogus_runner(placements);
1585
1586 // A DIFFERENT role keeps the default Local placement + no bearer.
1587 let s = session_of_role("writer", "do the thing");
1588 let spec = runner.build_spec(&s, &job_for("child-1"));
1589 assert_eq!(spec.placement, Placement::Local);
1590 assert!(spec.secrets.worker_auth_token.is_none());
1591 }
1592
1593 #[test]
1594 fn build_spec_local_when_no_placements() {
1595 let runner = bogus_runner(HashMap::new());
1596 let s = session_of_role("explorer", "do the thing");
1597 let spec = runner.build_spec(&s, &job_for("child-1"));
1598 assert_eq!(spec.placement, Placement::Local);
1599 assert!(spec.secrets.worker_auth_token.is_none());
1600 }
1601
1602 #[test]
1603 fn placement_metadata_stamps_remote_and_schedulable_not_local() {
1604 // Local children carry no stamp — the DTO defaults them to the backend host.
1605 assert_eq!(placement_metadata(&Placement::Local, None), None);
1606
1607 // Remote, no node label → host derived from the endpoint.
1608 let r = placement_metadata(
1609 &Placement::Remote {
1610 endpoint: "wss://10.0.0.5:8443/stream".into(),
1611 },
1612 None,
1613 )
1614 .unwrap();
1615 assert!(r.contains(r#""kind":"remote""#), "{r}");
1616 assert!(r.contains(r#""host":"10.0.0.5""#), "{r}");
1617
1618 // A cluster node's label (its metadata) OVERRIDES the raw endpoint host.
1619 let labeled = placement_metadata(
1620 &Placement::Remote {
1621 endpoint: "ws://169.254.230.101:8899".into(),
1622 },
1623 Some("mini"),
1624 )
1625 .unwrap();
1626 assert!(labeled.contains(r#""host":"mini""#), "{labeled}");
1627
1628 // Schedulable → {kind:"remote", host:<node label, else pool>}.
1629 let s =
1630 placement_metadata(&Placement::Schedulable { pool: "explorers".into() }, Some("mini"))
1631 .unwrap();
1632 assert!(s.contains(r#""kind":"remote""#), "{s}");
1633 assert!(s.contains(r#""host":"mini""#), "{s}");
1634
1635 // The stamp round-trips through the storage placement type.
1636 let p: bamboo_storage::SessionPlacement = serde_json::from_str(&labeled).unwrap();
1637 assert_eq!(p.kind, "remote");
1638 assert_eq!(p.host, "mini");
1639 }
1640
1641 /// End-to-end remote run through `execute_external_child`: a resident worker
1642 /// (Bearer-gated `WsServer` + `EchoExecutor`) serves the role; the runner is
1643 /// built with a `remote_placements` entry pointing at it AND a BOGUS
1644 /// worker_bin (`/bin/false`). A passing test proves the remote path CONNECTS
1645 /// to the resident worker and NEVER spawns (a spawn would fail on /bin/false),
1646 /// and that a terminal/echo result flows back.
1647 #[tokio::test]
1648 async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
1649 // 1. Stand up the resident worker on loopback with a required bearer.
1650 let token = "remote-test-token";
1651 let server = bamboo_subagent::transport::WsServer::bind_with_token(
1652 (std::net::Ipv4Addr::LOCALHOST, 0).into(),
1653 Some(token.to_string()),
1654 )
1655 .await
1656 .expect("bind resident worker");
1657 let endpoint = server.ws_endpoint(); // ws://127.0.0.1:<port>
1658 let srv = tokio::spawn(async move {
1659 // serve() loops connection-after-connection; the test exits, dropping it.
1660 let _ = server
1661 .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
1662 .await;
1663 });
1664
1665 // 2. Build the runner: role "explorer" pinned remote, bogus worker_bin.
1666 let mut placements = HashMap::new();
1667 placements.insert(
1668 "explorer".to_string(),
1669 ResolvedRemotePlacement {
1670 endpoint: endpoint.clone(),
1671 token: Some(token.to_string()),
1672 ca_cert_file: None,
1673 host_label: Some("mini-e2e".into()), // node label, surfaced on the badge
1674 },
1675 );
1676 let runner = bogus_runner(placements);
1677
1678 // 3. Drive a real run for that role.
1679 let mut session = session_of_role("explorer", "hello remote");
1680 let job = job_for("child-1");
1681 let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
1682 let cancel = CancellationToken::new();
1683
1684 let result = tokio::time::timeout(
1685 Duration::from_secs(10),
1686 runner.execute_external_child(&mut session, &job, event_tx, cancel),
1687 )
1688 .await
1689 .expect("run did not hang")
1690 .expect("remote run succeeded (connected to resident worker, did not spawn)");
1691
1692 let _ = result;
1693 // The EchoExecutor's reply is written back onto the child session as an
1694 // assistant message — proof a terminal result flowed back over the link.
1695 let last = session
1696 .messages
1697 .iter()
1698 .rev()
1699 .find(|m| matches!(m.role, Role::Assistant))
1700 .expect("an assistant reply was written back");
1701 assert!(
1702 last.content.contains("echo:"),
1703 "expected echo reply, got {:?}",
1704 last.content
1705 );
1706
1707 // A remote run must stamp WHICH machine it ran on onto the child session
1708 // (mirrored to the UI badge) using the placement's node label.
1709 let placement = session
1710 .metadata
1711 .get("placement")
1712 .expect("remote child session stamped with a placement");
1713 assert!(placement.contains(r#""kind":"remote""#), "{placement}");
1714 assert!(placement.contains(r#""host":"mini-e2e""#), "{placement}");
1715
1716 // Drain a couple of streamed events to confirm the event pipe carried the
1717 // worker's tokens too (best-effort; the reply assertion above is primary).
1718 let mut saw_event = false;
1719 while let Ok(Some(_ev)) =
1720 tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
1721 {
1722 saw_event = true;
1723 }
1724 let _ = saw_event;
1725
1726 srv.abort();
1727 }
1728
1729 // ---- #181 (P2b): schedulable placement routing --------------------------
1730
1731
1732 /// A bogus-worker_bin runner carrying SCHEDULABLE placements (and optionally
1733 /// remote ones, to test precedence). A local spawn here would fail on
1734 /// `/bin/false`, so a passing schedulable test proves no subprocess spawned.
1735 fn bogus_sched_runner(
1736 remote: HashMap<String, ResolvedRemotePlacement>,
1737 sched: HashMap<String, ResolvedSchedulablePlacement>,
1738 ) -> ActorChildRunner {
1739 ActorChildRunner::new(
1740 "test-actor".into(),
1741 PathBuf::from("/bin/false"),
1742 vec![],
1743 std::env::temp_dir().join("bamboo-test-fab-181"),
1744 ExecutorSpec::Echo,
1745 vec![],
1746 "anthropic".into(),
1747 4,
1748 )
1749 .with_remote_placements(remote)
1750 .with_schedulable_placements(sched)
1751 }
1752
1753 fn sched_placement(pool: &str, _registry_url: impl Into<String>) -> ResolvedSchedulablePlacement {
1754 ResolvedSchedulablePlacement { pool: pool.into(), host_label: None }
1755 }
1756
1757 #[test]
1758 fn build_spec_sets_schedulable_placement_for_matching_role() {
1759 let mut sched = HashMap::new();
1760 sched.insert(
1761 "explorer".to_string(),
1762 sched_placement("gpu-pool", "unused"),
1763 );
1764 let runner = bogus_sched_runner(HashMap::new(), sched);
1765
1766 let s = session_of_role("explorer", "do the thing");
1767 let spec = runner.build_spec(&s, &job_for("child-1"));
1768 match &spec.placement {
1769 Placement::Schedulable { pool } => assert_eq!(pool, "gpu-pool"),
1770 other => panic!("expected Schedulable, got {other:?}"),
1771 }
1772 // No per-placement bearer now — the bus connection carries the bus token.
1773 assert!(spec.secrets.worker_auth_token.is_none());
1774 }
1775
1776 #[test]
1777 fn build_spec_remote_wins_when_role_in_both_maps() {
1778 // A role present in BOTH remote_placements and schedulable_placements must
1779 // resolve to the FIXED remote placement (documented precedence).
1780 let mut remote = HashMap::new();
1781 remote.insert(
1782 "explorer".to_string(),
1783 ResolvedRemotePlacement {
1784 endpoint: "wss://fixed-host:8443".into(),
1785 token: Some("T-remote".into()),
1786 ca_cert_file: None,
1787 host_label: None,
1788 },
1789 );
1790 let mut sched = HashMap::new();
1791 sched.insert(
1792 "explorer".to_string(),
1793 sched_placement("gpu-pool", "https://control-plane:9562"),
1794 );
1795 let runner = bogus_sched_runner(remote, sched);
1796
1797 let s = session_of_role("explorer", "do the thing");
1798 let spec = runner.build_spec(&s, &job_for("child-1"));
1799 match &spec.placement {
1800 Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://fixed-host:8443"),
1801 other => panic!("expected Remote (precedence), got {other:?}"),
1802 }
1803 assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
1804 }
1805
1806 #[test]
1807 fn build_spec_local_for_unmatched_schedulable_role() {
1808 let mut sched = HashMap::new();
1809 sched.insert(
1810 "explorer".to_string(),
1811 sched_placement("gpu-pool", "https://control-plane:9562"),
1812 );
1813 let runner = bogus_sched_runner(HashMap::new(), sched);
1814 let s = session_of_role("writer", "do the thing");
1815 let spec = runner.build_spec(&s, &job_for("child-1"));
1816 assert_eq!(spec.placement, Placement::Local);
1817 assert!(spec.secrets.worker_auth_token.is_none());
1818 }
1819
1820 /// The full role → resolved-placement → badge-host chain: a child routed to a
1821 /// remote/schedulable placement carrying a cluster node's `host_label` stamps
1822 /// that label; without a label it falls back to the endpoint host / pool; a
1823 /// Local child gets no stamp (the DTO defaults it to the backend host).
1824 #[test]
1825 fn placement_stamp_uses_node_label_for_remote_and_schedulable() {
1826 // Remote WITH a node label → {remote, <label>}, overriding the raw IP.
1827 let mut remote = HashMap::new();
1828 remote.insert(
1829 "explorer".to_string(),
1830 ResolvedRemotePlacement {
1831 endpoint: "ws://169.254.230.101:8899".into(),
1832 token: None,
1833 ca_cert_file: None,
1834 host_label: Some("mini".into()),
1835 },
1836 );
1837 let runner = bogus_runner(remote);
1838 let spec = runner.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
1839 let stamp = runner.placement_stamp_for(&spec).expect("remote child is stamped");
1840 assert!(stamp.contains(r#""kind":"remote""#), "{stamp}");
1841 assert!(stamp.contains(r#""host":"mini""#), "{stamp}");
1842
1843 // Remote WITHOUT a node label → falls back to the endpoint host.
1844 let mut remote_nolabel = HashMap::new();
1845 remote_nolabel.insert(
1846 "explorer".to_string(),
1847 ResolvedRemotePlacement {
1848 endpoint: "ws://169.254.230.101:8899".into(),
1849 token: None,
1850 ca_cert_file: None,
1851 host_label: None,
1852 },
1853 );
1854 let r2 = bogus_runner(remote_nolabel);
1855 let spec2 = r2.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
1856 assert!(
1857 r2.placement_stamp_for(&spec2)
1858 .unwrap()
1859 .contains(r#""host":"169.254.230.101""#)
1860 );
1861
1862 // Schedulable WITH a node label → {remote, <label>} (a node, not a pool name).
1863 let mut sched = HashMap::new();
1864 sched.insert(
1865 "mac-mini-monitor".to_string(),
1866 ResolvedSchedulablePlacement {
1867 pool: "mac-mini-monitor".into(),
1868 host_label: Some("mini".into()),
1869 },
1870 );
1871 let sr = bogus_sched_runner(HashMap::new(), sched);
1872 let spec3 = sr.build_spec(&session_of_role("mac-mini-monitor", "go"), &job_for("c1"));
1873 let stamp3 = sr.placement_stamp_for(&spec3).expect("scheduled child is stamped");
1874 assert!(stamp3.contains(r#""kind":"remote""#), "{stamp3}");
1875 assert!(stamp3.contains(r#""host":"mini""#), "{stamp3}");
1876
1877 // A Local (unmatched) child gets NO stamp.
1878 let local = bogus_runner(HashMap::new());
1879 let spec4 = local.build_spec(&session_of_role("writer", "go"), &job_for("c1"));
1880 assert_eq!(local.placement_stamp_for(&spec4), None);
1881 }
1882
1883 // ---- #181: schedulable selection over the BUS (Phase 3 cutover) ----------
1884
1885 async fn start_bus() -> (String, tempfile::TempDir) {
1886 let dir = tempfile::tempdir().unwrap();
1887 let core = std::sync::Arc::new(bamboo_broker::BrokerCore::new(dir.path()));
1888 let server = std::sync::Arc::new(bamboo_broker::BrokerServer::new(core, "t"));
1889 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1890 let addr = listener.local_addr().unwrap();
1891 tokio::spawn(async move {
1892 let _ = server.serve(listener).await;
1893 });
1894 (format!("ws://{addr}"), dir)
1895 }
1896
1897 async fn join_pool(endpoint: &str, id: &str, pool: &str) -> bamboo_broker::BrokerClient {
1898 let mut c = bamboo_broker::BrokerClient::connect(
1899 endpoint,
1900 bamboo_subagent::AgentRef {
1901 session_id: id.into(),
1902 role: Some(pool.into()),
1903 },
1904 "t",
1905 )
1906 .await
1907 .unwrap();
1908 c.subscribe().await.unwrap();
1909 c
1910 }
1911
1912 fn sched_runner_on_bus(endpoint: &str, child_role: &str, pool: &str) -> ActorChildRunner {
1913 let mut sched = HashMap::new();
1914 sched.insert(child_role.to_string(), sched_placement(pool, "unused"));
1915 bogus_sched_runner(HashMap::new(), sched).with_bus(Some(bamboo_subagent::BusEndpoint {
1916 endpoint: endpoint.into(),
1917 token: "t".into(),
1918 }))
1919 }
1920
1921 #[tokio::test]
1922 async fn resolve_schedulable_picks_a_live_bus_worker() {
1923 let (endpoint, _dir) = start_bus().await;
1924 let _w = join_pool(&endpoint, "w-gpu", "gpu-pool").await;
1925 let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1926
1927 let mailbox = runner
1928 .resolve_schedulable_worker("explorer")
1929 .await
1930 .expect("a live pool worker is found on the bus");
1931 assert_eq!(mailbox, "w-gpu");
1932 }
1933
1934 #[tokio::test]
1935 async fn resolve_schedulable_round_robins_over_pool_workers() {
1936 let (endpoint, _dir) = start_bus().await;
1937 let _a = join_pool(&endpoint, "w-a", "gpu-pool").await;
1938 let _b = join_pool(&endpoint, "w-b", "gpu-pool").await;
1939 let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1940
1941 // Successive resolves spread across both connected workers.
1942 let mut picked = std::collections::HashSet::new();
1943 for _ in 0..6 {
1944 picked.insert(runner.resolve_schedulable_worker("explorer").await.unwrap());
1945 }
1946 assert_eq!(
1947 picked,
1948 ["w-a".to_string(), "w-b".to_string()].into_iter().collect(),
1949 "round-robin must cover every connected pool worker"
1950 );
1951 }
1952
1953 #[tokio::test]
1954 async fn resolve_schedulable_errors_on_empty_pool() {
1955 let (endpoint, _dir) = start_bus().await;
1956 // No worker subscribes to "gpu-pool".
1957 let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
1958
1959 let err = runner
1960 .resolve_schedulable_worker("explorer")
1961 .await
1962 .expect_err("an empty pool is terminal — no local fallback")
1963 .to_string();
1964 assert!(err.contains("no live worker in pool"), "got: {err}");
1965 assert!(err.contains("NOT spawning"), "got: {err}");
1966 }
1967
    /// FULL schedulable run over the bus: a worker SERVING `EchoExecutor` joins the
    /// pool by role. `execute_external_child` with a Schedulable placement then:
    /// - resolves that worker from the bus (no local subprocess — the worker_bin is
    ///   `/bin/false`);
    /// - drives the run and gets the echo back;
    /// - stamps the child session with the pool's cluster-node label,
    ///   `{kind:remote, host:"mini"}`.
    ///
    /// This is the end-to-end analogue of the live `mac-mini-monitor`→mini run.
    #[tokio::test]
    async fn execute_external_child_runs_schedulable_over_bus_and_stamps_node_label() {
        // `_dir` is a named binding, so the bus's temp directory lives until the test ends.
        let (endpoint, _dir) = start_bus().await;

        // A bus worker SERVING runs (not just presence), joined to the pool by role.
        let ep = endpoint.clone();
        let worker = tokio::spawn(async move {
            let _ = bamboo_broker::serve_executor(
                &ep,
                bamboo_subagent::AgentRef {
                    session_id: "mmm-worker".into(),
                    role: Some("mac-mini-monitor".into()),
                },
                "t",
                std::sync::Arc::new(bamboo_subagent::executor::EchoExecutor),
            )
            .await;
        });

        // Wait until the worker is visible on the bus so the pool is non-empty
        // when execute_external_child resolves it (serve_executor connects async).
        // The probe connects with no role; it is used only to query pool membership.
        let mut probe = bamboo_broker::BrokerClient::connect(
            &endpoint,
            bamboo_subagent::AgentRef { session_id: "probe".into(), role: None },
            "t",
        )
        .await
        .unwrap();
        // Poll up to 100 times, 30 ms apart (about 3 s in total), for "mmm-worker".
        let mut ready = false;
        for _ in 0..100 {
            if probe
                .list_connected("mac-mini-monitor")
                .await
                .unwrap()
                .iter()
                .any(|id| id == "mmm-worker")
            {
                ready = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(30)).await;
        }
        assert!(ready, "worker never joined the pool");

        // Runner: child role → schedulable pool "mac-mini-monitor" carrying the
        // cluster node's label "mini"; bogus worker_bin so any local spawn fails.
        let mut sched = HashMap::new();
        sched.insert(
            "mac-mini-monitor".to_string(),
            ResolvedSchedulablePlacement {
                pool: "mac-mini-monitor".into(),
                host_label: Some("mini".into()),
            },
        );
        let runner = bogus_sched_runner(HashMap::new(), sched).with_bus(Some(
            bamboo_subagent::BusEndpoint { endpoint: endpoint.clone(), token: "t".into() },
        ));

        // `_rx` is held for the whole run so the event channel stays open; the
        // events themselves are not inspected.
        let mut session = session_of_role("mac-mini-monitor", "hello scheduled");
        let job = job_for("child-1");
        let (event_tx, _rx) = mpsc::channel::<AgentEvent>(64);
        let cancel = CancellationToken::new();

        // Bound the run at 10 s so a routing bug fails the test instead of hanging it.
        tokio::time::timeout(
            Duration::from_secs(10),
            runner.execute_external_child(&mut session, &job, event_tx, cancel),
        )
        .await
        .expect("run did not hang")
        .expect("schedulable run succeeded over the bus (no local spawn)");

        // Echo reply flowed back — proves it routed to the bus worker, not local.
        let last = session
            .messages
            .iter()
            .rev()
            .find(|m| matches!(m.role, Role::Assistant))
            .expect("an assistant reply was written back");
        assert!(last.content.contains("echo:"), "got {:?}", last.content);

        // ...and the child is stamped with the pool's cluster-node label.
        let placement = session
            .metadata
            .get("placement")
            .expect("scheduled child session stamped with a placement");
        assert!(placement.contains(r#""kind":"remote""#), "{placement}");
        assert!(placement.contains(r#""host":"mini""#), "{placement}");

        // Stop the serving worker task spawned above.
        worker.abort();
    }
2064}