use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use bamboo_subagent::discovery::{Discovery, Fabric};
use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
use bamboo_subagent::proto::{AgentRecord, ChildFrame, ParentFrame, RunSpec, TerminalStatus};
use bamboo_subagent::provision::{
ChildIdentity, ExecutorSpec, ModelRefSpec, Placement, ProvisionSpec, ScopedCredential,
};
use bamboo_subagent::transport::{client_config_trusting_cert, ChildClient};
use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
/// Suggested cap on concurrently running actor children.
/// NOTE(review): not referenced in this part of the file; presumably passed as
/// `max_concurrent` to `ActorChildRunner::new` by callers — confirm.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;
/// Spawn-depth ceiling. `build_spec` enables nested spawning only for sessions
/// whose `spawn_depth` is below this value and forwards it as `max_spawn_depth`.
pub const MAX_SPAWN_DEPTH: u32 = 4;
/// Default number of idle workers kept per pool key; `release_worker` retires a
/// worker instead of pooling it once a bucket already holds this many.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;
/// `idle_timeout_secs` applied to a spec by `execute_external_child` when the
/// spec does not already carry one.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 300;
/// A worker handle together with the address and id used to reach it.
///
/// Local workers are kept in the idle pool between runs; remote and scheduled
/// workers are wrapped in the same type but are never pooled.
struct PooledActor {
// Handle to the worker (a spawned process, or a `SpawnedChild::remote` wrapper).
worker: SpawnedChild,
// Endpoint the parent connects to.
endpoint: String,
// Id used for fabric liveness checks (`resolve`) and cleanup (`withdraw`).
agent_id: String,
}
/// Connection details for a role that is pinned to one fixed remote worker.
#[derive(Debug, Clone)]
pub struct ResolvedRemotePlacement {
/// Endpoint of the resident worker to connect to.
pub endpoint: String,
/// Optional auth token presented when connecting; also copied into the
/// spec's `worker_auth_token`.
pub token: Option<String>,
/// Optional certificate file; when set, the connection is made with a TLS
/// client config built by `client_config_trusting_cert`.
pub ca_cert_file: Option<PathBuf>,
}
/// Connection details for a role whose worker is picked at run time from a
/// pool advertised by a registry.
#[derive(Debug, Clone)]
pub struct ResolvedSchedulablePlacement {
/// Pool name; registry records are candidates when their `role` equals it.
pub pool: String,
/// Base URL of the registry queried for live workers.
pub registry_url: String,
/// Optional token used both for the registry client and for the worker
/// connection.
pub token: Option<String>,
/// Optional certificate file; when set, worker connections are made with a
/// TLS client config built by `client_config_trusting_cert`.
pub ca_cert_file: Option<PathBuf>,
}
/// Payload-free mirror of `Placement`, used by `execute_external_child` to
/// branch on where a child runs.
enum PlacementKind {
// Spawned (or reused from the idle pool) on this host.
Local,
// Connected to a fixed, pre-configured endpoint.
Remote,
// Connected to a worker chosen from a registry-advertised pool.
Schedulable,
}
/// Runs child sessions on actor workers: local subprocesses (pooled and
/// reused), fixed remote workers, or workers scheduled from a registry pool.
pub struct ActorChildRunner {
// Compared against the session's `external.agent_id` metadata in `should_handle`.
agent_id: String,
// Worker executable and arguments handed to `spawn_worker`.
worker_bin: PathBuf,
worker_args: Vec<String>,
// Directory of the local fabric used for liveness checks and withdrawal;
// also passed into every `ProvisionSpec`.
fabric_dir: PathBuf,
// Executor description cloned into every `ProvisionSpec`.
executor: ExecutorSpec,
// Credentials searched by provider name when building a spec.
credentials: Vec<ScopedCredential>,
// Provider used when a session has no model ref or an empty provider.
default_provider: String,
// One permit is held for the whole duration of each child run.
concurrency: std::sync::Arc<tokio::sync::Semaphore>,
// Timeout handed to `spawn_worker` (set to 30 seconds by `new`).
spawn_timeout: Duration,
// Idle local workers, bucketed by the spec fingerprint.
pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
// Upper bound on idle workers kept per bucket.
max_idle_per_key: usize,
// Optional synchronous decider for child approval requests.
approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
// Optional host bridge, set through `set_escalation_bridge` and cloned at
// the start of each run.
escalation_bridge: Arc<std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>>>,
// Role name -> fixed remote worker.
remote_placements: HashMap<String, ResolvedRemotePlacement>,
// Role name -> registry pool to schedule from.
schedulable_placements: HashMap<String, ResolvedSchedulablePlacement>,
// Per-pool counter that rotates the first candidate tried.
schedule_cursor: Arc<std::sync::Mutex<HashMap<String, usize>>>,
// Registry clients cached by (registry URL, token).
fabric_cache: Arc<
std::sync::Mutex<HashMap<(String, Option<String>), Arc<bamboo_subagent::RegistryFabric>>>,
>,
}
/// Decides child approval requests inline.
///
/// When a decider is configured (and no global reviewer is installed), `drive`
/// awaits `decide` directly and sends the verdict back to the child.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
/// Returns `true` to approve `request` raised by the child session
/// `child_session_id`, `false` to deny it.
async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// Asks `decider` whether `request` from `child_session_id` is approved.
///
/// Fails closed: with no decider configured the request is denied.
async fn decide_child_approval(
    decider: Option<&Arc<dyn ChildApprovalDecider>>,
    child_session_id: &str,
    request: &serde_json::Value,
) -> bool {
    let Some(active) = decider else {
        return false;
    };
    active.decide(child_session_id, request).await
}
/// How long an asynchronous child approval may stay unanswered before `drive`
/// treats it as denied (applies to the reviewer, host-bridge and UI paths).
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);
fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
let field = |k: &str| {
body.get(k)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
};
(field("tool_name"), field("permission"), field("resource"))
}
/// Process-wide reviewer for child approval requests.
///
/// When one is installed via `set_child_approval_reviewer`, `drive` consults it
/// (on a spawned task, bounded by `CHILD_APPROVAL_TIMEOUT`) before any
/// per-runner decider.
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
/// Returns `true` to approve `request` raised by the child session
/// `child_session_id`, `false` to deny it.
async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// Returns the process-wide, write-once cell that holds the installed reviewer.
fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
    use std::sync::OnceLock;

    static REVIEWER: OnceLock<Arc<dyn ChildApprovalReviewer>> = OnceLock::new();
    &REVIEWER
}
/// Installs the process-wide child approval reviewer.
///
/// Only the first call takes effect; later calls are silently ignored.
pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
    let slot = child_approval_reviewer_slot();
    // `set` hands the rejected reviewer back as `Err` when the slot is already
    // filled; discarding it keeps the first reviewer in place.
    drop(slot.set(reviewer));
}
/// Returns a handle to the installed child approval reviewer, or `None` when
/// none has been set.
pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
    let installed = child_approval_reviewer_slot().get();
    installed.map(Arc::clone)
}
impl ActorChildRunner {
/// Builds a runner with an empty idle pool, no approval decider, no escalation
/// bridge and no remote or schedulable placements.
///
/// `max_concurrent` bounds how many children run at once; it is clamped to at
/// least 1. The spawn timeout is fixed at 30 seconds.
#[allow(clippy::too_many_arguments)]
pub fn new(
    agent_id: String,
    worker_bin: PathBuf,
    worker_args: Vec<String>,
    fabric_dir: PathBuf,
    executor: ExecutorSpec,
    credentials: Vec<ScopedCredential>,
    default_provider: String,
    max_concurrent: usize,
) -> Self {
    // A zero-permit semaphore would block every run forever, hence the clamp.
    let permits = max_concurrent.max(1);
    let concurrency = Arc::new(tokio::sync::Semaphore::new(permits));
    let pool = Arc::new(Mutex::new(HashMap::new()));
    let escalation_bridge = Arc::new(std::sync::Mutex::new(None));
    let schedule_cursor = Arc::new(std::sync::Mutex::new(HashMap::new()));
    let fabric_cache = Arc::new(std::sync::Mutex::new(HashMap::new()));

    Self {
        agent_id,
        worker_bin,
        worker_args,
        fabric_dir,
        executor,
        credentials,
        default_provider,
        concurrency,
        spawn_timeout: Duration::from_secs(30),
        pool,
        max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
        approval_decider: None,
        escalation_bridge,
        remote_placements: HashMap::new(),
        schedulable_placements: HashMap::new(),
        schedule_cursor,
        fabric_cache,
    }
}
/// Builder step: installs the decider consulted for child approval requests.
pub fn with_approval_decider(self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
    let mut runner = self;
    runner.approval_decider = Some(decider);
    runner
}
/// Builder step: replaces the role -> fixed remote worker table.
pub fn with_remote_placements(
    self,
    placements: HashMap<String, ResolvedRemotePlacement>,
) -> Self {
    let mut runner = self;
    runner.remote_placements = placements;
    runner
}
/// Builder step: replaces the role -> schedulable pool table.
pub fn with_schedulable_placements(
    self,
    placements: HashMap<String, ResolvedSchedulablePlacement>,
) -> Self {
    let mut runner = self;
    runner.schedulable_placements = placements;
    runner
}
/// Derives the idle-pool key for `spec`.
///
/// Two specs share a key only when they agree on role, provider, model,
/// workspace, the (order-insensitive) disabled tool list, spawn depth and every
/// capability flag. The child id is deliberately not part of the key. Fields
/// are separated by `U+0001`.
fn fingerprint(spec: &ProvisionSpec) -> String {
    let (provider, model) = match spec.model.as_ref() {
        Some(model_ref) => (model_ref.provider.as_str(), model_ref.model.as_str()),
        None => ("", ""),
    };
    // Sort so that the same set of disabled tools in another order matches.
    let mut disabled = spec.disabled_tools.clone().unwrap_or_default();
    disabled.sort();
    let caps = &spec.capabilities;

    let parts = [
        spec.identity.role.as_str().to_owned(),
        provider.to_owned(),
        model.to_owned(),
        spec.workspace.as_deref().unwrap_or("").to_owned(),
        disabled.join(","),
        format!("d={}", spec.identity.depth),
        format!("ns={}", caps.nested_spawn),
        format!("by={}", caps.bypass),
        format!("ep={}", caps.enforce_permissions),
        format!("md={}", caps.max_spawn_depth.unwrap_or(0)),
        format!("nha={}", caps.no_human_approver),
        format!("gro={}", caps.guardian_read_only),
    ];
    parts.join("\u{1}")
}
/// Returns a worker for `spec`: a live idle worker from the pool bucket `key`
/// when one exists, otherwise a freshly spawned one.
///
/// Pooled candidates are checked against the local fabric before reuse. A
/// candidate that does not resolve (or whose lookup fails) is retired — killed
/// and withdrawn from the fabric — and the next candidate is tried.
///
/// # Errors
/// Returns `AgentError::LLM` when a new worker cannot be spawned or registered
/// within `spawn_timeout`.
async fn acquire_worker(
    &self,
    key: &str,
    spec: &ProvisionSpec,
) -> crate::runtime::runner::Result<PooledActor> {
    loop {
        // Hold the pool lock only for the pop, never across the liveness probe.
        let candidate = {
            let mut pool = self.pool.lock().await;
            pool.get_mut(key).and_then(|bucket| bucket.pop())
        };
        let Some(candidate) = candidate else { break };
        let alive = Fabric::at(&self.fabric_dir)
            .resolve(&candidate.agent_id)
            .await
            .ok()
            .flatten()
            .is_some();
        if alive {
            return Ok(candidate);
        }
        // A failed lookup is also treated as "not alive", in which case the
        // record may still exist; retire through the shared path so it is
        // withdrawn as well as killed.
        self.retire_worker(candidate).await;
    }
    let spawned = spawn_worker(
        &self.worker_bin,
        &self.worker_args,
        spec,
        self.spawn_timeout,
    )
    .await
    .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
    let endpoint = spawned.record.endpoint.clone();
    let agent_id = spawned.record.agent_id.clone();
    Ok(PooledActor {
        worker: spawned,
        endpoint,
        agent_id,
    })
}
/// Returns `actor` to the idle bucket `key`, or retires it when the bucket
/// already holds `max_idle_per_key` workers.
async fn release_worker(&self, key: &str, actor: PooledActor) {
    let mut idle = self.pool.lock().await;
    let slots = idle.entry(key.to_string()).or_default();
    if slots.len() < self.max_idle_per_key {
        slots.push(actor);
        return;
    }
    // Bucket is full: release the lock before the (slow) retirement.
    drop(idle);
    self.retire_worker(actor).await;
}
/// Permanently disposes of `actor`: kills the worker, then withdraws its id
/// from the local fabric. A failed withdrawal is ignored.
async fn retire_worker(&self, actor: PooledActor) {
    // Copy the id first so it is still available once the worker is killed.
    let retired_id = actor.agent_id.clone();
    actor.worker.kill().await;
    let fabric = Fabric::at(&self.fabric_dir);
    let _ = fabric.withdraw(&retired_id).await;
}
/// Builds the provisioning spec for one child run.
///
/// * Identity: child/parent ids come from `job`; the role is the session's
///   `subagent_type` metadata (default `"worker"`); depth is the session's
///   spawn depth.
/// * Model: the session's model ref when present, otherwise the job's
///   non-blank model paired with the default provider, otherwise none.
/// * Credentials: the first configured credential matching the effective
///   provider is attached; a warning is logged when none matches.
/// * Capabilities: permissions are always enforced; nested spawning is allowed
///   below `MAX_SPAWN_DEPTH`; bypass / no-human-approver mirror the session's
///   runtime state; the `guardian` role is marked read-only.
/// * Placement: a remote placement for the role wins over a schedulable one;
///   with neither, the placement is left as constructed.
fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
    let subagent_type = session.metadata.get("subagent_type");
    let runtime_state = session.agent_runtime_state.as_ref();

    let identity = ChildIdentity {
        child_id: job.child_session_id.clone(),
        parent_id: Some(job.parent_session_id.clone()),
        project_key: None,
        role: subagent_type
            .cloned()
            .unwrap_or_else(|| "worker".to_string()),
        depth: session.spawn_depth,
    };
    let mut spec = ProvisionSpec::new(
        identity,
        self.executor.clone(),
        self.fabric_dir.to_string_lossy().into_owned(),
    );

    spec.workspace = session.workspace.clone();
    spec.model = match session.model_ref.as_ref() {
        Some(pinned) => Some(ModelRefSpec {
            provider: pinned.provider.clone(),
            model: pinned.model.clone(),
        }),
        None => {
            let requested = job.model.trim();
            if requested.is_empty() {
                None
            } else {
                Some(ModelRefSpec {
                    provider: self.default_provider.clone(),
                    model: requested.to_string(),
                })
            }
        }
    };
    spec.disabled_tools = job.disabled_tools.clone();

    // Effective provider: the model's provider unless it is missing or blank.
    let provider = match spec.model.as_ref() {
        Some(chosen) if !chosen.provider.trim().is_empty() => chosen.provider.as_str(),
        _ => self.default_provider.as_str(),
    };
    match self.credentials.iter().find(|c| c.provider == provider) {
        Some(cred) => spec.secrets.provider_credentials.push(cred.clone()),
        None => {
            tracing::warn!(
                "actor child {}: no credential found for provider '{}'",
                job.child_session_id,
                provider
            );
        }
    }

    spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
    spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
    spec.capabilities.enforce_permissions = true;
    spec.capabilities.bypass = runtime_state.is_some_and(|s| s.bypass_permissions);
    spec.capabilities.no_human_approver = runtime_state.is_some_and(|s| s.no_human_approver);
    spec.capabilities.guardian_read_only =
        subagent_type.map(String::as_str) == Some("guardian");

    // Remote takes precedence when a role is listed in both placement tables.
    if let Some(fixed) = self.remote_placements.get(spec.identity.role.as_str()) {
        spec.placement = Placement::Remote {
            endpoint: fixed.endpoint.clone(),
        };
        spec.secrets.worker_auth_token = fixed.token.clone();
    } else if let Some(pooled) = self.schedulable_placements.get(spec.identity.role.as_str()) {
        spec.placement = Placement::Schedulable {
            pool: pooled.pool.clone(),
        };
        spec.secrets.worker_auth_token = pooled.token.clone();
    }
    spec
}
/// Upper bound on how many pool candidates `resolve_schedulable_worker` tries
/// to connect to before giving up (fewer when the pool is smaller).
const MAX_SCHEDULE_CONNECT_ATTEMPTS: usize = 3;
/// Returns the registry client for `placement`, building and caching it on
/// first use. Clients are cached per `(registry_url, token)` pair.
///
/// The cache lock is not held while the client is constructed; if two callers
/// race, the first inserted client is the one both end up sharing.
///
/// # Errors
/// Returns `AgentError::LLM` (mentioning `role`) when the client cannot be
/// constructed.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
fn fabric_for(
    &self,
    placement: &ResolvedSchedulablePlacement,
    role: &str,
) -> std::result::Result<Arc<bamboo_subagent::RegistryFabric>, AgentError> {
    let cache_key = (placement.registry_url.clone(), placement.token.clone());
    {
        let cache = self.fabric_cache.lock().unwrap();
        if let Some(hit) = cache.get(&cache_key) {
            return Ok(Arc::clone(hit));
        }
    }

    let url = placement.registry_url.clone();
    let constructed = if let Some(token) = placement.token.as_deref() {
        bamboo_subagent::RegistryFabric::with_token(url, token)
    } else {
        bamboo_subagent::RegistryFabric::new(url)
    };
    let fabric = constructed.map_err(|e| {
        AgentError::LLM(format!(
            "schedulable role '{role}': registry client for '{}' failed: {e}",
            placement.registry_url
        ))
    })?;

    let fresh = Arc::new(fabric);
    let mut cache = self.fabric_cache.lock().unwrap();
    let shared = cache.entry(cache_key).or_insert(fresh);
    Ok(Arc::clone(shared))
}
/// Picks and connects to a live worker for a schedulable `role`.
///
/// The role's registry is queried, records whose `role` equals the placement's
/// pool become candidates, and a per-pool cursor rotates which candidate is
/// tried first. Up to `MAX_SCHEDULE_CONNECT_ATTEMPTS` consecutive candidates
/// are tried; the first successful connection wins.
///
/// Returns the connected client, the chosen record and the placement used.
///
/// # Errors
/// Returns `AgentError::LLM` when the role has no schedulable placement, the
/// registry client or query fails, the pool has no live worker, the CA
/// certificate cannot be loaded, or every attempted candidate fails to
/// connect. There is never a fallback to a local subprocess.
///
/// # Panics
/// Panics if the cursor or registry-client cache mutex is poisoned.
async fn resolve_schedulable_worker(
    &self,
    role: &str,
) -> std::result::Result<(ChildClient, AgentRecord, ResolvedSchedulablePlacement), AgentError>
{
    let Some(placement) = self.schedulable_placements.get(role).cloned() else {
        return Err(AgentError::LLM(format!(
            "schedulable placement for role '{role}' vanished before scheduling"
        )));
    };
    let fabric = self.fabric_for(&placement, role)?;
    let records = match fabric.discover().await {
        Ok(found) => found,
        Err(e) => {
            return Err(AgentError::LLM(format!(
                "schedulable role '{role}': registry '{}' query failed: {e}",
                placement.registry_url
            )));
        }
    };

    // Only workers advertising the placement's pool are eligible.
    let mut candidates: Vec<AgentRecord> = Vec::new();
    for record in records {
        if record.role == placement.pool {
            candidates.push(record);
        }
    }
    if candidates.is_empty() {
        return Err(AgentError::LLM(format!(
            "schedulable role '{role}': no live worker in pool '{}' at registry '{}' \
            (NOT spawning a local subprocess — a schedulable role has no local fallback)",
            placement.pool, placement.registry_url
        )));
    }

    // Advance the pool's cursor so successive calls start at different workers.
    let start = {
        let mut cursors = self.schedule_cursor.lock().unwrap();
        let next = cursors.entry(placement.pool.clone()).or_insert(0);
        let first = *next % candidates.len();
        *next = next.wrapping_add(1);
        first
    };

    let trust_cfg = placement
        .ca_cert_file
        .as_deref()
        .map(|path| {
            client_config_trusting_cert(path).map_err(|e| {
                AgentError::LLM(format!(
                    "scheduled worker CA cert '{}': {e}",
                    path.display()
                ))
            })
        })
        .transpose()?;

    let max_attempts = candidates.len().min(Self::MAX_SCHEDULE_CONNECT_ATTEMPTS);
    let mut last_err: Option<String> = None;
    // Walk the candidate ring from `start`, wrapping around at the end.
    let ring = candidates.iter().cycle().skip(start).take(max_attempts);
    for (attempt, record) in ring.enumerate() {
        let connected = ChildClient::connect_with_auth_tls(
            &record.endpoint,
            placement.token.as_deref(),
            trust_cfg.clone(),
        )
        .await;
        match connected {
            Ok(client) => {
                if attempt > 0 {
                    tracing::info!(
                        "schedulable role '{role}': connected to pool '{}' worker after \
                        {attempt} stale candidate(s) skipped",
                        placement.pool
                    );
                }
                return Ok((client, record.clone(), placement));
            }
            Err(e) => {
                tracing::warn!(
                    "schedulable role '{role}': pool '{}' candidate connect failed \
                    (attempt {}/{max_attempts}): {e}",
                    placement.pool,
                    attempt + 1
                );
                last_err = Some(e.to_string());
            }
        }
    }

    Err(AgentError::LLM(format!(
        "schedulable role '{role}': all {max_attempts} live candidate(s) in pool '{}' at \
        registry '{}' failed to connect (last error: {}) — NOT spawning a local subprocess",
        placement.pool,
        placement.registry_url,
        last_err.as_deref().unwrap_or("unknown")
    )))
}
}
#[async_trait]
impl ExternalChildRunner for ActorChildRunner {
/// Returns `true` when the session's metadata marks it as an external actor
/// child addressed to this runner (`runtime.kind` = `external`,
/// `external.protocol` = `actor`, `external.agent_id` = this runner's id).
async fn should_handle(&self, session: &Session) -> bool {
    let metadata = &session.metadata;
    let has = |key: &str, wanted: &str| metadata.get(key).is_some_and(|found| found == wanted);

    has("runtime.kind", "external")
        && has("external.protocol", "actor")
        && has("external.agent_id", self.agent_id.as_str())
}
/// Replaces the host bridge used to escalate child approval requests; `None`
/// clears it. Runs already in progress keep the bridge they started with.
///
/// # Panics
/// Panics if the bridge mutex is poisoned.
fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
    let mut current = self.escalation_bridge.lock().unwrap();
    *current = bridge;
}
/// Runs one child session on an actor worker and writes its final text back
/// into `session` as an assistant message.
///
/// Steps: build the provisioning spec, take a concurrency permit, obtain a
/// worker and connection according to the spec's placement (fixed remote,
/// scheduled from a registry pool, or local pooled/spawned), dispatch the run,
/// then pump frames via `drive` until a terminal frame or cancellation.
///
/// Worker lifecycle for local placement: the worker goes back to the idle pool
/// after a successful run and is retired (killed and withdrawn) after any
/// failure — including a failed respawn connect and a failed run dispatch, so
/// that no spawned process is left behind on an early return. Remote and
/// scheduled workers are never pooled or killed from here.
///
/// # Errors
/// Returns `AgentError::LLM` for limiter, placement, spawn, connect, dispatch
/// and transport failures, and `AgentError::Cancelled` when the run is
/// cancelled.
///
/// # Panics
/// Panics if the escalation-bridge mutex is poisoned.
async fn execute_external_child(
    &self,
    session: &mut Session,
    job: &SpawnJob,
    event_tx: mpsc::Sender<AgentEvent>,
    cancel_token: CancellationToken,
) -> crate::runtime::runner::Result<()> {
    // Snapshot the bridge; the std mutex guard is a temporary and is released
    // before the first await.
    let escalation = self.escalation_bridge.lock().unwrap().clone();
    let assignment = extract_assignment(session);
    let mut spec = self.build_spec(session, job);
    spec.reusable = true;
    if spec.limits.idle_timeout_secs.is_none() {
        spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
    }
    let pool_key = Self::fingerprint(&spec);
    // Messages that fail to serialize are skipped rather than failing the run.
    let messages: Vec<serde_json::Value> = session
        .messages
        .iter()
        .filter_map(|m| serde_json::to_value(m).ok())
        .collect();
    // Held until the function returns, bounding concurrent child runs.
    let _slot = self
        .concurrency
        .acquire()
        .await
        .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
    let kind = match spec.placement {
        Placement::Remote { .. } => PlacementKind::Remote,
        Placement::Schedulable { .. } => PlacementKind::Schedulable,
        Placement::Local => PlacementKind::Local,
    };
    let remote = !matches!(kind, PlacementKind::Local);
    let (actor, mut client) = match kind {
        PlacementKind::Remote => {
            let placement = self
                .remote_placements
                .get(spec.identity.role.as_str())
                .ok_or_else(|| {
                    AgentError::LLM(format!(
                        "remote placement for role '{}' vanished before connect",
                        spec.identity.role
                    ))
                })?;
            let endpoint = placement.endpoint.clone();
            let trust_cfg = match placement.ca_cert_file.as_deref() {
                Some(path) => Some(client_config_trusting_cert(path).map_err(|e| {
                    AgentError::LLM(format!("remote worker CA cert '{}': {e}", path.display()))
                })?),
                None => None,
            };
            let client = ChildClient::connect_with_auth_tls(
                &endpoint,
                placement.token.as_deref(),
                trust_cfg,
            )
            .await
            .map_err(|e| {
                AgentError::LLM(format!("remote actor connect to '{endpoint}' failed: {e}"))
            })?;
            // Synthetic record: the resident worker was not spawned by us.
            let record = AgentRecord {
                agent_id: job.child_session_id.clone(),
                role: spec.identity.role.clone(),
                labels: Vec::new(),
                endpoint: endpoint.clone(),
                pid: 0,
                version: String::new(),
                started_at: chrono::Utc::now(),
                lease_expires_at: chrono::Utc::now(),
            };
            let actor = PooledActor {
                worker: SpawnedChild::remote(record),
                endpoint,
                agent_id: job.child_session_id.clone(),
            };
            (actor, client)
        }
        PlacementKind::Schedulable => {
            let (client, record, _placement) = self
                .resolve_schedulable_worker(spec.identity.role.as_str())
                .await?;
            let endpoint = record.endpoint.clone();
            let actor = PooledActor {
                worker: SpawnedChild::remote(record),
                endpoint,
                agent_id: job.child_session_id.clone(),
            };
            (actor, client)
        }
        PlacementKind::Local => {
            let mut actor = self.acquire_worker(&pool_key, &spec).await?;
            let client = match ChildClient::connect(&actor.endpoint).await {
                Ok(client) => client,
                Err(e) => {
                    // The acquired worker is unreachable: retire it and try
                    // exactly one fresh spawn.
                    self.retire_worker(actor).await;
                    let spawned = spawn_worker(
                        &self.worker_bin,
                        &self.worker_args,
                        &spec,
                        self.spawn_timeout,
                    )
                    .await
                    .map_err(|e2| {
                        AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
                    })?;
                    let endpoint = spawned.record.endpoint.clone();
                    let agent_id = spawned.record.agent_id.clone();
                    let respawned = PooledActor {
                        worker: spawned,
                        endpoint,
                        agent_id,
                    };
                    let reconnect = ChildClient::connect(&respawned.endpoint)
                        .await
                        .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")));
                    match reconnect {
                        Ok(client) => {
                            actor = respawned;
                            client
                        }
                        Err(failure) => {
                            // Do not leave the just-spawned worker behind.
                            self.retire_worker(respawned).await;
                            return Err(failure);
                        }
                    }
                }
            };
            (actor, client)
        }
    };
    let dispatch = client
        .send(ParentFrame::Run(RunSpec {
            assignment,
            reasoning_effort: None,
            messages,
        }))
        .await
        .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")));
    if let Err(failure) = dispatch {
        // Same cleanup as a failed run: close the connection and, for a local
        // worker, retire it instead of leaking it on the early return.
        let _ = client.close().await;
        if !remote {
            self.retire_worker(actor).await;
        }
        return Err(failure);
    }
    // Register a live channel so frames can be injected into this run while it
    // is in flight; the guard is dropped once `drive` finishes.
    let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
    let live_guard = super::live::register(&job.child_session_id, live_tx);
    let result = drive(
        &mut client,
        &job.child_session_id,
        self.approval_decider.as_ref(),
        escalation,
        &event_tx,
        &cancel_token,
        &mut live_rx,
    )
    .await;
    drop(live_guard);
    let _ = client.close().await;
    if remote {
        drop(actor);
    } else {
        match &result {
            Ok(_) => self.release_worker(&pool_key, actor).await,
            Err(_) => self.retire_worker(actor).await,
        }
    }
    match result {
        Ok(Some(text)) => {
            if !text.is_empty() {
                session.add_message(bamboo_agent_core::Message::assistant(text, None));
            }
            Ok(())
        }
        Ok(None) => Ok(()),
        Err(e) => Err(e),
    }
}
}
/// Pumps one child run until it reaches a terminal frame, the transport fails,
/// or `cancel_token` fires.
///
/// Each loop iteration waits on three sources at once:
/// * cancellation — leaves the loop, sends `ParentFrame::Cancel` (best effort)
///   and returns `AgentError::Cancelled`;
/// * `live_rx` — frames queued for this child are forwarded to it;
/// * the child connection — events are forwarded to `event_tx`, approval
///   requests are routed (see below), and a terminal frame ends the run.
///
/// Approval routing, in priority order: the process-wide reviewer, the
/// per-runner `approval_decider` (awaited inline), the `escalation_bridge`,
/// and finally a `ChildApprovalRequested` event with a timeout that denies the
/// request if it is still pending after `CHILD_APPROVAL_TIMEOUT`.
///
/// Returns `Ok(result)` on `Completed`; every other outcome is an error.
async fn drive(
client: &mut ChildClient,
child_session_id: &str,
approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
escalation_bridge: Option<bamboo_subagent::executor::HostBridge>,
event_tx: &mpsc::Sender<AgentEvent>,
cancel_token: &CancellationToken,
live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
) -> crate::runtime::runner::Result<Option<String>> {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
break;
}
// A closed live channel (`None`) simply does not match, so the other
// branches keep being served.
Some(frame) = live_rx.recv() => {
if client.send(frame).await.is_err() {
tracing::warn!("live steering frame could not be sent; connection failing");
}
}
frame = client.next_frame() => {
match frame {
Ok(Some(ChildFrame::Event { event })) => {
// Events that do not deserialize as `AgentEvent` are dropped; a closed
// event receiver is ignored.
if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
let _ = event_tx.send(ev).await;
}
}
Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
if let Some(reviewer) = child_approval_reviewer() {
// Reviewed on a separate task so this loop keeps pumping frames; a timeout
// counts as a denial.
// NOTE(review): `deliver_approval` presumably routes the reply back through
// the live channel registered for this child — confirm in `super::live`.
let child = child_session_id.to_string();
let req_id = id.clone();
let body = body.clone();
tokio::spawn(async move {
let approved = tokio::time::timeout(
CHILD_APPROVAL_TIMEOUT,
reviewer.review(&child, &body),
)
.await
.unwrap_or(false);
super::live::deliver_approval(&child, &req_id, approved);
});
} else if approval_decider.is_some() {
// Decided inline: the loop (including cancellation) waits for the decider,
// and no timeout is applied on this path.
let approved =
decide_child_approval(approval_decider, child_session_id, &body)
.await;
if client
.send(ParentFrame::ApprovalReply { id, approved })
.await
.is_err()
{
tracing::warn!(
"failed to answer approval_request; connection failing"
);
}
} else if let Some(host) = escalation_bridge.clone() {
// Escalated to the host on a separate task; anything other than a timely
// reply carrying a boolean `approved` field counts as a denial.
let child = child_session_id.to_string();
let req_id = id.clone();
let body = body.clone();
tokio::spawn(async move {
let approved = match tokio::time::timeout(
CHILD_APPROVAL_TIMEOUT,
host.approval_call(body),
)
.await
{
Ok(Ok(reply)) => reply
.get("approved")
.and_then(|v| v.as_bool())
.unwrap_or(false),
_ => false,
};
super::live::deliver_approval(&child, &req_id, approved);
});
} else {
// No automated approver: record the request as pending, surface it as an
// event, and arm a timer that denies it if it is still pending afterwards.
let (tool_name, permission, resource) =
approval_request_fields(&body);
super::live::register_pending_approval(child_session_id, &id);
let _ = event_tx
.send(AgentEvent::ChildApprovalRequested {
child_session_id: child_session_id.to_string(),
request_id: id.clone(),
tool_name,
permission,
resource,
})
.await;
let child = child_session_id.to_string();
tokio::spawn(async move {
tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
if super::live::take_pending_approval(&child, &id) {
super::live::deliver_approval(&child, &id, false);
}
});
}
}
Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
return match status {
TerminalStatus::Completed => Ok(result),
TerminalStatus::Cancelled => Err(AgentError::Cancelled),
TerminalStatus::Error => Err(AgentError::LLM(
error.unwrap_or_else(|| "actor child errored".to_string()),
)),
TerminalStatus::Suspended => Err(AgentError::LLM(
"nested sub-agent suspend received but resume transport is not wired"
.to_string(),
)),
};
}
// The stream ended without a terminal frame.
Ok(None) => {
return Err(AgentError::LLM(
"actor child closed before terminal".to_string(),
));
}
Err(e) => {
return Err(AgentError::LLM(format!("actor transport error: {e}")));
}
}
}
}
}
// Only reached through the cancellation branch above.
let _ = client.send(ParentFrame::Cancel).await;
Err(AgentError::Cancelled)
}
/// Picks the task text to hand to the child: the most recent user message,
/// else the session's `title` metadata, else the literal `"Execute task"`.
fn extract_assignment(session: &Session) -> String {
    let latest_user = session
        .messages
        .iter()
        .rfind(|msg| matches!(msg.role, Role::User));
    if let Some(msg) = latest_user {
        return msg.content.clone();
    }
    match session.metadata.get("title") {
        Some(title) => title.clone(),
        None => "Execute task".to_string(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Test helper: a depth-0 echo-executor spec with the given role, model,
/// workspace and disabled tools; everything else stays at its constructed value.
fn spec_with(
    role: &str,
    provider: &str,
    model: &str,
    workspace: Option<&str>,
    disabled: Option<Vec<&str>>,
) -> ProvisionSpec {
    let identity = ChildIdentity {
        child_id: "c".into(),
        parent_id: None,
        project_key: None,
        role: role.into(),
        depth: 0,
    };
    let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
    spec.workspace = workspace.map(str::to_owned);
    spec.model = Some(ModelRefSpec {
        provider: provider.into(),
        model: model.into(),
    });
    spec.disabled_tools = disabled.map(|names| names.into_iter().map(String::from).collect());
    spec
}
/// Specs differing only in child id and disabled-tool order share a pool key.
#[test]
fn fingerprint_matches_interchangeable_children() {
    let explorer_with =
        |tools: Vec<&str>| spec_with("explorer", "p", "m", Some("/ws"), Some(tools));

    let first = explorer_with(vec!["Bash", "Edit"]);
    let mut second = explorer_with(vec!["Edit", "Bash"]);
    second.identity.child_id = "other".into();

    let first_fp = ActorChildRunner::fingerprint(&first);
    let second_fp = ActorChildRunner::fingerprint(&second);
    assert_eq!(first_fp, second_fp);
}
/// Changing role, provider, model, workspace or the disabled tools must each
/// produce a different pool key.
#[test]
fn fingerprint_separates_distinct_runtimes() {
    let base_fp =
        ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));

    let variants = [
        spec_with("writer", "p", "m", Some("/ws"), None),
        spec_with("explorer", "p2", "m", Some("/ws"), None),
        spec_with("explorer", "p", "m2", Some("/ws"), None),
        spec_with("explorer", "p", "m", Some("/ws2"), None),
        spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
    ];
    for variant in &variants {
        assert_ne!(base_fp, ActorChildRunner::fingerprint(variant));
    }
}
/// Spawn depth and every capability flag are part of the pool key: flipping
/// any one of them on an otherwise identical spec must change the key.
#[test]
fn fingerprint_splits_on_baked_capabilities() {
    let fresh = || spec_with("explorer", "p", "m", Some("/ws"), None);
    let base_fp = ActorChildRunner::fingerprint(&fresh());

    let cases: [(&str, fn(&mut ProvisionSpec)); 7] = [
        ("depth must split", |s: &mut ProvisionSpec| {
            s.identity.depth = 2
        }),
        ("nested_spawn must split", |s: &mut ProvisionSpec| {
            s.capabilities.nested_spawn = true
        }),
        ("bypass must split", |s: &mut ProvisionSpec| {
            s.capabilities.bypass = true
        }),
        ("enforce_permissions must split", |s: &mut ProvisionSpec| {
            s.capabilities.enforce_permissions = true
        }),
        ("max_spawn_depth must split", |s: &mut ProvisionSpec| {
            s.capabilities.max_spawn_depth = Some(8)
        }),
        ("no_human_approver must split", |s: &mut ProvisionSpec| {
            s.capabilities.no_human_approver = true
        }),
        ("guardian_read_only must split", |s: &mut ProvisionSpec| {
            s.capabilities.guardian_read_only = true
        }),
    ];

    for (why, mutate) in cases {
        let mut changed = fresh();
        mutate(&mut changed);
        assert_ne!(base_fp, ActorChildRunner::fingerprint(&changed), "{}", why);
    }
}
/// Test double: a decider that always answers with the wrapped verdict.
struct StaticDecider(bool);
#[async_trait]
impl ChildApprovalDecider for StaticDecider {
    async fn decide(&self, _child_session_id: &str, _request: &serde_json::Value) -> bool {
        let StaticDecider(verdict) = self;
        *verdict
    }
}
/// With no decider wired, an approval request must be denied.
#[tokio::test]
async fn child_approval_fails_closed_without_decider() {
    let request =
        serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
    let approved = decide_child_approval(None, "child-1", &request).await;
    assert!(!approved);
}
/// A wired decider's verdict is returned unchanged, for approve and deny alike.
#[tokio::test]
async fn child_approval_honors_wired_decider() {
    let request =
        serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
    let always_yes: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(true));
    let always_no: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(false));

    let yes = decide_child_approval(Some(&always_yes), "child-1", &request).await;
    assert!(yes);
    let no = decide_child_approval(Some(&always_no), "child-1", &request).await;
    assert!(!no);
}
/// Present string fields are extracted; missing ones default to empty strings.
#[test]
fn approval_request_fields_extracts_and_defaults() {
    let complete = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
    let expected_complete = (String::from("Bash"), String::from("run"), String::from("ls"));
    assert_eq!(approval_request_fields(&complete), expected_complete);

    let tool_only = serde_json::json!({"tool_name":"Write"});
    let expected_tool_only = (String::from("Write"), String::new(), String::new());
    assert_eq!(approval_request_fields(&tool_only), expected_tool_only);
}
use crate::runtime::execution::SpawnJob;
use bamboo_agent_core::Session;
/// Test helper: a runner whose worker binary is `/bin/false`, so any attempt to
/// spawn a local worker fails; only the given remote placements are usable.
fn bogus_runner(placements: HashMap<String, ResolvedRemotePlacement>) -> ActorChildRunner {
    let fabric_dir = std::env::temp_dir().join("bamboo-test-fab-193");
    let runner = ActorChildRunner::new(
        "test-actor".into(),
        PathBuf::from("/bin/false"),
        Vec::new(),
        fabric_dir,
        ExecutorSpec::Echo,
        Vec::new(),
        "anthropic".into(),
        4,
    );
    runner.with_remote_placements(placements)
}
/// Test helper: a session tagged with `subagent_type = role` that contains a
/// single user message, `assignment`.
fn session_of_role(role: &str, assignment: &str) -> Session {
    let mut session = Session::new("child-1", "test-model");
    session
        .metadata
        .insert("subagent_type".to_string(), role.to_string());
    let task = bamboo_agent_core::Message::user(assignment);
    session.add_message(task);
    session
}
/// Test helper: a spawn job for `child` under parent `parent-1`, with no model
/// override and no disabled tools.
fn job_for(child: &str) -> SpawnJob {
    let parent_session_id = "parent-1".into();
    let child_session_id = child.into();
    SpawnJob {
        parent_session_id,
        child_session_id,
        model: String::new(),
        disabled_tools: None,
    }
}
/// A role listed in the remote table gets `Placement::Remote` plus its token.
#[test]
fn build_spec_sets_remote_placement_for_matching_role() {
    let explorer = ResolvedRemotePlacement {
        endpoint: "wss://gpu-host:8443".into(),
        token: Some("T-secret".into()),
        ca_cert_file: None,
    };
    let runner = bogus_runner(HashMap::from([("explorer".to_string(), explorer)]));
    let session = session_of_role("explorer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    let Placement::Remote { endpoint } = &spec.placement else {
        panic!("expected Remote, got {:?}", spec.placement);
    };
    assert_eq!(endpoint, "wss://gpu-host:8443");
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-secret"));
}
/// A role absent from the remote table stays local and carries no worker token.
#[test]
fn build_spec_leaves_local_for_unmatched_role() {
    let explorer = ResolvedRemotePlacement {
        endpoint: "wss://gpu-host:8443".into(),
        token: Some("T".into()),
        ca_cert_file: None,
    };
    let runner = bogus_runner(HashMap::from([("explorer".to_string(), explorer)]));
    let session = session_of_role("writer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    assert_eq!(spec.placement, Placement::Local);
    assert!(spec.secrets.worker_auth_token.is_none());
}
/// With no placements configured at all, every role is local.
#[test]
fn build_spec_local_when_no_placements() {
    let no_placements = HashMap::new();
    let runner = bogus_runner(no_placements);
    let session = session_of_role("explorer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    assert_eq!(spec.placement, Placement::Local);
    assert!(spec.secrets.worker_auth_token.is_none());
}
/// End-to-end: a role with a remote placement is served by an already-running
/// token-protected echo worker. The runner's worker binary is `/bin/false`, so
/// a successful run shows the resident worker was used rather than a spawn.
#[tokio::test]
async fn execute_external_child_routes_role_to_remote_worker_without_spawning() {
    let token = "remote-test-token";
    let server = bamboo_subagent::transport::WsServer::bind_with_token(
        (std::net::Ipv4Addr::LOCALHOST, 0).into(),
        Some(token.to_string()),
    )
    .await
    .expect("bind resident worker");
    let endpoint = server.ws_endpoint();
    let serving = tokio::spawn(async move {
        let _ = server
            .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
            .await;
    });

    let resident = ResolvedRemotePlacement {
        endpoint: endpoint.clone(),
        token: Some(token.to_string()),
        ca_cert_file: None,
    };
    let runner = bogus_runner(HashMap::from([("explorer".to_string(), resident)]));
    let mut session = session_of_role("explorer", "hello remote");
    let job = job_for("child-1");
    let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
    let cancel = CancellationToken::new();

    let run = runner.execute_external_child(&mut session, &job, event_tx, cancel);
    let outcome = tokio::time::timeout(Duration::from_secs(10), run)
        .await
        .expect("run did not hang")
        .expect("remote run succeeded (connected to resident worker, did not spawn)");
    let _ = outcome;

    let reply = session
        .messages
        .iter()
        .rfind(|m| matches!(m.role, Role::Assistant))
        .expect("an assistant reply was written back");
    assert!(
        reply.content.contains("echo:"),
        "expected echo reply, got {:?}",
        reply.content
    );

    // Drain whatever events were forwarded; their presence is not asserted.
    let mut saw_event = false;
    loop {
        let next = tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await;
        match next {
            Ok(Some(_ev)) => saw_event = true,
            _ => break,
        }
    }
    let _ = saw_event;
    serving.abort();
}
use wiremock::matchers::{method as wm_method, path as wm_path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Test helper: like `bogus_runner` (worker binary `/bin/false`), but with both
/// a remote and a schedulable placement table.
fn bogus_sched_runner(
    remote: HashMap<String, ResolvedRemotePlacement>,
    sched: HashMap<String, ResolvedSchedulablePlacement>,
) -> ActorChildRunner {
    let fabric_dir = std::env::temp_dir().join("bamboo-test-fab-181");
    let runner = ActorChildRunner::new(
        "test-actor".into(),
        PathBuf::from("/bin/false"),
        Vec::new(),
        fabric_dir,
        ExecutorSpec::Echo,
        Vec::new(),
        "anthropic".into(),
        4,
    );
    let runner = runner.with_remote_placements(remote);
    runner.with_schedulable_placements(sched)
}
/// Test helper: a schedulable placement for `pool` at `registry_url`, with no
/// token and no CA certificate.
fn sched_placement(
    pool: &str,
    registry_url: impl Into<String>,
) -> ResolvedSchedulablePlacement {
    let pool = pool.into();
    let registry_url = registry_url.into();
    ResolvedSchedulablePlacement {
        pool,
        registry_url,
        token: None,
        ca_cert_file: None,
    }
}
/// Test helper: a registry record whose lease expires 60 seconds from now.
fn live_record(agent_id: &str, role: &str, endpoint: &str) -> AgentRecord {
    let started_at = chrono::Utc::now();
    let lease_expires_at = chrono::Utc::now() + chrono::Duration::seconds(60);
    AgentRecord {
        agent_id: agent_id.into(),
        role: role.into(),
        labels: Vec::new(),
        endpoint: endpoint.into(),
        pid: 0,
        version: String::new(),
        started_at,
        lease_expires_at,
    }
}
/// A role listed in the schedulable table gets `Placement::Schedulable` with
/// the pool name, plus the placement's token.
#[test]
fn build_spec_sets_schedulable_placement_for_matching_role() {
    let pooled = ResolvedSchedulablePlacement {
        token: Some("T-sched".into()),
        ..sched_placement("gpu-pool", "https://control-plane:9562")
    };
    let sched = HashMap::from([("explorer".to_string(), pooled)]);
    let runner = bogus_sched_runner(HashMap::new(), sched);
    let session = session_of_role("explorer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    let Placement::Schedulable { pool } = &spec.placement else {
        panic!("expected Schedulable, got {:?}", spec.placement);
    };
    assert_eq!(pool, "gpu-pool");
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-sched"));
}
/// When a role appears in both tables, the fixed remote placement (and its
/// token) takes precedence over the schedulable one.
#[test]
fn build_spec_remote_wins_when_role_in_both_maps() {
    let fixed = ResolvedRemotePlacement {
        endpoint: "wss://fixed-host:8443".into(),
        token: Some("T-remote".into()),
        ca_cert_file: None,
    };
    let remote = HashMap::from([("explorer".to_string(), fixed)]);
    let sched = HashMap::from([(
        "explorer".to_string(),
        sched_placement("gpu-pool", "https://control-plane:9562"),
    )]);
    let runner = bogus_sched_runner(remote, sched);
    let session = session_of_role("explorer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    let Placement::Remote { endpoint } = &spec.placement else {
        panic!("expected Remote (precedence), got {:?}", spec.placement);
    };
    assert_eq!(endpoint, "wss://fixed-host:8443");
    assert_eq!(spec.secrets.worker_auth_token.as_deref(), Some("T-remote"));
}
/// A role absent from the schedulable table stays local with no worker token.
#[test]
fn build_spec_local_for_unmatched_schedulable_role() {
    let sched = HashMap::from([(
        "explorer".to_string(),
        sched_placement("gpu-pool", "https://control-plane:9562"),
    )]);
    let runner = bogus_sched_runner(HashMap::new(), sched);
    let session = session_of_role("writer", "do the thing");

    let spec = runner.build_spec(&session, &job_for("child-1"));

    assert_eq!(spec.placement, Placement::Local);
    assert!(spec.secrets.worker_auth_token.is_none());
}
/// Test helper: starts `n` loopback echo workers and returns their endpoints
/// together with the serving task handles (abort them when done).
async fn spawn_echo_workers(n: usize) -> (Vec<String>, Vec<tokio::task::JoinHandle<()>>) {
    let mut endpoints = Vec::with_capacity(n);
    let mut tasks = Vec::with_capacity(n);
    for _ in 0..n {
        let server = bamboo_subagent::transport::WsServer::bind_loopback()
            .await
            .expect("bind echo worker");
        endpoints.push(server.ws_endpoint());
        let task = tokio::spawn(async move {
            let _ = server
                .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
                .await;
        });
        tasks.push(task);
    }
    (endpoints, tasks)
}
/// Three consecutive picks from a three-worker pool must land on three
/// different workers, and a worker from another pool must never be chosen.
#[tokio::test]
async fn resolve_schedulable_worker_round_robin_spreads_over_candidates() {
    let (eps, handles) = spawn_echo_workers(3).await;
    let registry = MockServer::start().await;
    // Three workers in the target pool plus one decoy in a different pool.
    let recs = vec![
        live_record("w-0", "gpu-pool", &eps[0]),
        live_record("w-1", "gpu-pool", &eps[1]),
        live_record("w-2", "gpu-pool", &eps[2]),
        live_record("other", "cpu-pool", "ws://127.0.0.1:9"),
    ];
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(ResponseTemplate::new(200).set_body_json(recs))
        .mount(&registry)
        .await;
    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);
    let mut picked = Vec::new();
    for _ in 0..3 {
        let (client, rec, placement) = match runner.resolve_schedulable_worker("explorer").await
        {
            Ok(v) => v,
            Err(e) => panic!("a live worker is picked: {e}"),
        };
        assert_eq!(placement.pool, "gpu-pool");
        assert_eq!(rec.role, "gpu-pool", "only pool workers are candidates");
        picked.push(rec.agent_id);
        let _ = client.close().await;
    }
    // Order of picks is not asserted, only that every worker was used once.
    picked.sort();
    assert_eq!(
        picked,
        vec!["w-0".to_string(), "w-1".to_string(), "w-2".to_string()],
        "round-robin covered every candidate over three picks"
    );
    for h in handles {
        h.abort();
    }
}
/// When the first advertised candidate cannot be connected to, resolution
/// must skip it and return the next live worker from the same pool.
#[tokio::test]
async fn resolve_schedulable_worker_fails_over_dead_first_candidate() {
    let (eps, handles) = spawn_echo_workers(1).await;

    // Reserve an ephemeral port, then release it: the endpoint is well-formed
    // but the listener that owned it has been dropped.
    let dead = {
        let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
            .await
            .unwrap();
        let port = l.local_addr().unwrap().port();
        drop(l);
        format!("ws://127.0.0.1:{port}")
    };

    let registry = MockServer::start().await;
    let recs = vec![
        live_record("w-dead", "gpu-pool", &dead),
        live_record("w-live", "gpu-pool", &eps[0]),
    ];
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(ResponseTemplate::new(200).set_body_json(recs))
        .mount(&registry)
        .await;

    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);

    let (client, rec, _placement) = runner
        .resolve_schedulable_worker("explorer")
        .await
        .unwrap_or_else(|e| {
            panic!("failover skips the dead candidate, connects to the live one: {e}")
        });
    assert_eq!(
        rec.agent_id, "w-live",
        "the dead first candidate was skipped; the live one was chosen"
    );
    let _ = client.close().await;

    for h in handles {
        h.abort();
    }
}
/// When every advertised candidate in the pool is unreachable, resolution
/// must return an error whose message names the connect failure and the pool
/// and states that no local subprocess is spawned as a fallback.
#[tokio::test]
async fn resolve_schedulable_worker_errors_when_all_candidates_dead() {
    // Two endpoints whose listeners were bound and immediately dropped.
    let mut dead = Vec::new();
    for _ in 0..2 {
        let l = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
            .await
            .unwrap();
        let port = l.local_addr().unwrap().port();
        drop(l);
        dead.push(format!("ws://127.0.0.1:{port}"));
    }

    let registry = MockServer::start().await;
    let recs = vec![
        live_record("d-0", "gpu-pool", &dead[0]),
        live_record("d-1", "gpu-pool", &dead[1]),
    ];
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(ResponseTemplate::new(200).set_body_json(recs))
        .mount(&registry)
        .await;

    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);

    let msg = match runner.resolve_schedulable_worker("explorer").await {
        Ok(_) => panic!("all candidates dead must error, not connect"),
        Err(e) => e.to_string(),
    };
    assert!(
        msg.contains("failed to connect"),
        "names the connect failure: {msg}"
    );
    assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
    assert!(
        msg.contains("NOT spawning"),
        "confirms no local-subprocess fallback: {msg}"
    );
}
/// Repeated resolves against the same registry URL (with no token) must
/// leave exactly one entry in the runner's fabric cache, keyed by
/// `(registry_url, None)`.
#[tokio::test]
async fn fabric_cache_reuses_one_fabric_per_registry_url() {
    let (eps, handles) = spawn_echo_workers(1).await;
    let registry = MockServer::start().await;
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(
            ResponseTemplate::new(200)
                .set_body_json(vec![live_record("w-0", "gpu-pool", &eps[0])]),
        )
        .mount(&registry)
        .await;

    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);

    for _ in 0..3 {
        let (client, _rec, _p) = runner
            .resolve_schedulable_worker("explorer")
            .await
            .unwrap_or_else(|e| panic!("resolve succeeds: {e}"));
        let _ = client.close().await;
    }

    // Scope the guard so the cache lock is released before the workers are
    // torn down; no `.await` happens while it is held.
    {
        let cache = runner.fabric_cache.lock().unwrap();
        assert_eq!(
            cache.len(),
            1,
            "three resolves to the same (registry_url, token) reused ONE cached fabric"
        );
        assert!(cache.contains_key(&(registry.uri(), None)));
    }

    for h in handles {
        h.abort();
    }
}
/// The fabric cache is keyed on the registry URL together with the token:
/// two placements that differ only in token get distinct fabrics, while a
/// repeated (url, token) pair gets the already-cached one back.
#[test]
fn fabric_cache_keys_on_registry_url_and_token() {
    let runner = bogus_sched_runner(HashMap::new(), HashMap::new());
    let cached_fabrics = || runner.fabric_cache.lock().unwrap().len();

    // Two placements identical except for the token they carry.
    let mut placement_a = sched_placement("pool", "http://127.0.0.1:9/".to_string());
    placement_a.token = Some("token-a".into());
    let mut placement_b = placement_a.clone();
    placement_b.token = Some("token-b".into());

    let first_a = runner.fabric_for(&placement_a, "role-a").expect("build a");
    let first_b = runner.fabric_for(&placement_b, "role-b").expect("build b");
    assert!(
        !Arc::ptr_eq(&first_a, &first_b),
        "different tokens must not share a fabric"
    );
    assert_eq!(cached_fabrics(), 2);

    // Asking again with the first token must hit the cache, not grow it.
    let second_a = runner.fabric_for(&placement_a, "role-a").expect("reuse a");
    assert!(Arc::ptr_eq(&first_a, &second_a), "same (url, token) must reuse");
    assert_eq!(cached_fabrics(), 2);
}
/// When the registry advertises no worker for the requested pool (only a
/// record for a different pool), resolution must fail with an error that
/// says no live worker was found and names the pool.
#[tokio::test]
async fn resolve_schedulable_worker_errors_with_no_live_worker() {
    let registry = MockServer::start().await;
    // The only advertised record belongs to `cpu-pool`, not `gpu-pool`.
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
            "x",
            "cpu-pool",
            "ws://127.0.0.1:9",
        )]))
        .mount(&registry)
        .await;

    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);

    let msg = match runner.resolve_schedulable_worker("explorer").await {
        Ok(_) => panic!("no live worker in pool must error, not connect"),
        Err(e) => e.to_string(),
    };
    assert!(msg.contains("no live worker"), "clear error: {msg}");
    assert!(msg.contains("gpu-pool"), "names the pool: {msg}");
}
/// End-to-end check of the schedulable path: a role mapped to a pool is run
/// through `execute_external_child` against a resident echo worker that the
/// mocked registry advertises, and the worker's echo reply must be written
/// back to the session as an assistant message.
#[tokio::test]
async fn execute_external_child_schedules_role_from_registry_without_spawning() {
    // Resident worker: a loopback server serving the echo executor.
    let server = bamboo_subagent::transport::WsServer::bind_loopback()
        .await
        .expect("bind resident worker");
    let endpoint = server.ws_endpoint();
    let srv = tokio::spawn(async move {
        let _ = server
            .serve(Arc::new(bamboo_subagent::executor::EchoExecutor))
            .await;
    });

    // Registry mock advertising that single worker for `gpu-pool`.
    let registry = MockServer::start().await;
    Mock::given(wm_method("GET"))
        .and(wm_path("/v1/agents"))
        .respond_with(ResponseTemplate::new(200).set_body_json(vec![live_record(
            "live-explorer",
            "gpu-pool",
            &endpoint,
        )]))
        .mount(&registry)
        .await;

    let mut sched = HashMap::new();
    sched.insert(
        "explorer".to_string(),
        sched_placement("gpu-pool", registry.uri()),
    );
    let runner = bogus_sched_runner(HashMap::new(), sched);

    let mut session = session_of_role("explorer", "hello scheduled");
    let job = job_for("child-1");
    let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(64);
    let cancel = CancellationToken::new();

    // Bound the run so a wiring mistake fails the test instead of hanging it.
    tokio::time::timeout(
        Duration::from_secs(10),
        runner.execute_external_child(&mut session, &job, event_tx, cancel),
    )
    .await
    .expect("run did not hang")
    .expect("scheduled run succeeded (resolved from registry, did not spawn)");

    let last = session
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, Role::Assistant))
        .expect("an assistant reply was written back");
    assert!(
        last.content.contains("echo:"),
        "expected echo reply, got {:?}",
        last.content
    );

    // Drain whatever events were forwarded; this test does not assert on them.
    while let Ok(Some(_ev)) =
        tokio::time::timeout(Duration::from_millis(50), event_rx.recv()).await
    {}

    srv.abort();
}
}