use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use bamboo_subagent::discovery::Fabric;
use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
use bamboo_subagent::provision::{
ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
};
use bamboo_subagent::transport::ChildClient;
use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
/// Intended default for the `max_concurrent` argument of `ActorChildRunner::new`
/// (presumably supplied by callers; `new` itself takes the value explicitly).
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;
/// Number of idle workers retained per pool key before further releases retire them.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;
/// Idle timeout (five minutes) written into reusable specs that do not set one.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
/// A spawned worker process together with the addressing data needed to reuse it.
struct PooledActor {
    /// Id the worker is registered under in the fabric (used to resolve/withdraw it).
    agent_id: String,
    /// Endpoint `ChildClient::connect` dials to talk to the worker.
    endpoint: String,
    /// Handle to the spawned worker process.
    worker: SpawnedChild,
}
/// Runs external child sessions on actor worker processes.
///
/// Workers are spawned from `worker_bin`, are looked up (and withdrawn) through the
/// fabric rooted at `fabric_dir`, and may be reused across runs whose provision specs
/// share a pool key.
pub struct ActorChildRunner {
    /// Agent id this runner serves; sessions are claimed when their
    /// `external.agent_id` metadata equals it.
    agent_id: String,
    /// Worker executable.
    worker_bin: PathBuf,
    /// Arguments passed to `worker_bin` on every spawn.
    worker_args: Vec<String>,
    /// Fabric directory handed to children and used for liveness lookups.
    fabric_dir: PathBuf,
    /// Executor spec cloned into each child's provision spec.
    executor: ExecutorSpec,
    /// Scoped credentials; the one matching a child's provider is forwarded to it.
    credentials: Vec<ScopedCredential>,
    /// Provider assumed when a session does not name one.
    default_provider: String,
    /// Bounds how many children run at the same time.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Timeout passed to `spawn_worker`.
    spawn_timeout: Duration,
    /// Idle workers, bucketed by spec fingerprint.
    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
    /// Maximum idle workers kept per bucket; extras are retired on release.
    max_idle_per_key: usize,
}
impl ActorChildRunner {
    /// Creates a runner that executes actor-protocol children for `agent_id`.
    ///
    /// * `worker_bin` / `worker_args` — command used to spawn each worker process.
    /// * `fabric_dir` — fabric directory passed to children and used to resolve and
    ///   withdraw their registrations.
    /// * `executor` — executor spec cloned into every child's provision spec.
    /// * `credentials` — scoped credentials; the first one whose provider matches a
    ///   child's provider is forwarded to that child.
    /// * `default_provider` — provider assumed when a session names none.
    /// * `max_concurrent` — limit on children running at once; clamped to at least 1.
    ///
    /// Worker spawns use a 30 second timeout, and at most `DEFAULT_MAX_IDLE_PER_KEY`
    /// idle workers are retained per pool key.
    // The arguments map one-to-one onto runner configuration, so a builder would add
    // ceremony without adding clarity.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        agent_id: String,
        worker_bin: PathBuf,
        worker_args: Vec<String>,
        fabric_dir: PathBuf,
        executor: ExecutorSpec,
        credentials: Vec<ScopedCredential>,
        default_provider: String,
        max_concurrent: usize,
    ) -> Self {
        Self {
            agent_id,
            worker_bin,
            worker_args,
            fabric_dir,
            executor,
            credentials,
            default_provider,
            // A zero-permit semaphore would block every run forever.
            concurrency: Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
            spawn_timeout: Duration::from_secs(30),
            pool: Arc::new(Mutex::new(HashMap::new())),
            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
        }
    }

    /// Computes the pool key for `spec`.
    ///
    /// Specs with equal keys are treated as interchangeable: an idle worker spawned for
    /// one may serve the other. The key covers the role, model provider and name,
    /// workspace, and the *set* of disabled tools. Per-run identity such as `child_id`
    /// is deliberately excluded. Fields are joined with U+0001, which is not expected
    /// to occur inside any of them.
    fn fingerprint(spec: &ProvisionSpec) -> String {
        let role = spec.identity.role.as_str();
        let (provider, model) = spec
            .model
            .as_ref()
            .map_or(("", ""), |m| (m.provider.as_str(), m.model.as_str()));
        let workspace = spec.workspace.as_deref().unwrap_or("");
        // Disabled tools form a set: neither order nor duplicates may split the pool.
        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
        tools.sort_unstable();
        tools.dedup();
        format!(
            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}",
            tools.join(",")
        )
    }

    /// Returns a worker for pool bucket `key`, preferring an idle pooled one.
    ///
    /// Idle candidates are taken most-recently-released first and kept only if they
    /// still resolve in the fabric. Stale candidates are retired and the next one is
    /// tried. When the bucket is exhausted, a fresh worker is spawned from `spec`.
    ///
    /// # Errors
    /// `AgentError::LLM` if spawning or registering a new worker fails.
    async fn acquire_worker(
        &self,
        key: &str,
        spec: &ProvisionSpec,
    ) -> crate::runtime::runner::Result<PooledActor> {
        while let Some(candidate) = self.take_idle(key).await {
            if self.is_alive(&candidate).await {
                return Ok(candidate);
            }
            // The worker is gone (e.g. it hit its idle timeout) or its lookup failed.
            // Retire it rather than only killing it, so a leftover fabric registration
            // is withdrawn too.
            self.retire_worker(candidate).await;
        }
        let worker = spawn_worker(
            &self.worker_bin,
            &self.worker_args,
            spec,
            self.spawn_timeout,
        )
        .await
        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
        Ok(PooledActor {
            endpoint: worker.record.endpoint.clone(),
            agent_id: worker.record.agent_id.clone(),
            worker,
        })
    }

    /// Pops the most recently released idle worker in bucket `key`, if any.
    /// The pool lock is held only for the pop itself.
    async fn take_idle(&self, key: &str) -> Option<PooledActor> {
        let mut pool = self.pool.lock().await;
        pool.get_mut(key).and_then(|bucket| bucket.pop())
    }

    /// Whether `actor` still resolves in the fabric. Lookup errors count as "not alive".
    async fn is_alive(&self, actor: &PooledActor) -> bool {
        matches!(
            Fabric::at(&self.fabric_dir).resolve(&actor.agent_id).await,
            Ok(Some(_))
        )
    }

    /// Returns `actor` to bucket `key`, or retires it when that bucket already holds
    /// `max_idle_per_key` idle workers. The pool lock is released before retiring.
    async fn release_worker(&self, key: &str, actor: PooledActor) {
        let overflow = {
            let mut pool = self.pool.lock().await;
            let bucket = pool.entry(key.to_string()).or_default();
            if bucket.len() < self.max_idle_per_key {
                bucket.push(actor);
                None
            } else {
                Some(actor)
            }
        };
        if let Some(actor) = overflow {
            self.retire_worker(actor).await;
        }
    }

    /// Kills the worker process and withdraws its fabric registration.
    /// Withdrawal is best-effort: its error is ignored.
    async fn retire_worker(&self, actor: PooledActor) {
        let agent_id = actor.agent_id.clone();
        actor.worker.kill().await;
        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
    }

    /// Builds the provision spec for one child run.
    ///
    /// * Identity: `child_id`/`parent_id` come from `job`. The role comes from the
    ///   session's `subagent_type` metadata and defaults to `"worker"`.
    /// * Workspace and disabled tools are copied from the session and job.
    /// * Model: the session's `model_ref` if set; otherwise the trimmed `job.model`
    ///   under `default_provider`; otherwise none.
    /// * Credentials: the credential for the model's provider is attached. When that
    ///   provider is absent or blank, `default_provider` is used instead. A warning is
    ///   logged if no credential is configured.
    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: job.child_session_id.clone(),
                parent_id: Some(job.parent_session_id.clone()),
                project_key: None,
                role: session
                    .metadata
                    .get("subagent_type")
                    .cloned()
                    .unwrap_or_else(|| "worker".to_string()),
            },
            self.executor.clone(),
            self.fabric_dir.to_string_lossy().into_owned(),
        );
        spec.workspace = session.workspace.clone();
        spec.model = session
            .model_ref
            .as_ref()
            .map(|r| ModelRefSpec {
                provider: r.provider.clone(),
                model: r.model.clone(),
            })
            .or_else(|| {
                let m = job.model.trim();
                (!m.is_empty()).then(|| ModelRefSpec {
                    provider: self.default_provider.clone(),
                    model: m.to_string(),
                })
            });
        spec.disabled_tools = job.disabled_tools.clone();
        let provider = spec
            .model
            .as_ref()
            .map(|m| m.provider.as_str())
            .filter(|p| !p.trim().is_empty())
            .unwrap_or(self.default_provider.as_str());
        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
            spec.secrets.provider_credentials.push(cred.clone());
        } else {
            tracing::warn!(
                "actor child {}: no credential found for provider '{}'",
                job.child_session_id,
                provider
            );
        }
        spec
    }
}
#[async_trait]
impl ExternalChildRunner for ActorChildRunner {
    /// Claims a session when its metadata marks it as an external child
    /// (`runtime.kind == "external"`) speaking the `actor` protocol
    /// (`external.protocol == "actor"`) and targeting this runner's agent id
    /// (`external.agent_id`).
    async fn should_handle(&self, session: &Session) -> bool {
        let meta = &session.metadata;
        meta.get("runtime.kind").map(String::as_str) == Some("external")
            && meta.get("external.protocol").map(String::as_str) == Some("actor")
            && meta.get("external.agent_id") == Some(&self.agent_id)
    }

    /// Runs `session` on an actor worker. If the child returns non-empty final text,
    /// it is appended as an assistant message.
    ///
    /// Flow:
    /// 1. Build a reusable provision spec.
    /// 2. Wait for a concurrency slot.
    /// 3. Obtain a worker, pooled or fresh, with one respawn if connecting fails.
    /// 4. Dispatch the run.
    /// 5. Stream the child's events to `event_tx` until a terminal frame or cancellation.
    ///
    /// While the run is active, live steering frames registered under the child
    /// session id are forwarded to the worker. On success the worker goes back to the
    /// pool. On every failure path it is retired, so no process or fabric registration
    /// is leaked.
    ///
    /// # Errors
    /// * `AgentError::LLM` for limiter, spawn, connect, dispatch or transport failures,
    ///   or a child-reported error.
    /// * `AgentError::Cancelled` on cancellation.
    async fn execute_external_child(
        &self,
        session: &mut Session,
        job: &SpawnJob,
        event_tx: mpsc::Sender<AgentEvent>,
        cancel_token: CancellationToken,
    ) -> crate::runtime::runner::Result<()> {
        let assignment = extract_assignment(session);
        let mut spec = self.build_spec(session, job);
        spec.reusable = true;
        if spec.limits.idle_timeout_secs.is_none() {
            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
        }
        let pool_key = Self::fingerprint(&spec);
        let messages: Vec<serde_json::Value> = session
            .messages
            .iter()
            .filter_map(|m| serde_json::to_value(m).ok())
            .collect();

        // Held until this function returns, bounding concurrently running children.
        let _slot = self
            .concurrency
            .acquire()
            .await
            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;

        let mut actor = self.acquire_worker(&pool_key, &spec).await?;
        let mut client = match ChildClient::connect(&actor.endpoint).await {
            Ok(client) => client,
            Err(e) => {
                // The worker is unreachable, e.g. a pooled worker that exited after its
                // liveness check. Retire it and make one attempt with a fresh worker.
                self.retire_worker(actor).await;
                let spawned = spawn_worker(
                    &self.worker_bin,
                    &self.worker_args,
                    &spec,
                    self.spawn_timeout,
                )
                .await
                .map_err(|e2| {
                    AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
                })?;
                let endpoint = spawned.record.endpoint.clone();
                let agent_id = spawned.record.agent_id.clone();
                actor = PooledActor {
                    worker: spawned,
                    endpoint,
                    agent_id,
                };
                match ChildClient::connect(&actor.endpoint).await {
                    Ok(client) => client,
                    Err(e2) => {
                        let err = AgentError::LLM(format!("actor connect failed: {e2}"));
                        // Do not leak the fresh worker when it is unreachable as well.
                        self.retire_worker(actor).await;
                        return Err(err);
                    }
                }
            }
        };

        let dispatched = client
            .send(ParentFrame::Run(RunSpec {
                assignment,
                reasoning_effort: None,
                messages,
            }))
            .await
            .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")));
        if let Err(err) = dispatched {
            // The run never started: retire the worker instead of dropping it on the floor.
            self.retire_worker(actor).await;
            return Err(err);
        }

        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
        let live_guard = super::live::register(&job.child_session_id, live_tx);
        let result = drive(&mut client, &event_tx, &cancel_token, &mut live_rx).await;
        // End the live-steering registration before closing the connection it feeds.
        drop(live_guard);
        let _ = client.close().await;

        // Only a worker that finished its run cleanly is pooled again. After an error
        // or cancellation its state is unknown, so it is retired.
        match &result {
            Ok(_) => self.release_worker(&pool_key, actor).await,
            Err(_) => self.retire_worker(actor).await,
        }

        if let Some(text) = result?.filter(|text| !text.is_empty()) {
            session.add_message(bamboo_agent_core::Message::assistant(text, None));
        }
        Ok(())
    }
}
/// Pumps one dispatched actor run until it reaches a terminal state.
///
/// Three sources are serviced at once:
/// * live steering frames from `live_rx`, forwarded to the child;
/// * child `Event` frames, decoded and relayed to `event_tx`;
/// * `cancel_token`.
///
/// Returns `Ok(result)` when the child reports `Completed`; `result` is the child's
/// optional final text.
///
/// # Errors
/// * `AgentError::Cancelled` when the child reports `Cancelled`, or when
///   `cancel_token` fires. In the latter case a `Cancel` frame is sent best-effort and
///   the child's terminal frame is not awaited.
/// * `AgentError::LLM` when the child reports an error, closes the stream before a
///   terminal frame, or the transport fails.
async fn drive(
    client: &mut ChildClient,
    event_tx: &mpsc::Sender<AgentEvent>,
    cancel_token: &CancellationToken,
    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
) -> crate::runtime::runner::Result<Option<String>> {
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => {
                break;
            }
            // If the steering channel is closed, `recv()` yields `None`, the pattern
            // does not match, and `select!` disables this branch for that pass.
            Some(frame) = live_rx.recv() => {
                if client.send(frame).await.is_err() {
                    tracing::warn!("live steering frame could not be sent; connection failing");
                }
            }
            frame = client.next_frame() => {
                match frame {
                    Ok(Some(ChildFrame::Event { event })) => {
                        match serde_json::from_value::<AgentEvent>(event) {
                            Ok(ev) => {
                                // A dropped receiver only means nobody is listening for
                                // progress; the run itself continues.
                                let _ = event_tx.send(ev).await;
                            }
                            Err(e) => {
                                // Best-effort relay, but leave a trace instead of
                                // silently discarding the frame.
                                tracing::debug!("dropping undecodable actor event: {}", e);
                            }
                        }
                    }
                    Ok(Some(ChildFrame::Terminal { status, result, error })) => {
                        return match status {
                            TerminalStatus::Completed => Ok(result),
                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
                            TerminalStatus::Error => Err(AgentError::LLM(
                                error.unwrap_or_else(|| "actor child errored".to_string()),
                            )),
                        };
                    }
                    Ok(None) => {
                        return Err(AgentError::LLM(
                            "actor child closed before terminal".to_string(),
                        ));
                    }
                    Err(e) => {
                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
                    }
                }
            }
        }
    }
    // Cancellation: ask the child to stop (best-effort) without awaiting its reply.
    let _ = client.send(ParentFrame::Cancel).await;
    Err(AgentError::Cancelled)
}
/// Picks the task text for a child run.
///
/// Uses the content of the most recent user message. If there is none, falls back to
/// the session's `title` metadata, and finally to the literal `"Execute task"`.
fn extract_assignment(session: &Session) -> String {
    let latest_user = session
        .messages
        .iter()
        .rfind(|message| matches!(message.role, Role::User));
    match latest_user {
        Some(message) => message.content.clone(),
        None => match session.metadata.get("title") {
            Some(title) => title.clone(),
            None => "Execute task".to_string(),
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an echo-executor spec that varies only in the fields the pool
    /// fingerprint inspects.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
        };
        let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        spec.workspace = workspace.map(String::from);
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools =
            disabled.map(|tools| tools.into_iter().map(String::from).collect());
        spec
    }

    /// Shorthand for the function under test.
    fn fp(spec: &ProvisionSpec) -> String {
        ActorChildRunner::fingerprint(spec)
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        let first = spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash", "Edit"]));
        let mut second =
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Edit", "Bash"]));
        // Per-run identity must not influence pooling.
        second.identity.child_id = "other".into();
        assert_eq!(fp(&first), fp(&second));
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base = fp(&spec_with("explorer", "p", "m", Some("/ws"), None));
        // Each variant differs from the base in exactly one fingerprinted field.
        let variants = [
            spec_with("writer", "p", "m", Some("/ws"), None),
            spec_with("explorer", "p2", "m", Some("/ws"), None),
            spec_with("explorer", "p", "m2", Some("/ws"), None),
            spec_with("explorer", "p", "m", Some("/ws2"), None),
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
        ];
        for variant in &variants {
            assert_ne!(base, fp(variant));
        }
    }
}