use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use bamboo_subagent::fleet::{spawn_worker_on_bus, SpawnedChild};
use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
use bamboo_subagent::provision::{
ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
};
use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
/// Default cap on concurrently executing actor children (presumably what callers
/// pass as `max_concurrent` to `ActorChildRunner::new` — confirm at call sites).
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;
/// Maximum sub-agent nesting depth. Children built at this depth or deeper get
/// `nested_spawn = false`, and every child is told this limit via
/// `capabilities.max_spawn_depth`.
pub const MAX_SPAWN_DEPTH: u32 = 4;
/// Idle local workers retained per pool fingerprint; extras are killed on release.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;
/// Idle timeout (seconds) applied to pooled worker specs that do not set one.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;
/// How long a freshly dispatched run may go without any child frame before the
/// worker is declared unresponsive (and, for local/schedulable placements,
/// retried once).
const WORKER_FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(60);
/// A worker handle paired with the bus mailbox id used to address it.
///
/// For local placements these are kept in the idle pool and reused; for
/// remote/schedulable placements the handle wraps a worker this runner did
/// not spawn.
struct PooledWorker {
    /// Handle used for liveness checks (`is_alive`) and reaping (`kill`).
    worker: SpawnedChild,
    /// Mailbox id passed to `BrokerChildLink::connect` to reach the worker.
    mailbox_id: String,
}
/// Fixed remote placement for a role: children of that role are run on a
/// resident worker connected to directly at `endpoint`, instead of on a
/// locally spawned process.
#[derive(Debug, Clone)]
pub struct ResolvedRemotePlacement {
    /// Worker endpoint URL (tests use `ws://` / `wss://` URLs).
    pub endpoint: String,
    /// Optional auth token sent on connect; also copied into the child spec's
    /// `worker_auth_token`.
    pub token: Option<String>,
    /// Optional CA certificate file trusted for the connection's TLS.
    pub ca_cert_file: Option<PathBuf>,
    /// Optional display name for the session's `placement` stamp; when absent
    /// the host parsed from `endpoint` is used.
    pub host_label: Option<String>,
}
/// Schedulable placement for a role: children of that role are dispatched to
/// one of the workers currently connected to `pool` on the mailbox bus. There
/// is no local fallback when the pool is empty.
#[derive(Debug, Clone)]
pub struct ResolvedSchedulablePlacement {
    /// Bus pool name queried for live workers.
    pub pool: String,
    /// Optional display name for the `placement` stamp; defaults to `pool`.
    pub host_label: Option<String>,
}
/// Field-free mirror of `Placement`, used by `execute_external_child` to pick
/// the connect/retry/cleanup strategy without borrowing the spec.
enum PlacementKind {
    /// Pooled worker spawned by this runner and reached through the bus.
    Local,
    /// Fixed resident worker reached directly at its own endpoint.
    Remote,
    /// Worker selected round-robin from a bus pool.
    Schedulable,
}
/// `ExternalChildRunner` that executes child sessions on actor workers:
/// pooled local worker processes reached over a mailbox bus, fixed remote
/// workers, or workers scheduled out of a bus pool.
pub struct ActorChildRunner {
    /// Compared with a session's `external.agent_id` in `should_handle`.
    agent_id: String,
    /// Executable used to spawn local workers.
    worker_bin: PathBuf,
    /// Arguments passed to `worker_bin`.
    worker_args: Vec<String>,
    /// Fabric directory written into every child's `ProvisionSpec`.
    fabric_dir: PathBuf,
    /// Executor configuration cloned into every child's spec.
    executor: ExecutorSpec,
    /// Provider credentials; the one matching the child's provider is attached.
    credentials: Vec<ScopedCredential>,
    /// Provider used when the session has no model reference, and for the
    /// credential lookup when the spec names no provider.
    default_provider: String,
    /// Mailbox bus; required for local and schedulable placements.
    bus: Option<bamboo_subagent::BusEndpoint>,
    /// Bounds how many children execute at once.
    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
    /// Idle local workers keyed by spec fingerprint.
    pool: Arc<tokio::sync::Mutex<HashMap<String, Vec<PooledWorker>>>>,
    /// Maximum idle workers retained per fingerprint.
    max_idle_per_key: usize,
    /// Optional inline approval policy for child tool requests.
    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
    /// Host bridge for escalating child approvals; replaced via
    /// `set_escalation_bridge` and snapshotted at the start of each run.
    escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
    /// Role name -> fixed remote worker.
    remote_placements: HashMap<String, ResolvedRemotePlacement>,
    /// Role name -> bus pool (a role also present in `remote_placements` is
    /// placed remotely instead).
    schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
    /// Pool name -> round-robin cursor for schedulable placement.
    schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
}
/// Per-runner approval policy for child tool requests.
///
/// `drive` consults it only when no global `ChildApprovalReviewer` is
/// installed, and awaits it inline on the frame loop.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
    /// Returns `true` to approve `request` raised by `child_session_id`.
    async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// Asks `decider` about a child's approval `request`.
///
/// Fails closed: when no decider is wired, the request is denied (`false`).
async fn decide_child_approval(
    decider: Option<&Arc<dyn ChildApprovalDecider>>,
    child_session_id: &str,
    request: &serde_json::Value,
) -> bool {
    let Some(policy) = decider else {
        // Nobody to ask -> deny.
        return false;
    };
    policy.decide(child_session_id, request).await
}
/// Longest a child approval request may stay unanswered (by the reviewer, the
/// escalation bridge, or a human) before it is answered with a denial.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
let field = |k: &str| {
body.get(k)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
};
(field("tool_name"), field("permission"), field("resource"))
}
/// Process-wide reviewer for child approval requests.
///
/// Once installed with `set_child_approval_reviewer`, `drive` prefers it over
/// any per-runner decider and runs it on a spawned task bounded by
/// `CHILD_APPROVAL_TIMEOUT` (a timeout counts as a denial).
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
    /// Returns `true` to approve `request` raised by `child_session_id`.
    async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// The write-once, process-global cell holding the installed reviewer.
fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
    // `OnceLock::new` is const, so the cell lives in a plain static.
    static REVIEWER: std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> =
        std::sync::OnceLock::new();
    &REVIEWER
}
/// Installs the process-wide child approval reviewer.
///
/// Only the first call takes effect; any later reviewer is discarded.
pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
    // `set` hands the value back on a second install; it is simply dropped.
    _ = child_approval_reviewer_slot().set(reviewer);
}
/// Returns a shared handle to the installed reviewer, if one has been set.
pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
    child_approval_reviewer_slot().get().map(Arc::clone)
}
impl ActorChildRunner {
    /// Creates a runner for the actor agent `agent_id`.
    ///
    /// Local workers are spawned from `worker_bin` with `worker_args`.
    /// `executor`, `fabric_dir`, and the matching entry of `credentials` go
    /// into each child's `ProvisionSpec`. `default_provider` fills in when a
    /// session names no provider. `max_concurrent` bounds how many children run
    /// at once (clamped to at least 1). The bus, approval decider, and
    /// placements start unset; configure them with the `with_*` builders.
    // Plain field-by-field constructor; optional settings use the builders.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        agent_id: String,
        worker_bin: PathBuf,
        worker_args: Vec<String>,
        fabric_dir: PathBuf,
        executor: ExecutorSpec,
        credentials: Vec<ScopedCredential>,
        default_provider: String,
        max_concurrent: usize,
    ) -> Self {
        Self {
            agent_id,
            worker_bin,
            worker_args,
            fabric_dir,
            executor,
            credentials,
            default_provider,
            bus: None,
            // A zero-permit semaphore would deadlock every run; force >= 1.
            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
            pool: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
            approval_decider: None,
            escalation_bridge: Arc::new(std::sync::Mutex::new(None)),
            remote_placements: HashMap::new(),
            schedulable_placements: HashMap::new(),
            schedule_cursor: Arc::new(std::sync::Mutex::new(HashMap::new())),
        }
    }

    /// Sets the mailbox bus used by local and schedulable children.
    ///
    /// A bus whose endpoint is empty or whitespace-only is treated as no bus.
    pub fn with_bus(mut self, bus: Option<bamboo_subagent::BusEndpoint>) -> Self {
        self.bus = bus.filter(|b| !b.endpoint.trim().is_empty());
        self
    }

    /// Installs an inline approval policy for child tool requests.
    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
        self.approval_decider = Some(decider);
        self
    }

    /// Replaces the role-name -> fixed remote worker map.
    pub fn with_remote_placements(
        mut self,
        placements: HashMap<String, ResolvedRemotePlacement>,
    ) -> Self {
        self.remote_placements = placements;
        self
    }

    /// Replaces the role-name -> bus pool map for scheduled workers.
    pub fn with_schedulable_placements(
        mut self,
        placements: HashMap<String, ResolvedSchedulablePlacement>,
    ) -> Self {
        self.schedulable_placements = placements;
        self
    }

    /// Computes the idle-pool key for `spec`. A pooled worker is only reused
    /// for a spec with an identical key.
    ///
    /// The key covers role, model provider/name, workspace, the disabled-tool
    /// set (order-insensitive), spawn depth, and the capability flags baked
    /// into a worker. It deliberately excludes `child_id`, so children that
    /// differ only in identity share workers. Fields are joined with U+0001.
    fn fingerprint(spec: &ProvisionSpec) -> String {
        let role = spec.identity.role.as_str();
        let (provider, model) = spec
            .model
            .as_ref()
            .map(|m| (m.provider.as_str(), m.model.as_str()))
            .unwrap_or(("", ""));
        let workspace = spec.workspace.as_deref().unwrap_or("");
        // Sort so the same tool set listed in a different order keys identically.
        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
        tools.sort();
        let caps = &spec.capabilities;
        format!(
            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}\u{1}nha={}\u{1}gro={}",
            tools.join(","),
            spec.identity.depth,
            caps.nested_spawn,
            caps.bypass,
            caps.enforce_permissions,
            // NOTE(review): `None` and `Some(0)` encode identically here.
            caps.max_spawn_depth.unwrap_or(0),
            caps.no_human_approver,
            caps.guardian_read_only,
        )
    }

    /// Obtains a local worker for pool `key`.
    ///
    /// Pops idle workers until it finds a live one, killing dead ones found
    /// along the way. If the bucket runs dry, spawns a fresh worker on the bus
    /// from `spec`; its mailbox id is the spawned record's `agent_id`.
    ///
    /// # Errors
    /// `AgentError::LLM` if spawning fails.
    async fn acquire_bus_worker(
        &self,
        key: &str,
        spec: &ProvisionSpec,
    ) -> crate::runtime::runner::Result<PooledWorker> {
        loop {
            // Scope the pool lock to the pop so it is not held across `kill().await`.
            let candidate = {
                let mut pool = self.pool.lock().await;
                pool.get_mut(key).and_then(|bucket| bucket.pop())
            };
            let Some(mut candidate) = candidate else { break };
            if candidate.worker.is_alive() {
                return Ok(candidate);
            }
            candidate.worker.kill().await;
        }
        let spawned = spawn_worker_on_bus(&self.worker_bin, &self.worker_args, spec)
            .await
            .map_err(|e| AgentError::LLM(format!("actor spawn (bus) failed: {e}")))?;
        let mailbox_id = spawned.record.agent_id.clone();
        Ok(PooledWorker {
            worker: spawned,
            mailbox_id,
        })
    }

    /// Returns a local worker to the idle pool for `key`.
    ///
    /// The worker is killed instead if it is no longer alive or the bucket
    /// already holds `max_idle_per_key` workers.
    async fn release_bus_worker(&self, key: &str, mut worker: PooledWorker) {
        if !worker.worker.is_alive() {
            worker.worker.kill().await;
            return;
        }
        let mut pool = self.pool.lock().await;
        let bucket = pool.entry(key.to_string()).or_default();
        if bucket.len() >= self.max_idle_per_key {
            // Release the pool lock before awaiting the kill.
            drop(pool);
            worker.worker.kill().await;
            return;
        }
        bucket.push(worker);
    }

    /// Translates a child `session` and its spawn `job` into a `ProvisionSpec`.
    ///
    /// - Role: the session's `subagent_type` metadata (default `"worker"`).
    /// - Depth: the session's `spawn_depth`.
    /// - Model: the session's `model_ref` if set, else `job.model` (when
    ///   non-blank) under `default_provider`.
    /// - Credentials: the first one whose provider matches the model's
    ///   provider, falling back to `default_provider`. A warning is logged if
    ///   none matches.
    /// - Capabilities:
    ///   - permissions are always enforced;
    ///   - nested spawning is allowed below `MAX_SPAWN_DEPTH`;
    ///   - `bypass` and `no_human_approver` come from the session's runtime state;
    ///   - `guardian_read_only` is set for the `"guardian"` role.
    /// - Placement: a remote placement for the role wins over a schedulable
    ///   one; otherwise the spec keeps `ProvisionSpec::new`'s default.
    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: job.child_session_id.clone(),
                parent_id: Some(job.parent_session_id.clone()),
                project_key: None,
                role: session
                    .metadata
                    .get("subagent_type")
                    .cloned()
                    .unwrap_or_else(|| "worker".to_string()),
                depth: session.spawn_depth,
            },
            self.executor.clone(),
            self.fabric_dir.to_string_lossy().into_owned(),
        );
        spec.workspace = session.workspace.clone();
        spec.bus = self.bus.clone();
        spec.model = session
            .model_ref
            .as_ref()
            .map(|r| ModelRefSpec {
                provider: r.provider.clone(),
                model: r.model.clone(),
            })
            .or_else(|| {
                let m = job.model.trim();
                (!m.is_empty()).then(|| ModelRefSpec {
                    provider: self.default_provider.clone(),
                    model: m.to_string(),
                })
            });
        spec.disabled_tools = job.disabled_tools.clone();
        // A blank provider name falls back to the default for credential lookup.
        let provider = spec
            .model
            .as_ref()
            .map(|m| m.provider.as_str())
            .filter(|p| !p.trim().is_empty())
            .unwrap_or(&self.default_provider);
        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
            spec.secrets.provider_credentials.push(cred.clone());
        } else {
            tracing::warn!(
                "actor child {}: no credential found for provider '{}'",
                job.child_session_id,
                provider
            );
        }
        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
        spec.capabilities.enforce_permissions = true;
        spec.capabilities.bypass = session
            .agent_runtime_state
            .as_ref()
            .is_some_and(|s| s.bypass_permissions);
        spec.capabilities.no_human_approver = session
            .agent_runtime_state
            .as_ref()
            .is_some_and(|s| s.no_human_approver);
        spec.capabilities.guardian_read_only =
            session.metadata.get("subagent_type").map(String::as_str) == Some("guardian");
        // Remote placement takes precedence when a role appears in both maps.
        if let Some(placement) = self.remote_placements.get(spec.identity.role.as_str()) {
            spec.placement = Placement::Remote {
                endpoint: placement.endpoint.clone(),
            };
            spec.secrets.worker_auth_token = placement.token.clone();
        } else if let Some(placement) = self.schedulable_placements.get(spec.identity.role.as_str())
        {
            spec.placement = Placement::Schedulable {
                pool: placement.pool.clone(),
            };
        }
        spec
    }

    /// Builds the `placement` metadata JSON for `spec`, using the role's
    /// configured `host_label` when one is set. Returns `None` for local
    /// placement.
    fn placement_stamp_for(&self, spec: &ProvisionSpec) -> Option<String> {
        let host_label = match &spec.placement {
            Placement::Remote { .. } => self
                .remote_placements
                .get(spec.identity.role.as_str())
                .and_then(|p| p.host_label.as_deref()),
            Placement::Schedulable { .. } => self
                .schedulable_placements
                .get(spec.identity.role.as_str())
                .and_then(|p| p.host_label.as_deref()),
            Placement::Local => None,
        };
        placement_metadata(&spec.placement, host_label)
    }

    /// Picks the mailbox id of a live worker for a schedulable `role`.
    ///
    /// Asks the bus which workers are connected to the role's pool. Choices
    /// rotate round-robin via a per-pool cursor taken modulo the current
    /// candidate count, so rotation is only even while the candidate list is
    /// stable.
    ///
    /// # Errors
    /// `AgentError::LLM` in any of these cases (there is never a local fallback):
    /// - the role has no schedulable placement;
    /// - no bus is configured;
    /// - the bus connect or presence query fails;
    /// - the pool has no live worker.
    async fn resolve_schedulable_worker(
        &self,
        role: &str,
    ) -> std::result::Result<String, AgentError> {
        let pool = self
            .schedulable_placements
            .get(role)
            .ok_or_else(|| {
                AgentError::LLM(format!(
                    "schedulable placement for role '{role}' vanished before scheduling"
                ))
            })?
            .pool
            .clone();
        let bus = self.bus.as_ref().ok_or_else(|| {
            AgentError::LLM(format!(
                "schedulable role '{role}': no mailbox bus configured (subagents.broker)"
            ))
        })?;
        // NOTE(review): every query for a role connects under the same session
        // id `sched-q-{role}`. If the broker requires unique session ids,
        // concurrent schedules for one role may collide. Confirm broker semantics.
        let mut q = bamboo_broker::BrokerClient::connect(
            &bus.endpoint,
            bamboo_subagent::AgentRef {
                session_id: format!("sched-q-{role}"),
                role: None,
            },
            &bus.token,
        )
        .await
        .map_err(|e| AgentError::LLM(format!("schedulable role '{role}': bus connect failed: {e}")))?;
        let candidates = q.list_connected(&pool).await.map_err(|e| {
            AgentError::LLM(format!("schedulable role '{role}': bus presence query failed: {e}"))
        })?;
        if candidates.is_empty() {
            return Err(AgentError::LLM(format!(
                "schedulable role '{role}': no live worker in pool '{pool}' on the bus \
                 (NOT spawning a local subprocess — a schedulable role has no local fallback)"
            )));
        }
        // Short, non-async critical section: safe to use a std Mutex here.
        let idx = {
            let mut cursors = self.schedule_cursor.lock().unwrap();
            let cursor = cursors.entry(pool.clone()).or_insert(0);
            let i = *cursor % candidates.len();
            *cursor = cursor.wrapping_add(1);
            i
        };
        Ok(candidates[idx].clone())
    }
}
#[async_trait]
impl ExternalChildRunner for ActorChildRunner {
    /// Claims sessions marked as external actor children owned by this runner.
    ///
    /// Requires all of:
    /// - `runtime.kind == "external"`;
    /// - `external.protocol == "actor"`;
    /// - `external.agent_id` equal to this runner's agent id.
    async fn should_handle(&self, session: &Session) -> bool {
        // Borrow the metadata values instead of allocating comparison Strings.
        let meta = |key: &str| session.metadata.get(key).map(String::as_str);
        meta("runtime.kind") == Some("external")
            && meta("external.protocol") == Some("actor")
            && meta("external.agent_id") == Some(self.agent_id.as_str())
    }

    /// Replaces (or clears) the host bridge used to escalate child approvals.
    /// Runs already in progress keep the bridge they started with.
    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
        *self.escalation_bridge.lock().unwrap() = bridge;
    }

    /// Runs one child session to completion on an actor worker.
    ///
    /// The worker is chosen by the spec's placement:
    /// - `Local`: a pooled worker (reused or freshly spawned), reached via the bus;
    /// - `Remote`: the role's fixed resident worker, connected to directly;
    /// - `Schedulable`: a live worker picked round-robin from the role's bus pool.
    ///
    /// Non-local runs stamp `session.metadata["placement"]`.
    ///
    /// The run is dispatched with the latest user message as the assignment
    /// plus the full transcript, then driven to a terminal frame. If the worker
    /// produces no first frame, `Local` and `Schedulable` placements retry once
    /// with another worker. A non-empty final result is appended to `session`
    /// as an assistant message.
    ///
    /// Local workers go back to the idle pool on success and are killed on any
    /// failure. Workers this runner did not spawn are never killed.
    ///
    /// # Errors
    /// `AgentError` on connect/dispatch failures, child errors, cancellation,
    /// or a worker that stays unresponsive after the retry.
    async fn execute_external_child(
        &self,
        session: &mut Session,
        job: &SpawnJob,
        event_tx: mpsc::Sender<AgentEvent>,
        cancel_token: CancellationToken,
    ) -> crate::runtime::runner::Result<()> {
        // Snapshot once so a concurrent `set_escalation_bridge` can't change it mid-run.
        let escalation = self.escalation_bridge.lock().unwrap().clone();
        let assignment = extract_assignment(session);
        let mut spec = self.build_spec(session, job);
        spec.reusable = true;
        if spec.limits.idle_timeout_secs.is_none() {
            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
        }
        let pool_key = Self::fingerprint(&spec);
        let messages: Vec<serde_json::Value> = session
            .messages
            .iter()
            .filter_map(|m| serde_json::to_value(m).ok())
            .collect();
        // Held for the whole run (including any retry) to bound concurrency.
        let _slot = self
            .concurrency
            .acquire()
            .await
            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
        let kind = match spec.placement {
            Placement::Remote { .. } => PlacementKind::Remote,
            Placement::Schedulable { .. } => PlacementKind::Schedulable,
            Placement::Local => PlacementKind::Local,
        };
        // Workers we did not spawn are never killed or pooled by this runner.
        let remote = !matches!(kind, PlacementKind::Local);
        if let Some(placement_meta) = self.placement_stamp_for(&spec) {
            session
                .metadata
                .insert("placement".to_string(), placement_meta);
        }
        let mut attempt = 0u8;
        let (result, actor) = loop {
            let (actor, mut client) = match kind {
                PlacementKind::Remote => {
                    let placement = self
                        .remote_placements
                        .get(spec.identity.role.as_str())
                        .ok_or_else(|| {
                            AgentError::LLM(format!(
                                "remote placement for role '{}' vanished before connect",
                                spec.identity.role
                            ))
                        })?;
                    let endpoint = placement.endpoint.clone();
                    let trust_cfg = match placement.ca_cert_file.as_deref() {
                        Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
                            AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
                        })?),
                        None => None,
                    };
                    let client = ChildClient::connect_with_auth_tls(
                        &endpoint,
                        placement.token.as_deref(),
                        trust_cfg,
                    )
                    .await
                    .map_err(|e| {
                        AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
                    })?;
                    // Placeholder record for a worker this runner did not spawn
                    // (pid 0, empty version).
                    let record = AgentRecord {
                        agent_id: job.child_session_id.clone(),
                        role: spec.identity.role.clone(),
                        labels: Vec::new(),
                        endpoint,
                        pid: 0,
                        version: String::new(),
                        started_at: chrono::Utc::now(),
                        lease_expires_at: chrono::Utc::now(),
                    };
                    let actor = PooledWorker {
                        worker: SpawnedChild::remote(record),
                        mailbox_id: job.child_session_id.clone(),
                    };
                    let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(client);
                    (actor, client)
                }
                PlacementKind::Schedulable => {
                    let bus = self.bus.as_ref().ok_or_else(|| {
                        AgentError::LLM(
                            "schedulable sub-agents require a mailbox bus (subagents.broker)"
                                .to_string(),
                        )
                    })?;
                    let mailbox_id = self
                        .resolve_schedulable_worker(spec.identity.role.as_str())
                        .await?;
                    let parent = bamboo_subagent::AgentRef {
                        session_id: format!("p-{}", job.child_session_id),
                        role: None,
                    };
                    let link = bamboo_broker::BrokerChildLink::connect(
                        &bus.endpoint,
                        parent,
                        &bus.token,
                        mailbox_id.clone(),
                    )
                    .await
                    .map_err(|e| {
                        AgentError::LLM(format!("schedulable link connect to '{mailbox_id}' failed: {e}"))
                    })?;
                    let actor = PooledWorker {
                        worker: SpawnedChild::remote(AgentRecord {
                            agent_id: mailbox_id.clone(),
                            role: spec.identity.role.clone(),
                            labels: Vec::new(),
                            endpoint: bus.endpoint.clone(),
                            pid: 0,
                            version: String::new(),
                            started_at: chrono::Utc::now(),
                            lease_expires_at: chrono::Utc::now(),
                        }),
                        mailbox_id,
                    };
                    let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
                    (actor, client)
                }
                PlacementKind::Local => {
                    let bus = self.bus.as_ref().ok_or_else(|| {
                        AgentError::LLM(
                            "local sub-agents require a mailbox bus (subagents.broker); none is \
                             configured and the bus could not be embedded"
                                .to_string(),
                        )
                    })?;
                    let actor = self.acquire_bus_worker(&pool_key, &spec).await?;
                    let parent = bamboo_subagent::AgentRef {
                        session_id: format!("p-{}", job.child_session_id),
                        role: None,
                    };
                    let link = match bamboo_broker::BrokerChildLink::connect(
                        &bus.endpoint,
                        parent,
                        &bus.token,
                        actor.mailbox_id.clone(),
                    )
                    .await
                    {
                        Ok(link) => link,
                        Err(e) => {
                            // The worker was acquired (possibly just spawned) but never
                            // received a run. Reap it like every other local failure
                            // path instead of silently dropping the handle.
                            actor.worker.kill().await;
                            return Err(AgentError::LLM(format!(
                                "broker child link connect failed: {e}"
                            )));
                        }
                    };
                    let client: Box<dyn bamboo_subagent::ChildLink> = Box::new(link);
                    (actor, client)
                }
            };
            if let Err(e) = client
                .send(ParentFrame::Run(RunSpec {
                    assignment: assignment.clone(),
                    reasoning_effort: None,
                    messages: messages.clone(),
                }))
                .await
            {
                if !remote {
                    actor.worker.kill().await;
                }
                return Err(AgentError::LLM(format!("actor run dispatch failed: {e}")));
            }
            // Register a live channel so steering frames and asynchronously
            // delivered approvals can reach this child while it runs.
            let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
            let live_guard = super::live::register(&job.child_session_id, live_tx);
            let result = drive(
                &mut *client,
                &job.child_session_id,
                self.approval_decider.as_ref(),
                escalation.clone(),
                &event_tx,
                &cancel_token,
                &mut live_rx,
                Some(WORKER_FIRST_FRAME_TIMEOUT),
            )
            .await;
            drop(live_guard);
            drop(client);
            // A worker that never produced a frame gets exactly one retry with
            // another worker. A fixed remote worker has no alternative, so it
            // is not retried.
            if attempt == 0 && matches!(result, Err(AgentError::WorkerUnresponsive(_))) {
                match kind {
                    PlacementKind::Local => {
                        tracing::warn!(
                            "actor child {} got no first frame; reaping the worker and respawning once",
                            job.child_session_id
                        );
                        actor.worker.kill().await;
                        attempt += 1;
                        continue;
                    }
                    PlacementKind::Schedulable => {
                        tracing::warn!(
                            "scheduled actor child {} got no first frame; re-selecting a pool worker",
                            job.child_session_id
                        );
                        drop(actor);
                        attempt += 1;
                        continue;
                    }
                    PlacementKind::Remote => {}
                }
            }
            break (result, actor);
        };
        if remote {
            drop(actor);
        } else {
            match &result {
                Ok(_) => self.release_bus_worker(&pool_key, actor).await,
                Err(_) => actor.worker.kill().await,
            }
        }
        // Only a non-empty final result is recorded in the transcript.
        match result? {
            Some(text) if !text.is_empty() => {
                session.add_message(bamboo_agent_core::Message::assistant(text, None));
            }
            _ => {}
        }
        Ok(())
    }
}
fn placement_metadata(placement: &Placement, host_label: Option<&str>) -> Option<String> {
let value = match placement {
Placement::Local => return None,
Placement::Remote { endpoint } => serde_json::json!({
"kind": "remote",
"host": host_label.map(str::to_string).unwrap_or_else(|| host_of_endpoint(endpoint)),
}),
Placement::Schedulable { pool } => serde_json::json!({
"kind": "remote",
"host": host_label.unwrap_or(pool),
}),
};
serde_json::to_string(&value).ok()
}
/// Extracts the bare host from a worker endpoint URL for display.
///
/// Surrounding whitespace is ignored. The following are all dropped:
/// - any scheme (`ws://`, `wss://`, `http://`, ...);
/// - userinfo (`user:pass@`);
/// - port, path, query, and fragment.
///
/// Bracketed IPv6 literals are unwrapped (`ws://[::1]:8899` -> `::1`). Input
/// without a scheme (`host:port`) is accepted.
fn host_of_endpoint(endpoint: &str) -> String {
    let trimmed = endpoint.trim();
    // Drop any "<scheme>://" prefix.
    let rest = trimmed.split_once("://").map_or(trimmed, |(_, r)| r);
    // Keep only the authority: stop at the path, query, or fragment.
    let authority = rest.split(['/', '?', '#']).next().unwrap_or(rest);
    // Drop userinfo; the host itself can never contain '@'.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
    // IPv6 literals contain ':' themselves, so they must be unwrapped before
    // the port is split off.
    if let Some(inner) = host_port.strip_prefix('[') {
        if let Some((host, _)) = inner.split_once(']') {
            return host.to_string();
        }
    }
    host_port.split(':').next().unwrap_or(host_port).to_string()
}
/// Pumps a child link until the child reports a terminal status, the run is
/// cancelled, or the first-frame watchdog fires.
///
/// While running:
/// - Child `Event` frames are decoded into `AgentEvent`s and forwarded to
///   `event_tx`. Frames that fail to decode are dropped, as are send errors.
/// - Frames arriving on `live_rx` are forwarded to the child. These are
///   presumably live-steering frames and approval replies posted through
///   `super::live` (TODO confirm against that module).
/// - `ApprovalRequest` frames are answered by the first available of:
///   1. the global `ChildApprovalReviewer` (spawned task);
///   2. `approval_decider` (awaited inline, so no other frame or cancellation
///      is processed until it returns);
///   3. `escalation_bridge` (spawned task);
///   4. a `ChildApprovalRequested` event for a human.
///
///   Spawned and human paths are denied once `CHILD_APPROVAL_TIMEOUT` elapses.
///
/// # Returns
/// The child's final result on a `Completed` terminal.
///
/// # Errors
/// - `AgentError::WorkerUnresponsive` if no child frame arrives within
///   `first_frame_timeout`.
/// - `AgentError::Cancelled` on `cancel_token` (after sending
///   `ParentFrame::Cancel`) or on a `Cancelled` terminal.
/// - `AgentError::LLM` for an `Error` or `Suspended` terminal, a link closed
///   before any terminal, or a transport error.
async fn drive(
    client: &mut dyn bamboo_subagent::ChildLink,
    child_session_id: &str,
    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
    escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
    event_tx: &mpsc::Sender<AgentEvent>,
    cancel_token: &CancellationToken,
    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
    first_frame_timeout: Option<Duration>,
) -> crate::runtime::runner::Result<Option<String>> {
    let mut got_first_frame = false;
    // Armed only when a timeout is given; cleared once any child frame arrives.
    let mut first_frame_watch = first_frame_timeout.map(|d| Box::pin(tokio::time::sleep(d)));
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => {
                break;
            }
            // First-frame watchdog: never completes when no timeout was given,
            // and the branch is disabled once the first child frame is seen.
            _ = async {
                match first_frame_watch.as_mut() {
                    Some(s) => s.as_mut().await,
                    None => std::future::pending::<()>().await,
                }
            }, if !got_first_frame => {
                return Err(AgentError::WorkerUnresponsive(format!(
                    "child {child_session_id} produced no frame within {:?}",
                    first_frame_timeout.unwrap_or_default()
                )));
            }
            // Parent-side frames queued for this child (live channel).
            Some(frame) = live_rx.recv() => {
                if client.send(frame).await.is_err() {
                    tracing::warn!("live steering frame could not be sent; connection failing");
                }
            }
            frame = client.next_frame() => {
                // Any child frame (even an error/close) disarms the watchdog.
                got_first_frame = true;
                first_frame_watch = None;
                match frame {
                    Ok(Some(ChildFrame::Event { event })) => {
                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
                            let _ = event_tx.send(ev).await;
                        }
                    }
                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
                        if let Some(reviewer) = child_approval_reviewer() {
                            // Global reviewer: answer asynchronously via the live
                            // channel so this loop keeps pumping frames.
                            let child = child_session_id.to_string();
                            let req_id = id.clone();
                            let body = body.clone();
                            tokio::spawn(async move {
                                let approved = tokio::time::timeout(
                                    CHILD_APPROVAL_TIMEOUT,
                                    reviewer.review(&child, &body),
                                )
                                .await
                                .unwrap_or(false);
                                super::live::deliver_approval(&child, &req_id, approved);
                            });
                        } else if approval_decider.is_some() {
                            // Inline decider: reply directly on the link.
                            let approved =
                                decide_child_approval(approval_decider, child_session_id, &body)
                                    .await;
                            if client
                                .send(ParentFrame::ApprovalReply { id, approved })
                                .await
                                .is_err()
                            {
                                tracing::warn!(
                                    "failed to answer approval_request; connection failing"
                                );
                            }
                        } else if let Some(host) = escalation_bridge.clone() {
                            // Escalate to the host; anything but an explicit
                            // `"approved": true` (or a timeout/error) is a denial.
                            let child = child_session_id.to_string();
                            let req_id = id.clone();
                            let body = body.clone();
                            tokio::spawn(async move {
                                let approved = match tokio::time::timeout(
                                    CHILD_APPROVAL_TIMEOUT,
                                    host.approval_call(body),
                                )
                                .await
                                {
                                    Ok(Ok(reply)) => reply
                                        .get("approved")
                                        .and_then(|v| v.as_bool())
                                        .unwrap_or(false),
                                    _ => false,
                                };
                                super::live::deliver_approval(&child, &req_id, approved);
                            });
                        } else {
                            // No automated approver: surface the request as an
                            // event and auto-deny if it is still pending at timeout.
                            let (tool_name, permission, resource) =
                                approval_request_fields(&body);
                            super::live::register_pending_approval(child_session_id, &id);
                            let _ = event_tx
                                .send(AgentEvent::ChildApprovalRequested {
                                    child_session_id: child_session_id.to_string(),
                                    request_id: id.clone(),
                                    tool_name,
                                    permission,
                                    resource,
                                })
                                .await;
                            let child = child_session_id.to_string();
                            tokio::spawn(async move {
                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
                                if super::live::take_pending_approval(&child, &id) {
                                    super::live::deliver_approval(&child, &id, false);
                                }
                            });
                        }
                    }
                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
                        return match status {
                            TerminalStatus::Completed => Ok(result),
                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
                            TerminalStatus::Error => Err(AgentError::LLM(
                                error.unwrap_or_else(|| "actor child errored".to_string()),
                            )),
                            TerminalStatus::Suspended => Err(AgentError::LLM(
                                "nested sub-agent suspend received but resume transport is not wired"
                                    .to_string(),
                            )),
                        };
                    }
                    Ok(None) => {
                        return Err(AgentError::LLM(
                            "actor child closed before terminal".to_string(),
                        ));
                    }
                    Err(e) => {
                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
                    }
                }
            }
        }
    }
    // Reached only via cancellation: tell the child to stop (best effort).
    let _ = client.send(ParentFrame::Cancel).await;
    Err(AgentError::Cancelled)
}
/// Chooses the task text sent to the child.
///
/// Uses the content of the most recent user message. Without one, it falls
/// back to the session's `title` metadata, then to `"Execute task"`.
fn extract_assignment(session: &Session) -> String {
    let latest_user = session
        .messages
        .iter()
        .rev()
        .find(|msg| matches!(msg.role, Role::User));
    if let Some(msg) = latest_user {
        return msg.content.clone();
    }
    match session.metadata.get("title") {
        Some(title) => title.clone(),
        None => "Execute task".to_string(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
fn spec_with(
role: &str,
provider: &str,
model: &str,
workspace: Option<&str>,
disabled: Option<Vec<&str>>,
) -> ProvisionSpec {
let mut spec = ProvisionSpec::new(
ChildIdentity {
child_id: "c".into(),
parent_id: None,
project_key: None,
role: role.into(),
depth: 0,
},
ExecutorSpec::Echo,
"/tmp/fab".into(),
);
spec.workspace = workspace.map(|w| w.to_string());
spec.model = Some(ModelRefSpec {
provider: provider.into(),
model: model.into(),
});
spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
spec
}
#[test]
fn fingerprint_matches_interchangeable_children() {
let a = spec_with(
"explorer",
"p",
"m",
Some("/ws"),
Some(vec!["Bash", "Edit"]),
);
let mut b = spec_with(
"explorer",
"p",
"m",
Some("/ws"),
Some(vec!["Edit", "Bash"]),
);
b.identity.child_id = "other".into();
assert_eq!(
ActorChildRunner::fingerprint(&a),
ActorChildRunner::fingerprint(&b)
);
}
#[test]
fn fingerprint_separates_distinct_runtimes() {
let base = spec_with("explorer", "p", "m", Some("/ws"), None);
let base_fp = ActorChildRunner::fingerprint(&base);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&spec_with("writer", "p", "m", Some("/ws"), None))
);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&spec_with("explorer", "p2", "m", Some("/ws"), None))
);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m2", Some("/ws"), None))
);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws2"), None))
);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&spec_with(
"explorer",
"p",
"m",
Some("/ws"),
Some(vec!["Bash"])
))
);
}
#[test]
fn fingerprint_splits_on_baked_capabilities() {
let base_fp =
ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
let mut depth = spec_with("explorer", "p", "m", Some("/ws"), None);
depth.identity.depth = 2;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&depth),
"depth must split"
);
let mut nested = spec_with("explorer", "p", "m", Some("/ws"), None);
nested.capabilities.nested_spawn = true;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&nested),
"nested_spawn must split"
);
let mut bypass = spec_with("explorer", "p", "m", Some("/ws"), None);
bypass.capabilities.bypass = true;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&bypass),
"bypass must split"
);
let mut enforce = spec_with("explorer", "p", "m", Some("/ws"), None);
enforce.capabilities.enforce_permissions = true;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&enforce),
"enforce_permissions must split"
);
let mut cap = spec_with("explorer", "p", "m", Some("/ws"), None);
cap.capabilities.max_spawn_depth = Some(8);
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&cap),
"max_spawn_depth must split"
);
let mut nha = spec_with("explorer", "p", "m", Some("/ws"), None);
nha.capabilities.no_human_approver = true;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&nha),
"no_human_approver must split"
);
let mut gro = spec_with("explorer", "p", "m", Some("/ws"), None);
gro.capabilities.guardian_read_only = true;
assert_ne!(
base_fp,
ActorChildRunner::fingerprint(&gro),
"guardian_read_only must split"
);
}
struct StaticDecider(bool);
#[async_trait]
impl ChildApprovalDecider for StaticDecider {
async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
self.0
}
}
struct SilentLink;
#[async_trait]
impl bamboo_subagent::ChildLink for SilentLink {
async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
Ok(())
}
async fn next_frame(
&mut self,
) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
std::future::pending().await
}
}
struct InstantTerminalLink {
done: bool,
}
#[async_trait]
impl bamboo_subagent::ChildLink for InstantTerminalLink {
async fn send(&mut self, _: ParentFrame) -> bamboo_subagent::TransportResult<()> {
Ok(())
}
async fn next_frame(
&mut self,
) -> bamboo_subagent::TransportResult<Option<ChildFrame>> {
if self.done {
std::future::pending().await
} else {
self.done = true;
Ok(Some(ChildFrame::Terminal {
status: TerminalStatus::Completed,
result: Some("done".into()),
error: None,
transcript: vec![],
}))
}
}
}
#[tokio::test]
async fn drive_trips_first_frame_watchdog_on_a_silent_worker() {
let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
let cancel = CancellationToken::new();
let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
let mut link = SilentLink;
let r = drive(
&mut link,
"child-x",
None,
None,
&event_tx,
&cancel,
&mut live_rx,
Some(Duration::from_millis(100)),
)
.await;
assert!(
matches!(r, Err(AgentError::WorkerUnresponsive(_))),
"a silent worker must trip the first-frame watchdog, got {r:?}"
);
}
#[tokio::test]
async fn drive_does_not_trip_when_a_frame_arrives() {
let (event_tx, _rx) = mpsc::channel::<AgentEvent>(8);
let cancel = CancellationToken::new();
let (_live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
let mut link = InstantTerminalLink { done: false };
let r = drive(
&mut link,
"child-y",
None,
None,
&event_tx,
&cancel,
&mut live_rx,
Some(Duration::from_millis(50)),
)
.await;
assert_eq!(r.ok().flatten().as_deref(), Some("done"));
}
#[tokio::test]
async fn child_approval_fails_closed_without_decider() {
let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
assert!(!decide_child_approval(None, "child-1", &body).await);
}
#[tokio::test]
async fn child_approval_honors_wired_decider() {
let body =
serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
let approve: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
let deny: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));
assert!(decide_child_approval(Some(&approve), "child-1", &body).await);
assert!(!decide_child_approval(Some(&deny), "child-1", &body).await);
}
#[test]
fn approval_request_fields_extracts_and_defaults() {
let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
assert_eq!(
approval_request_fields(&full),
("Bash".to_string(), "run".to_string(), "ls".to_string())
);
let partial = serde_json::json!({"tool_name":"Write"});
assert_eq!(
approval_request_fields(&partial),
("Write".to_string(), String::new(), String::new())
);
}
use crate::runtime::execution::SpawnJob;
use bamboo_agent_core::Session;
fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
ActorChildRunner::new(
"test-actor".into(),
PathBuf::from("/bin/false"),
vec![],
std::env::temp_dir().join("bamboo-test-fab-193"),
ExecutorSpec::Echo,
vec![],
"anthropic".into(),
4,
)
.with_remote_placements(placements)
}
fn session_of_role(role: &str, assignment: &str) -> Session {
let mut s = Session::new("child-1", "test-model");
s.metadata
.insert("subagent_type".to_string(), role.to_string());
s.add_message(bamboo_agent_core::Message::user(assignment));
s
}
fn job_for(child: &str) -> SpawnJob {
SpawnJob {
parent_session_id: "parent-1".into(),
child_session_id: child.into(),
model: String::new(),
disabled_tools: None,
}
}
#[test]
fn build_spec_sets_remote_placement_for_matching_role() {
let mut placements = HashMap::new();
placements.insert(
"explorer".to_string(),
ResolvedRemotePlacement {
endpoint: "wss://gpu-host:8443".into(),
token: Some("T-secret".into()),
ca_cert_file: None,
host_label: None,
},
);
let runner = bogus_runner(placements);
let s = session_of_role("explorer", "do the thing");
let spec = runner.build_spec(&s, &job_for("child-1"));
match &spec.placement {
Placement::Remote { endpoint } => assert_eq!(endpoint, "wss://gpu-host:8443"),
other => panic!("expected Remote, got {other:?}"),
}
assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
}
#[test]
fn build_spec_leaves_local_for_unmatched_role() {
let mut placements = HashMap::new();
placements.insert(
"explorer".to_string(),
ResolvedRemotePlacement {
endpoint: "wss://gpu-host:8443".into(),
token: Some("T".into()),
ca_cert_file: None,
host_label: None,
},
);
let runner = bogus_runner(placements);
let s = session_of_role("writer", "do the thing");
let spec = runner.build_spec(&s, &job_for("child-1"));
assert_eq!(spec.placement, Placement::Local);
assert!(spec.secrets.worker_auth_token.is_none());
}
#[test]
fn build_spec_local_when_no_placements() {
let runner = bogus_runner(HashMap::new());
let s = session_of_role("explorer", "do the thing");
let spec = runner.build_spec(&s, &job_for("child-1"));
assert_eq!(spec.placement, Placement::Local);
assert!(spec.secrets.worker_auth_token.is_none());
}
#[test]
fn placement_metadata_stamps_remote_and_schedulable_not_local() {
assert_eq!(placement_metadata(&Placement::Local, None), None);
let r = placement_metadata(
&Placement::Remote {
endpoint: "wss://10.0.0.5:8443/stream".into(),
},
None,
)
.unwrap();
assert!(r.contains(r#""kind":"remote""#), "{r}");
assert!(r.contains(r#""host":"10.0.0.5""#), "{r}");
let labeled = placement_metadata(
&Placement::Remote {
endpoint: "ws://169.254.230.101:8899".into(),
},
Some("mini"),
)
.unwrap();
assert!(labeled.contains(r#""host":"mini""#), "{labeled}");
let s =
placement_metadata(&Placement::Schedulable { pool: "explorers".into() }, Some("mini"))
.unwrap();
assert!(s.contains(r#""kind":"remote""#), "{s}");
assert!(s.contains(r#""host":"mini""#), "{s}");
let p: bamboo_storage::SessionPlacement = serde_json::from_str(&labeled).unwrap();
assert_eq!(p.kind, "remote");
assert_eq!(p.host, "mini");
}
/// End-to-end: a role with a resolved remote placement is served by the
/// resident websocket worker at that endpoint (not by a locally spawned
/// child), the echo reply is written back into the session, and the session
/// is stamped with a `remote` placement carrying the configured host label.
#[tokio::test]
async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
    // Stand up a resident worker on an ephemeral localhost port that answers
    // every run with the echo executor.
    let token = "remote-test-token";
    let server = bamboo_subagent::transport::WsServer::bind_with_token(
        (std::net::Ipv4Addr::LOCALHOST, 0).into(),
        Some(token.to_string()),
    )
    .await
    .expect("bind resident worker");
    let endpoint = server.ws_endpoint();
    let srv = tokio::spawn(async move {
        let _ = server
            .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
            .await;
    });

    // Route the "explorer" role to that worker under a human-readable label.
    let mut placements = HashMap::new();
    placements.insert(
        "explorer".to_string(),
        ResolvedRemotePlacement {
            endpoint,
            token: Some(token.to_string()),
            ca_cert_file: None,
            host_label: Some("mini-e2e".into()),
        },
    );
    let runner = bogus_runner(placements);
    let mut session = session_of_role("explorer", "hello remote");
    let job = job_for("child-1");
    // The receiver is kept alive (but not inspected) so event sends made
    // during the run are not rejected by a closed channel.
    let (event_tx, _event_rx) = mpsc::channel::<AgentEvent>(64);
    let cancel = CancellationToken::new();
    tokio::time::timeout(
        Duration::from_secs(10),
        runner.execute_external_child(&mut session, &job, event_tx, cancel),
    )
    .await
    .expect("run did not hang")
    .expect("remote run succeeded (connected to resident worker, did not spawn)");

    let last = session
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, Role::Assistant))
        .expect("an assistant reply was written back");
    assert!(
        last.content.contains("echo:"),
        "expected echo reply, got {:?}",
        last.content
    );

    let placement = session
        .metadata
        .get("placement")
        .expect("remote child session stamped with a placement");
    assert!(placement.contains(r#""kind":"remote""#), "{placement}");
    assert!(placement.contains(r#""host":"mini-e2e""#), "{placement}");

    srv.abort();
}
/// Builds a test `ActorChildRunner` preloaded with the given remote and
/// schedulable role placements.
///
/// The local worker binary is `/bin/false`, presumably so that an accidental
/// local spawn fails instead of running a real worker. The trailing `4` is
/// presumably the concurrency limit; confirm against `ActorChildRunner::new`.
fn bogus_sched_runner(
    remote: HashMap<String, ResolvedRemotePlacement>,
    sched: HashMap<String, ResolvedSchedulablePlacement>,
) -> ActorChildRunner {
    let fabric_dir = std::env::temp_dir().join("bamboo-test-fab-181");
    let base = ActorChildRunner::new(
        "test-actor".into(),
        PathBuf::from("/bin/false"),
        Vec::new(),
        fabric_dir,
        ExecutorSpec::Echo,
        Vec::new(),
        "anthropic".into(),
        4,
    );
    let with_remote = base.with_remote_placements(remote);
    with_remote.with_schedulable_placements(sched)
}
/// Builds a schedulable placement that targets `pool` and has no host label.
///
/// The second argument is accepted only so existing call sites keep
/// compiling. It is ignored, because `ResolvedSchedulablePlacement` has no
/// registry-URL field.
fn sched_placement(pool: &str, _registry_url: impl Into<String>) -> ResolvedSchedulablePlacement {
    let host_label = None;
    ResolvedSchedulablePlacement {
        host_label,
        pool: pool.to_owned(),
    }
}
/// A role listed in the schedulable map yields a `Schedulable` placement for
/// its pool. No worker auth token is attached.
#[test]
fn build_spec_sets_schedulable_placement_for_matching_role() {
    let sched = HashMap::from([(
        "explorer".to_string(),
        sched_placement("gpu-pool", "unused"),
    )]);
    let runner = bogus_sched_runner(HashMap::new(), sched);
    let session = session_of_role("explorer", "do the thing");
    let spec = runner.build_spec(&session, &job_for("child-1"));
    if let Placement::Schedulable { pool } = &spec.placement {
        assert_eq!(pool, "gpu-pool");
    } else {
        let other = &spec.placement;
        panic!("expected Schedulable, got {other:?}");
    }
    assert!(spec.secrets.worker_auth_token.is_none());
}
/// When a role is configured both as a fixed remote endpoint and as a
/// schedulable pool, the fixed remote placement takes precedence. Its token is
/// forwarded as the worker auth secret.
#[test]
fn build_spec_remote_wins_when_role_in_both_maps() {
    let remote = HashMap::from([(
        "explorer".to_string(),
        ResolvedRemotePlacement {
            endpoint: "wss://fixed-host:8443".into(),
            token: Some("T-remote".into()),
            ca_cert_file: None,
            host_label: None,
        },
    )]);
    let sched = HashMap::from([(
        "explorer".to_string(),
        sched_placement("gpu-pool", "https://control-plane:9562"),
    )]);
    let runner = bogus_sched_runner(remote, sched);
    let spec = runner.build_spec(
        &session_of_role("explorer", "do the thing"),
        &job_for("child-1"),
    );
    if let Placement::Remote { endpoint } = &spec.placement {
        assert_eq!(endpoint, "wss://fixed-host:8443");
    } else {
        let other = &spec.placement;
        panic!("expected Remote (precedence), got {other:?}");
    }
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
}
/// A role absent from every placement map falls back to `Placement::Local`
/// and carries no worker auth token.
#[test]
fn build_spec_local_for_unmatched_schedulable_role() {
    // Only "explorer" is schedulable; the child below is a "writer".
    let sched = HashMap::from([(
        "explorer".to_string(),
        sched_placement("gpu-pool", "https://control-plane:9562"),
    )]);
    let runner = bogus_sched_runner(HashMap::new(), sched);
    let spec = runner.build_spec(
        &session_of_role("writer", "do the thing"),
        &job_for("child-1"),
    );
    assert_eq!(spec.placement, Placement::Local);
    assert!(spec.secrets.worker_auth_token.is_none());
}
/// The placement stamp behaves as follows:
/// - It reports `kind: remote` with the configured host label for remote and
///   schedulable children.
/// - For an unlabeled remote endpoint it falls back to the endpoint's host.
/// - It is absent for local children.
#[test]
fn placement_stamp_uses_node_label_for_remote_and_schedulable() {
    /// Maps the "explorer" role to a fixed remote worker at a link-local
    /// address, optionally carrying a human-readable host label.
    fn remote_explorer(host_label: Option<&str>) -> HashMap<String, ResolvedRemotePlacement> {
        HashMap::from([(
            "explorer".to_string(),
            ResolvedRemotePlacement {
                endpoint: "ws://169.254.230.101:8899".into(),
                token: None,
                ca_cert_file: None,
                host_label: host_label.map(String::from),
            },
        )])
    }

    // Labeled remote placement: the stamp carries the label.
    let runner = bogus_runner(remote_explorer(Some("mini")));
    let spec = runner.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
    let stamp = runner.placement_stamp_for(&spec).expect("remote child is stamped");
    assert!(stamp.contains(r#""kind":"remote""#), "{stamp}");
    assert!(stamp.contains(r#""host":"mini""#), "{stamp}");

    // Unlabeled remote placement: the stamp falls back to the endpoint host.
    // The stamp is printed on failure, like the sibling assertions.
    let r2 = bogus_runner(remote_explorer(None));
    let spec2 = r2.build_spec(&session_of_role("explorer", "go"), &job_for("c1"));
    let stamp2 = r2
        .placement_stamp_for(&spec2)
        .expect("unlabeled remote child is stamped");
    assert!(stamp2.contains(r#""host":"169.254.230.101""#), "{stamp2}");

    // Schedulable placement: the stamp is still `kind: remote` with the label.
    let mut sched = HashMap::new();
    sched.insert(
        "mac-mini-monitor".to_string(),
        ResolvedSchedulablePlacement {
            pool: "mac-mini-monitor".into(),
            host_label: Some("mini".into()),
        },
    );
    let sr = bogus_sched_runner(HashMap::new(), sched);
    let spec3 = sr.build_spec(&session_of_role("mac-mini-monitor", "go"), &job_for("c1"));
    let stamp3 = sr.placement_stamp_for(&spec3).expect("scheduled child is stamped");
    assert!(stamp3.contains(r#""kind":"remote""#), "{stamp3}");
    assert!(stamp3.contains(r#""host":"mini""#), "{stamp3}");

    // Local placement: no stamp at all.
    let local = bogus_runner(HashMap::new());
    let spec4 = local.build_spec(&session_of_role("writer", "go"), &job_for("c1"));
    assert_eq!(local.placement_stamp_for(&spec4), None);
}
/// Starts an in-process broker on an ephemeral `127.0.0.1` port and returns
/// its `ws://` endpoint together with the `TempDir` handed to `BrokerCore`.
///
/// The broker is constructed with the shared test token `"t"`. The caller must
/// keep the returned `TempDir` alive, because dropping it deletes the
/// directory. The serving task is detached and never joined.
async fn start_bus() -> (String, tempfile::TempDir) {
    let state_dir = tempfile::tempdir().unwrap();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    let broker = std::sync::Arc::new(bamboo_broker::BrokerServer::new(
        std::sync::Arc::new(bamboo_broker::BrokerCore::new(state_dir.path())),
        "t",
    ));
    tokio::spawn(async move {
        let _ = broker.serve(listener).await;
    });
    let url = format!("ws://{local}");
    (url, state_dir)
}
/// Connects a bus client with session id `id` and role `pool`, subscribes it,
/// and returns it.
///
/// The callers in this module bind the returned client for the rest of the
/// test, presumably so the member stays listed as connected.
async fn join_pool(endpoint: &str, id: &str, pool: &str) -> bamboo_broker::BrokerClient {
    let me = bamboo_subagent::AgentRef {
        session_id: id.into(),
        role: Some(pool.into()),
    };
    let mut client = bamboo_broker::BrokerClient::connect(endpoint, me, "t")
        .await
        .unwrap();
    client.subscribe().await.unwrap();
    client
}
/// Builds a test runner that maps `child_role` to the schedulable `pool` and is
/// attached to the bus at `endpoint` using the shared test token `"t"`.
fn sched_runner_on_bus(endpoint: &str, child_role: &str, pool: &str) -> ActorChildRunner {
    let placements = HashMap::from([(child_role.to_string(), sched_placement(pool, "unused"))]);
    let bus = bamboo_subagent::BusEndpoint {
        endpoint: endpoint.into(),
        token: "t".into(),
    };
    bogus_sched_runner(HashMap::new(), placements).with_bus(Some(bus))
}
/// With a single live member in the pool, resolution returns that member's
/// mailbox id.
#[tokio::test]
async fn resolve_schedulable_picks_a_live_bus_worker() {
    let (endpoint, _dir) = start_bus().await;
    // Bound (not `_`) so the worker connection stays open for the whole test.
    let _worker = join_pool(&endpoint, "w-gpu", "gpu-pool").await;
    let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
    let resolved = runner.resolve_schedulable_worker("explorer").await;
    let mailbox = resolved.expect("a live pool worker is found on the bus");
    assert_eq!(mailbox, "w-gpu");
}
/// Repeated resolutions over a two-member pool must eventually hand out both
/// members.
#[tokio::test]
async fn resolve_schedulable_round_robins_over_pool_workers() {
    let (endpoint, _dir) = start_bus().await;
    let _worker_a = join_pool(&endpoint, "w-a", "gpu-pool").await;
    let _worker_b = join_pool(&endpoint, "w-b", "gpu-pool").await;
    let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");

    // Six picks across two workers are enough for a rotation to cover both.
    let mut seen = std::collections::HashSet::new();
    for _round in 0..6 {
        let mailbox = runner.resolve_schedulable_worker("explorer").await.unwrap();
        seen.insert(mailbox);
    }

    let expected: std::collections::HashSet<String> =
        ["w-a", "w-b"].iter().map(|id| id.to_string()).collect();
    assert_eq!(
        seen, expected,
        "round-robin must cover every connected pool worker"
    );
}
/// If nobody has joined the pool, resolution fails with an error that says so
/// and states that no local spawn will be attempted.
#[tokio::test]
async fn resolve_schedulable_errors_on_empty_pool() {
    let (endpoint, _dir) = start_bus().await;
    let runner = sched_runner_on_bus(&endpoint, "explorer", "gpu-pool");
    let outcome = runner.resolve_schedulable_worker("explorer").await;
    let err = outcome
        .expect_err("an empty pool is terminal — no local fallback")
        .to_string();
    for needle in ["no live worker in pool", "NOT spawning"] {
        assert!(err.contains(needle), "got: {err}");
    }
}
/// End-to-end check for a schedulable role:
/// - The child run is dispatched over the bus to a live pool member.
/// - The member's echo reply is written back into the session.
/// - The session is stamped as `remote` with the configured host label.
#[tokio::test]
async fn execute_external_child_runs_schedulable_over_bus_and_stamps_node_label() {
    let (endpoint, _dir) = start_bus().await;

    // Run an echo executor on the bus as the "mac-mini-monitor" pool member.
    let worker = {
        let bus_url = endpoint.clone();
        tokio::spawn(async move {
            let _ = bamboo_broker::serve_executor(
                &bus_url,
                bamboo_subagent::AgentRef {
                    session_id: "mmm-worker".into(),
                    role: Some("mac-mini-monitor".into()),
                },
                "t",
                std::sync::Arc::new(bamboo_subagent::executor::EchoExecutor),
            )
            .await;
        })
    };

    // Poll the broker (up to 100 attempts, 30 ms apart) until it lists the
    // worker as a connected pool member.
    let mut probe = bamboo_broker::BrokerClient::connect(
        &endpoint,
        bamboo_subagent::AgentRef { session_id: "probe".into(), role: None },
        "t",
    )
    .await
    .unwrap();
    let mut ready = false;
    for _attempt in 0..100 {
        let members = probe.list_connected("mac-mini-monitor").await.unwrap();
        ready = members.iter().any(|id| id == "mmm-worker");
        if ready {
            break;
        }
        tokio::time::sleep(Duration::from_millis(30)).await;
    }
    assert!(ready, "worker never joined the pool");

    // Route the pool role through a schedulable placement labeled "mini".
    let sched = HashMap::from([(
        "mac-mini-monitor".to_string(),
        ResolvedSchedulablePlacement {
            pool: "mac-mini-monitor".into(),
            host_label: Some("mini".into()),
        },
    )]);
    let bus = bamboo_subagent::BusEndpoint { endpoint, token: "t".into() };
    let runner = bogus_sched_runner(HashMap::new(), sched).with_bus(Some(bus));

    let mut session = session_of_role("mac-mini-monitor", "hello scheduled");
    let job = job_for("child-1");
    // The receiver is kept alive so event sends made during the run are not
    // rejected by a closed channel.
    let (event_tx, _event_rx) = mpsc::channel::<AgentEvent>(64);
    let cancel = CancellationToken::new();
    let run = runner.execute_external_child(&mut session, &job, event_tx, cancel);
    tokio::time::timeout(Duration::from_secs(10), run)
        .await
        .expect("run did not hang")
        .expect("schedulable run succeeded over the bus (no local spawn)");

    let reply = session
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, Role::Assistant))
        .expect("an assistant reply was written back");
    assert!(reply.content.contains("echo:"), "got {:?}", reply.content);

    let placement = session
        .metadata
        .get("placement")
        .expect("scheduled child session stamped with a placement");
    assert!(placement.contains(r#""kind":"remote""#), "{placement}");
    assert!(placement.contains(r#""host":"mini""#), "{placement}");

    worker.abort();
}
}