use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use bamboo_subagent::discovery::Fabric;
use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
use bamboo_subagent::provision::{
ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
};
use bamboo_subagent::transport::ChildClient;
use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
/// Suggested concurrency limit for actor children; presumably the value callers pass as
/// `max_concurrent` to `ActorChildRunner::new` (not referenced in this file).
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;
/// Spawn depth at and beyond which children are no longer granted nested spawning; also
/// advertised to every child as its `max_spawn_depth` capability.
pub const MAX_SPAWN_DEPTH: u32 = 4;
/// Idle workers kept per pool fingerprint before further released workers are retired.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;
/// Idle timeout, in seconds, given to pooled workers whose spec does not set one.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
/// A spawned actor worker kept for reuse, plus the details copied from its spawn
/// record that are needed to reach it and later retire it.
struct PooledActor {
// Worker handle; `kill().await` is called on it when the actor is retired.
worker: SpawnedChild,
// Endpoint from the spawn record; dialled with `ChildClient::connect`.
endpoint: String,
// Agent id from the spawn record; used for fabric `resolve` and `withdraw`.
agent_id: String,
}
/// Runs external "actor" sub-agent children on worker processes launched from
/// `worker_bin`, reusing idle workers whose provisioning fingerprint matches.
pub struct ActorChildRunner {
    /// Value that a session's `external.agent_id` metadata must equal for this runner.
    agent_id: String,
    /// Worker executable passed to `spawn_worker`.
    worker_bin: PathBuf,
    /// Arguments passed alongside `worker_bin` to `spawn_worker`.
    worker_args: Vec<String>,
    /// Discovery fabric directory, used to resolve and withdraw workers.
    fabric_dir: PathBuf,
    /// Executor spec copied into every child's provisioning spec.
    executor: ExecutorSpec,
    /// Provider credentials; the one matching a child's provider is forwarded to it.
    credentials: Vec<ScopedCredential>,
    /// Provider used when a child's model does not otherwise name one.
    default_provider: String,
    /// Bounds how many children run at the same time.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Timeout handed to `spawn_worker`.
    spawn_timeout: Duration,
    /// Idle workers, bucketed by provisioning-fingerprint key.
    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
    /// Maximum idle workers retained per fingerprint bucket.
    max_idle_per_key: usize,
    /// Optional per-runner policy for child approval requests.
    approval_decider: Option<Arc<dyn ChildApprovalDecider>>,
    /// Optional per-runner handler for nested spawn requests.
    nested_spawn_handler: Option<Arc<dyn NestedSpawnHandler>>,
}
/// Per-runner policy that answers a child's approval request inline.
///
/// It is consulted only when no process-wide `ChildApprovalReviewer` is installed.
/// Its verdict (`true` = approve, `false` = deny) is sent back to the child as an
/// `ApprovalReply`. `request` is the raw JSON body of the child's approval request.
#[async_trait]
pub trait ChildApprovalDecider: Send + Sync {
async fn decide(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// Asks the wired decider, if any, whether a child's approval request is granted.
///
/// Fails closed: when no decider is installed the request is denied.
async fn decide_child_approval(
    decider: Option<&Arc<dyn ChildApprovalDecider>>,
    child_session_id: &str,
    request: &serde_json::Value,
) -> bool {
    let Some(policy) = decider else {
        // No policy wired: deny rather than approve by default.
        return false;
    };
    policy.decide(child_session_id, request).await
}
/// How long a child approval request may remain unanswered before it is denied.
const CHILD_APPROVAL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
fn approval_request_fields(body: &serde_json::Value) -> (String, String, String) {
let field = |k: &str| {
body.get(k)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
};
(field("tool_name"), field("permission"), field("resource"))
}
/// Fulfils a child's request to spawn a nested sub-agent.
///
/// `request` is the raw JSON body of the child's `SubagentRequest` frame. On `Ok` the
/// returned JSON is sent back verbatim as the reply body. On `Err`, `fulfil_nested_spawn`
/// wraps the message into a `{"success": false, ...}` reply.
#[async_trait]
pub trait NestedSpawnHandler: Send + Sync {
async fn spawn_nested(
&self,
child_session_id: &str,
request: serde_json::Value,
) -> Result<serde_json::Value, String>;
}
/// Builds the failure reply sent to a child whose nested spawn request cannot be served.
///
/// Shape: `{"success": false, "result": <reason>, "display_preference": null}`.
fn nested_spawn_unavailable(reason: &str) -> serde_json::Value {
    let mut reply = serde_json::Map::new();
    reply.insert("success".to_string(), serde_json::Value::Bool(false));
    reply.insert(
        "result".to_string(),
        serde_json::Value::String(reason.to_string()),
    );
    reply.insert("display_preference".to_string(), serde_json::Value::Null);
    serde_json::Value::Object(reply)
}
/// Produces the reply body for a child's nested spawn request.
///
/// Delegates to `handler` when one is available. A handler error, or the absence of a
/// handler, is turned into a `nested_spawn_unavailable` body describing why.
async fn fulfil_nested_spawn(
    handler: Option<&Arc<dyn NestedSpawnHandler>>,
    child_session_id: &str,
    request: serde_json::Value,
) -> serde_json::Value {
    let Some(spawner) = handler else {
        return nested_spawn_unavailable("nested sub-agent spawn is not available in this build");
    };
    spawner
        .spawn_nested(child_session_id, request)
        .await
        .unwrap_or_else(|err| {
            nested_spawn_unavailable(&format!("nested sub-agent spawn failed: {err}"))
        })
}
/// Process-wide storage for the fallback nested spawn handler; it can be set at most once.
fn nested_spawn_handler_slot() -> &'static std::sync::OnceLock<Arc<dyn NestedSpawnHandler>> {
    use std::sync::OnceLock;
    static HANDLER: OnceLock<Arc<dyn NestedSpawnHandler>> = OnceLock::new();
    &HANDLER
}
pub fn set_nested_spawn_handler(handler: Arc<dyn NestedSpawnHandler>) {
let _ = nested_spawn_handler_slot().set(handler);
}
/// Returns the process-wide nested spawn handler, if one has been installed.
pub fn nested_spawn_handler() -> Option<Arc<dyn NestedSpawnHandler>> {
    let slot = nested_spawn_handler_slot();
    slot.get().map(Arc::clone)
}
/// Process-wide reviewer for child approval requests, installed with
/// `set_child_approval_reviewer`.
///
/// When installed it takes precedence over any per-runner `ChildApprovalDecider`.
/// Each review runs in a spawned task bounded by `CHILD_APPROVAL_TIMEOUT`; a timeout
/// counts as a denial. The verdict is delivered through `super::live::deliver_approval`.
#[async_trait]
pub trait ChildApprovalReviewer: Send + Sync {
async fn review(&self, child_session_id: &str, request: &serde_json::Value) -> bool;
}
/// Process-wide storage for the child approval reviewer; it can be set at most once.
fn child_approval_reviewer_slot() -> &'static std::sync::OnceLock<Arc<dyn ChildApprovalReviewer>> {
    use std::sync::OnceLock;
    static REVIEWER: OnceLock<Arc<dyn ChildApprovalReviewer>> = OnceLock::new();
    &REVIEWER
}
pub fn set_child_approval_reviewer(reviewer: Arc<dyn ChildApprovalReviewer>) {
let _ = child_approval_reviewer_slot().set(reviewer);
}
/// Returns the process-wide child approval reviewer, if one has been installed.
pub fn child_approval_reviewer() -> Option<Arc<dyn ChildApprovalReviewer>> {
    let slot = child_approval_reviewer_slot();
    slot.get().map(Arc::clone)
}
/// Process-wide, replaceable storage for the escalation host bridge (initially empty).
fn escalation_bridge_slot(
) -> &'static std::sync::Mutex<Option<bamboo_subagent::executor::HostBridge>> {
    use bamboo_subagent::executor::HostBridge;
    use std::sync::Mutex;
    static BRIDGE: Mutex<Option<HostBridge>> = Mutex::new(None);
    &BRIDGE
}
/// Replaces the process-wide escalation host bridge, or clears it when given `None`.
///
/// A poisoned lock is recovered instead of panicking. The slot is only ever replaced
/// wholesale, so its contents stay usable after a panic elsewhere.
pub fn set_escalation_host_bridge(bridge: Option<bamboo_subagent::executor::HostBridge>) {
    let mut slot = escalation_bridge_slot()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *slot = bridge;
}
/// Returns a clone of the currently installed escalation host bridge, if any.
///
/// Recovers from a poisoned lock rather than panicking, because the stored `Option`
/// is always in a consistent state.
fn escalation_host_bridge() -> Option<bamboo_subagent::executor::HostBridge> {
    escalation_bridge_slot()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
impl ActorChildRunner {
    /// Creates a runner that launches `worker_bin` (with `worker_args`) for actor children.
    ///
    /// * `agent_id` – the `external.agent_id` session metadata value this runner claims.
    /// * `fabric_dir` – discovery fabric directory used to resolve and withdraw workers.
    /// * `executor` / `credentials` – provisioning inputs copied into each child spec.
    /// * `default_provider` – provider used for the job's model when the session has no
    ///   model reference, and for credential lookup when no provider is set.
    /// * `max_concurrent` – maximum simultaneously running children, clamped to at least 1.
    ///
    /// The spawn timeout starts at 30 seconds, and the idle pool keeps up to
    /// `DEFAULT_MAX_IDLE_PER_KEY` workers per fingerprint. Override them with
    /// [`Self::with_spawn_timeout`] and [`Self::with_max_idle_per_key`].
    // Every argument is required configuration with no sensible default.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        agent_id: String,
        worker_bin: PathBuf,
        worker_args: Vec<String>,
        fabric_dir: PathBuf,
        executor: ExecutorSpec,
        credentials: Vec<ScopedCredential>,
        default_provider: String,
        max_concurrent: usize,
    ) -> Self {
        Self {
            agent_id,
            worker_bin,
            worker_args,
            fabric_dir,
            executor,
            credentials,
            default_provider,
            // A zero-permit semaphore would block every run forever; allow at least one.
            concurrency: Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
            spawn_timeout: Duration::from_secs(30),
            pool: Arc::new(Mutex::new(HashMap::new())),
            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
            approval_decider: None,
            nested_spawn_handler: None,
        }
    }

    /// Installs a per-runner approval decider.
    ///
    /// The decider is consulted only when no process-wide `ChildApprovalReviewer` is set.
    pub fn with_approval_decider(mut self, decider: Arc<dyn ChildApprovalDecider>) -> Self {
        self.approval_decider = Some(decider);
        self
    }

    /// Installs a per-runner nested spawn handler.
    ///
    /// It takes precedence over the process-wide handler from `set_nested_spawn_handler`.
    pub fn with_nested_spawn_handler(mut self, handler: Arc<dyn NestedSpawnHandler>) -> Self {
        self.nested_spawn_handler = Some(handler);
        self
    }

    /// Overrides the timeout passed to `spawn_worker` (default: 30 seconds).
    pub fn with_spawn_timeout(mut self, timeout: Duration) -> Self {
        self.spawn_timeout = timeout;
        self
    }

    /// Overrides how many idle workers are retained per fingerprint.
    ///
    /// `0` disables pooling: every released worker is retired.
    pub fn with_max_idle_per_key(mut self, max_idle: usize) -> Self {
        self.max_idle_per_key = max_idle;
        self
    }

    /// Builds the pool key identifying workers that can serve `spec` interchangeably.
    ///
    /// The key covers:
    /// * role, model provider and name, and workspace;
    /// * the disabled-tool set, ignoring order and duplicates;
    /// * spawn depth and the capability flags baked into a worker when it is spawned.
    ///
    /// Child and parent ids are deliberately excluded so different children can share a
    /// worker.
    fn fingerprint(spec: &ProvisionSpec) -> String {
        let role = spec.identity.role.as_str();
        let (provider, model) = spec
            .model
            .as_ref()
            .map(|m| (m.provider.as_str(), m.model.as_str()))
            .unwrap_or(("", ""));
        let workspace = spec.workspace.as_deref().unwrap_or("");
        // Disabled tools behave as a set, so equivalent lists must map to the same key.
        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
        tools.sort_unstable();
        tools.dedup();
        let caps = &spec.capabilities;
        format!(
            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}\u{1}d={}\u{1}ns={}\u{1}by={}\u{1}ep={}\u{1}md={}",
            tools.join(","),
            spec.identity.depth,
            caps.nested_spawn,
            caps.bypass,
            caps.enforce_permissions,
            caps.max_spawn_depth.unwrap_or(0),
        )
    }

    /// Returns an idle pooled worker for `key` that still resolves in the fabric, or
    /// spawns a fresh one from `spec`.
    ///
    /// Pooled candidates that no longer resolve, or whose lookup fails, are retired
    /// (killed and withdrawn from the fabric) before the next candidate is tried.
    ///
    /// # Errors
    /// Returns `AgentError::LLM` when a fresh worker cannot be spawned or registered.
    async fn acquire_worker(
        &self,
        key: &str,
        spec: &ProvisionSpec,
    ) -> crate::runtime::runner::Result<PooledActor> {
        while let Some(candidate) = self.take_idle_worker(key).await {
            let alive = Fabric::at(&self.fabric_dir)
                .resolve(&candidate.agent_id)
                .await
                .ok()
                .flatten()
                .is_some();
            if alive {
                return Ok(candidate);
            }
            // Also withdraw: if `resolve` errored, a stale registration may still exist.
            self.retire_worker(candidate).await;
        }
        let spawned = spawn_worker(
            &self.worker_bin,
            &self.worker_args,
            spec,
            self.spawn_timeout,
        )
        .await
        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
        Ok(PooledActor {
            endpoint: spawned.record.endpoint.clone(),
            agent_id: spawned.record.agent_id.clone(),
            worker: spawned,
        })
    }

    /// Pops one idle worker from the `key` bucket.
    ///
    /// The bucket is removed once it empties, so unused keys do not accumulate.
    async fn take_idle_worker(&self, key: &str) -> Option<PooledActor> {
        let mut pool = self.pool.lock().await;
        let bucket = pool.get_mut(key)?;
        let actor = bucket.pop();
        if bucket.is_empty() {
            pool.remove(key);
        }
        actor
    }

    /// Parks a worker that finished cleanly in the `key` bucket.
    ///
    /// If the bucket already holds `max_idle_per_key` workers, the worker is retired
    /// instead.
    async fn release_worker(&self, key: &str, actor: PooledActor) {
        let mut pool = self.pool.lock().await;
        let bucket = pool.entry(key.to_string()).or_default();
        if bucket.len() >= self.max_idle_per_key {
            // Release the pool lock before the (awaiting) kill/withdraw.
            drop(pool);
            self.retire_worker(actor).await;
            return;
        }
        bucket.push(actor);
    }

    /// Kills the worker and withdraws its fabric registration.
    ///
    /// Withdrawal errors are ignored.
    async fn retire_worker(&self, actor: PooledActor) {
        let agent_id = actor.agent_id.clone();
        actor.worker.kill().await;
        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
    }

    /// Translates a session and spawn job into the provisioning spec for an actor worker.
    ///
    /// * Identity: the job's child/parent ids and the session's `subagent_type` metadata
    ///   (default `"worker"`).
    /// * Model: the session's model reference, else the non-empty job model under
    ///   `default_provider`.
    /// * Credentials: the credential matching the effective provider is attached; a
    ///   warning is logged when none matches.
    /// * Capabilities: nested spawning is enabled only below `MAX_SPAWN_DEPTH`, and
    ///   `bypass` mirrors the session runtime state's `bypass_permissions`.
    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: job.child_session_id.clone(),
                parent_id: Some(job.parent_session_id.clone()),
                project_key: None,
                role: session
                    .metadata
                    .get("subagent_type")
                    .cloned()
                    .unwrap_or_else(|| "worker".to_string()),
                depth: session.spawn_depth,
            },
            self.executor.clone(),
            self.fabric_dir.to_string_lossy().into_owned(),
        );
        spec.workspace = session.workspace.clone();
        spec.model = session
            .model_ref
            .as_ref()
            .map(|r| ModelRefSpec {
                provider: r.provider.clone(),
                model: r.model.clone(),
            })
            .or_else(|| {
                let m = job.model.trim();
                (!m.is_empty()).then(|| ModelRefSpec {
                    provider: self.default_provider.clone(),
                    model: m.to_string(),
                })
            });
        spec.disabled_tools = job.disabled_tools.clone();
        let provider = spec
            .model
            .as_ref()
            .map(|m| m.provider.as_str())
            .filter(|p| !p.trim().is_empty())
            .unwrap_or(&self.default_provider);
        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
            spec.secrets.provider_credentials.push(cred.clone());
        } else {
            tracing::warn!(
                "actor child {}: no credential found for provider '{}'",
                job.child_session_id,
                provider
            );
        }
        spec.capabilities.nested_spawn = session.spawn_depth < MAX_SPAWN_DEPTH;
        spec.capabilities.max_spawn_depth = Some(MAX_SPAWN_DEPTH);
        spec.capabilities.bypass = session
            .agent_runtime_state
            .as_ref()
            .is_some_and(|s| s.bypass_permissions);
        spec
    }
}
#[async_trait]
impl ExternalChildRunner for ActorChildRunner {
    /// Claims sessions marked as external `actor` children addressed to this runner's
    /// agent id.
    async fn should_handle(&self, session: &Session) -> bool {
        let meta_is = |key: &str, expected: &str| {
            session
                .metadata
                .get(key)
                .is_some_and(|value| value == expected)
        };
        meta_is("runtime.kind", "external")
            && meta_is("external.protocol", "actor")
            && meta_is("external.agent_id", &self.agent_id)
    }

    /// Runs one child on a pooled or freshly spawned actor worker.
    ///
    /// Steps:
    /// 1. Build the provisioning spec and pool key, then wait for a concurrency slot.
    /// 2. Obtain a worker, respawning once if a pooled endpoint refuses the connection.
    /// 3. Dispatch the run and pump frames via `drive` until the run ends.
    ///
    /// A worker whose run succeeded is returned to the pool. Any failure retires the
    /// worker, including a failed connect or run dispatch, so no worker process or
    /// fabric registration is leaked. A non-empty final result is appended to `session`
    /// as an assistant message.
    async fn execute_external_child(
        &self,
        session: &mut Session,
        job: &SpawnJob,
        event_tx: mpsc::Sender<AgentEvent>,
        cancel_token: CancellationToken,
    ) -> crate::runtime::runner::Result<()> {
        let assignment = extract_assignment(session);
        let mut spec = self.build_spec(session, job);
        // Pooled workers outlive a single run; give idle ones a finite lifetime.
        spec.reusable = true;
        if spec.limits.idle_timeout_secs.is_none() {
            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
        }
        let pool_key = Self::fingerprint(&spec);
        let messages: Vec<serde_json::Value> = session
            .messages
            .iter()
            .filter_map(|m| serde_json::to_value(m).ok())
            .collect();

        // Held for the whole run so the concurrency limit covers execution time.
        let _slot = self
            .concurrency
            .acquire()
            .await
            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;

        let mut actor = self.acquire_worker(&pool_key, &spec).await?;
        let mut client = match ChildClient::connect(&actor.endpoint).await {
            Ok(client) => client,
            Err(reuse_err) => {
                // The pooled endpoint is unreachable: discard it and retry once on a
                // fresh worker.
                self.retire_worker(actor).await;
                let spawned = spawn_worker(
                    &self.worker_bin,
                    &self.worker_args,
                    &spec,
                    self.spawn_timeout,
                )
                .await
                .map_err(|e2| {
                    AgentError::LLM(format!(
                        "actor respawn after reuse miss ({reuse_err}): {e2}"
                    ))
                })?;
                actor = PooledActor {
                    endpoint: spawned.record.endpoint.clone(),
                    agent_id: spawned.record.agent_id.clone(),
                    worker: spawned,
                };
                match ChildClient::connect(&actor.endpoint).await {
                    Ok(client) => client,
                    Err(e2) => {
                        // Don't leak the worker we just spawned.
                        self.retire_worker(actor).await;
                        return Err(AgentError::LLM(format!("actor connect failed: {e2}")));
                    }
                }
            }
        };

        let dispatched = client
            .send(ParentFrame::Run(RunSpec {
                assignment,
                reasoning_effort: None,
                messages,
            }))
            .await;
        if let Err(e) = dispatched {
            // The worker never started the run; retire it instead of leaking it.
            self.retire_worker(actor).await;
            return Err(AgentError::LLM(format!("actor run dispatch failed: {e}")));
        }

        // Registered for the duration of the run; frames sent on `live_tx` are forwarded
        // to the child by `drive`.
        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
        let live_guard = super::live::register(&job.child_session_id, live_tx);
        // The per-runner handler wins; otherwise fall back to the process-wide one.
        let nested_handler = self
            .nested_spawn_handler
            .clone()
            .or_else(nested_spawn_handler);
        let result = drive(
            &mut client,
            &job.child_session_id,
            self.approval_decider.as_ref(),
            nested_handler.as_ref(),
            &event_tx,
            &cancel_token,
            &mut live_rx,
        )
        .await;
        drop(live_guard);
        let _ = client.close().await;

        // Only a worker that finished cleanly is trusted enough to reuse.
        if result.is_ok() {
            self.release_worker(&pool_key, actor).await;
        } else {
            self.retire_worker(actor).await;
        }

        if let Some(text) = result?.filter(|text| !text.is_empty()) {
            session.add_message(bamboo_agent_core::Message::assistant(text, None));
        }
        Ok(())
    }
}
/// Pumps frames between the parent and a connected actor child until the run ends.
///
/// Each loop iteration waits for one of the following:
/// * cancellation via `cancel_token`;
/// * a frame on `live_rx`, which is forwarded to the child;
/// * the child's next frame: events are relayed to `event_tx`, and nested-spawn and
///   approval requests are answered.
///
/// Approval requests go to the first available authority, in this order:
/// 1. the process-wide `ChildApprovalReviewer`;
/// 2. the runner's `ChildApprovalDecider`;
/// 3. the escalation host bridge;
/// 4. otherwise an `AgentEvent::ChildApprovalRequested` is emitted. The answer is
///    presumably delivered via `super::live::deliver_approval`.
///
/// Every approval path denies (fails closed) once `CHILD_APPROVAL_TIMEOUT` elapses.
///
/// Returns the child's result on a `Completed` terminal frame.
///
/// # Errors
/// * `AgentError::Cancelled` on local cancellation or a `Cancelled` terminal.
/// * `AgentError::LLM` for a child error, an unsupported `Suspended` terminal, a
///   connection closed before a terminal frame, or a transport error.
// Each parameter is distinct per-run state; kept explicit rather than bundled.
#[allow(clippy::too_many_arguments)]
async fn drive(
    client: &mut ChildClient,
    child_session_id: &str,
    approval_decider: Option<&Arc<dyn ChildApprovalDecider>>,
    nested_spawn_handler: Option<&Arc<dyn NestedSpawnHandler>>,
    event_tx: &mpsc::Sender<AgentEvent>,
    cancel_token: &CancellationToken,
    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
) -> crate::runtime::runner::Result<Option<String>> {
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => {
                break;
            }
            Some(frame) = live_rx.recv() => {
                if client.send(frame).await.is_err() {
                    tracing::warn!("live steering frame could not be sent; connection failing");
                }
            }
            // NOTE(review): when another branch wins, this future is dropped mid-flight.
            // That is only lossless if `ChildClient::next_frame` is cancel-safe — confirm.
            frame = client.next_frame() => {
                match frame {
                    Ok(Some(ChildFrame::Event { event })) => {
                        match serde_json::from_value::<AgentEvent>(event) {
                            Ok(ev) => {
                                // A closed event channel is not fatal to the run.
                                let _ = event_tx.send(ev).await;
                            }
                            Err(e) => {
                                tracing::debug!("dropping undecodable actor child event: {e}");
                            }
                        }
                    }
                    Ok(Some(ChildFrame::SubagentRequest { id, body })) => {
                        // NOTE(review): awaited inline, so cancellation is not observed
                        // until the nested spawn returns.
                        let reply =
                            fulfil_nested_spawn(nested_spawn_handler, child_session_id, body).await;
                        if client
                            .send(ParentFrame::SubagentReply { id, body: reply })
                            .await
                            .is_err()
                        {
                            tracing::warn!("failed to answer subagent_request; connection failing");
                        }
                    }
                    Ok(Some(ChildFrame::ApprovalRequest { id, body })) => {
                        if let Some(reviewer) = child_approval_reviewer() {
                            // Reviewed off the loop; the verdict is delivered asynchronously.
                            let child = child_session_id.to_string();
                            tokio::spawn(async move {
                                let approved = tokio::time::timeout(
                                    CHILD_APPROVAL_TIMEOUT,
                                    reviewer.review(&child, &body),
                                )
                                .await
                                .unwrap_or(false);
                                super::live::deliver_approval(&child, &id, approved);
                            });
                        } else if approval_decider.is_some() {
                            // Answered inline, so bound the wait like the other paths and
                            // stay responsive to cancellation. Timeout or cancellation
                            // denies (fail closed).
                            let decision = tokio::time::timeout(
                                CHILD_APPROVAL_TIMEOUT,
                                decide_child_approval(approval_decider, child_session_id, &body),
                            );
                            let approved = tokio::select! {
                                _ = cancel_token.cancelled() => false,
                                outcome = decision => outcome.unwrap_or(false),
                            };
                            if client
                                .send(ParentFrame::ApprovalReply { id, approved })
                                .await
                                .is_err()
                            {
                                tracing::warn!(
                                    "failed to answer approval_request; connection failing"
                                );
                            }
                        } else if let Some(host) = escalation_host_bridge() {
                            let child = child_session_id.to_string();
                            tokio::spawn(async move {
                                let approved = match tokio::time::timeout(
                                    CHILD_APPROVAL_TIMEOUT,
                                    host.approval_call(body),
                                )
                                .await
                                {
                                    Ok(Ok(reply)) => reply
                                        .get("approved")
                                        .and_then(|v| v.as_bool())
                                        .unwrap_or(false),
                                    // Timeout or bridge error: deny.
                                    _ => false,
                                };
                                super::live::deliver_approval(&child, &id, approved);
                            });
                        } else {
                            let (tool_name, permission, resource) =
                                approval_request_fields(&body);
                            let _ = event_tx
                                .send(AgentEvent::ChildApprovalRequested {
                                    child_session_id: child_session_id.to_string(),
                                    request_id: id.clone(),
                                    tool_name,
                                    permission,
                                    resource,
                                })
                                .await;
                            // Fail closed if nobody answers in time. NOTE(review): this
                            // fires even after a real answer; presumably the child ignores
                            // the late duplicate — confirm.
                            let child = child_session_id.to_string();
                            tokio::spawn(async move {
                                tokio::time::sleep(CHILD_APPROVAL_TIMEOUT).await;
                                super::live::deliver_approval(&child, &id, false);
                            });
                        }
                    }
                    Ok(Some(ChildFrame::Terminal { status, result, error, .. })) => {
                        return match status {
                            TerminalStatus::Completed => Ok(result),
                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
                            TerminalStatus::Error => Err(AgentError::LLM(
                                error.unwrap_or_else(|| "actor child errored".to_string()),
                            )),
                            TerminalStatus::Suspended => Err(AgentError::LLM(
                                "nested sub-agent suspend received but resume transport is not wired"
                                    .to_string(),
                            )),
                        };
                    }
                    Ok(None) => {
                        return Err(AgentError::LLM(
                            "actor child closed before terminal".to_string(),
                        ));
                    }
                    Err(e) => {
                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
                    }
                }
            }
        }
    }
    // Cancelled locally: ask the child to stop (best effort) and report cancellation.
    let _ = client.send(ParentFrame::Cancel).await;
    Err(AgentError::Cancelled)
}
/// Picks the instruction the child should work on.
///
/// Uses the most recent user message; failing that, the session's `title` metadata;
/// failing that, the generic `"Execute task"`.
fn extract_assignment(session: &Session) -> String {
    let latest_user = session
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, Role::User));
    if let Some(message) = latest_user {
        return message.content.clone();
    }
    match session.metadata.get("title") {
        Some(title) => title.clone(),
        None => "Execute task".to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal echo-executor spec from the fields that shape a worker runtime.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
            depth: 0,
        };
        let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        spec.workspace = workspace.map(str::to_string);
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools = disabled.map(|tools| tools.into_iter().map(String::from).collect());
        spec
    }

    /// The spec every fingerprint comparison starts from.
    fn baseline() -> ProvisionSpec {
        spec_with("explorer", "p", "m", Some("/ws"), None)
    }

    fn fp(spec: &ProvisionSpec) -> String {
        ActorChildRunner::fingerprint(spec)
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        let first = spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash", "Edit"]));
        let mut second = spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Edit", "Bash"]));
        second.identity.child_id = "other".into();
        assert_eq!(fp(&first), fp(&second));
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base_fp = fp(&baseline());
        let variants = [
            spec_with("writer", "p", "m", Some("/ws"), None),
            spec_with("explorer", "p2", "m", Some("/ws"), None),
            spec_with("explorer", "p", "m2", Some("/ws"), None),
            spec_with("explorer", "p", "m", Some("/ws2"), None),
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
        ];
        for variant in &variants {
            assert_ne!(base_fp, fp(variant));
        }
    }

    #[test]
    fn fingerprint_splits_on_baked_capabilities() {
        let base_fp = fp(&baseline());
        let tweaks: [(&str, fn(&mut ProvisionSpec)); 5] = [
            ("depth must split", |s: &mut ProvisionSpec| s.identity.depth = 2),
            ("nested_spawn must split", |s: &mut ProvisionSpec| {
                s.capabilities.nested_spawn = true
            }),
            ("bypass must split", |s: &mut ProvisionSpec| s.capabilities.bypass = true),
            ("enforce_permissions must split", |s: &mut ProvisionSpec| {
                s.capabilities.enforce_permissions = true
            }),
            ("max_spawn_depth must split", |s: &mut ProvisionSpec| {
                s.capabilities.max_spawn_depth = Some(8)
            }),
        ];
        for (why, tweak) in tweaks {
            let mut spec = baseline();
            tweak(&mut spec);
            assert_ne!(base_fp, fp(&spec), "{why}");
        }
    }

    struct StaticDecider(bool);

    #[async_trait]
    impl ChildApprovalDecider for StaticDecider {
        async fn decide(&self, _child: &str, _req: &serde_json::Value) -> bool {
            self.0
        }
    }

    #[tokio::test]
    async fn child_approval_fails_closed_without_decider() {
        let body = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"rm -rf /"});
        assert!(!decide_child_approval(None, "child-1", &body).await);
    }

    #[tokio::test]
    async fn child_approval_honors_wired_decider() {
        let body =
            serde_json::json!({"tool_name":"Write","permission":"write","resource":"/tmp/x"});
        for verdict in [true, false] {
            let decider: Arc<dyn ChildApprovalDecider> = Arc::new(StaticDecider(verdict));
            assert_eq!(
                decide_child_approval(Some(&decider), "child-1", &body).await,
                verdict
            );
        }
    }

    struct StaticSpawn(Result<serde_json::Value, String>);

    #[async_trait]
    impl NestedSpawnHandler for StaticSpawn {
        async fn spawn_nested(
            &self,
            _child: &str,
            _req: serde_json::Value,
        ) -> Result<serde_json::Value, String> {
            self.0.clone()
        }
    }

    #[tokio::test]
    async fn nested_spawn_unavailable_without_handler() {
        let reply = fulfil_nested_spawn(None, "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"], serde_json::json!(false));
        let reason = reply["result"].as_str().expect("result should be a string");
        assert!(reason.contains("not available"));
    }

    #[tokio::test]
    async fn nested_spawn_returns_handler_result_or_error_body() {
        let succeeding: Arc<dyn NestedSpawnHandler> = Arc::new(StaticSpawn(Ok(
            serde_json::json!({"success": true, "result": "spawned"}),
        )));
        let reply = fulfil_nested_spawn(Some(&succeeding), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["result"], serde_json::json!("spawned"));

        let failing: Arc<dyn NestedSpawnHandler> =
            Arc::new(StaticSpawn(Err("boom".to_string())));
        let reply = fulfil_nested_spawn(Some(&failing), "child-1", serde_json::json!({})).await;
        assert_eq!(reply["success"], serde_json::json!(false));
        let reason = reply["result"].as_str().expect("result should be a string");
        assert!(reason.contains("boom"));
    }

    #[test]
    fn approval_request_fields_extracts_and_defaults() {
        let full = serde_json::json!({"tool_name":"Bash","permission":"run","resource":"ls"});
        let (tool, permission, resource) = approval_request_fields(&full);
        assert_eq!(
            (tool.as_str(), permission.as_str(), resource.as_str()),
            ("Bash", "run", "ls")
        );
        let partial = serde_json::json!({"tool_name":"Write"});
        assert_eq!(
            approval_request_fields(&partial),
            ("Write".to_string(), String::new(), String::new())
        );
    }
}