bamboo_engine/external_agents/
runtime.rs1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15pub struct CompositeExternalChildRunner {
17 runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21 pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22 Self { runners }
23 }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28 async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29 for runner in &self.runners {
30 if runner.should_handle(session).await {
31 return true;
32 }
33 }
34 false
35 }
36
37 async fn execute_external_child(
38 &self,
39 session: &mut bamboo_agent_core::Session,
40 job: &SpawnJob,
41 event_tx: mpsc::Sender<AgentEvent>,
42 cancel_token: CancellationToken,
43 ) -> crate::runtime::runner::Result<()> {
44 for runner in &self.runners {
45 if runner.should_handle(session).await {
46 return runner
47 .execute_external_child(session, job, event_tx, cancel_token)
48 .await;
49 }
50 }
51 Err(AgentError::LLM(
52 "No matching external child runner found for session metadata".to_string(),
53 ))
54 }
55}
56
57pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
66 let agents = parse_external_agents(config);
67
68 let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
69
70 match build_local_actor_runner(config) {
74 Ok(runner) => runners.push(runner),
75 Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
76 }
77
78 for (_agent_id, profile) in agents {
79 if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
81 let Some(worker_bin) = profile.worker_bin.as_ref() else {
82 tracing::error!(
83 "Actor agent profile {} has no worker_bin; skipping",
84 profile.agent_id
85 );
86 continue;
87 };
88 let fabric_dir = profile
89 .fabric_dir
90 .clone()
91 .map(std::path::PathBuf::from)
92 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
93 let executor = match profile.executor.as_deref() {
94 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
95 Some("bamboo_runtime") | None => {
96 bamboo_subagent::provision::ExecutorSpec::BambooRuntime
97 }
98 Some(other) => {
99 tracing::error!(
100 "Actor agent profile {} has unknown executor '{}'; skipping",
101 profile.agent_id,
102 other
103 );
104 continue;
105 }
106 };
107 runners.push(Arc::new(ActorChildRunner::new(
108 profile.agent_id.clone(),
109 std::path::PathBuf::from(worker_bin),
110 profile.worker_args.clone(),
111 fabric_dir,
112 executor,
113 extract_provider_credentials(config),
114 config.provider.clone(),
115 config
116 .subagents
117 .max_concurrent
118 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
119 )));
120 continue;
121 }
122
123 if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
124 tracing::warn!(
125 "External agent profile {} uses unsupported protocol {:?}",
126 profile.agent_id,
127 profile.protocol
128 );
129 continue;
130 }
131
132 let auth_token = match profile.auth_ref.as_ref() {
133 Some(ref_name) => match std::env::var(ref_name) {
134 Ok(token) => Some(token),
135 Err(_) => {
136 tracing::error!(
137 "External agent profile {} auth_ref env var {} is not set",
138 profile.agent_id,
139 ref_name
140 );
141 continue;
142 }
143 },
144 None => None,
145 };
146
147 let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
148 {
149 Ok(cfg) => cfg,
150 Err(e) => {
151 tracing::error!(
152 "Failed to build A2A client config for profile {}: {}",
153 profile.agent_id,
154 e
155 );
156 continue;
157 }
158 };
159
160 let client = match A2AJsonRpcClient::new(client_config) {
161 Ok(c) => c,
162 Err(e) => {
163 tracing::error!(
164 "Failed to create A2A JSON-RPC client for profile {}: {}",
165 profile.agent_id,
166 e
167 );
168 continue;
169 }
170 };
171
172 runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
173 }
174
175 Arc::new(CompositeExternalChildRunner::new(runners))
176}
177
178fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
183 let sub = &config.subagents;
184
185 let (worker_bin, worker_args) = match &sub.worker_bin {
186 Some(custom) => (
187 std::path::PathBuf::from(custom),
188 sub.worker_args.clone().unwrap_or_default(),
189 ),
190 None => (
191 std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
192 sub.worker_args
193 .clone()
194 .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
195 ),
196 };
197
198 let fabric_dir = sub
199 .fabric_dir
200 .clone()
201 .map(std::path::PathBuf::from)
202 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
203
204 let executor = match sub.executor.as_deref() {
205 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
206 Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
207 Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
208 };
209
210 Ok(Arc::new(ActorChildRunner::new(
211 super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
212 worker_bin,
213 worker_args,
214 fabric_dir,
215 executor,
216 extract_provider_credentials(config),
217 config.provider.clone(),
218 sub.max_concurrent
219 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
220 )))
221}
222
223pub fn extract_provider_credentials(
228 config: &Config,
229) -> Vec<bamboo_subagent::provision::ScopedCredential> {
230 let mut out = Vec::new();
231
232 if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
234 out.extend(providers.into_iter().filter_map(|(name, slot)| {
235 let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
236 if api_key.is_empty() {
237 return None;
238 }
239 Some(bamboo_subagent::provision::ScopedCredential {
240 provider: name.clone(),
241 api_key,
242 base_url: slot
243 .get("base_url")
244 .and_then(|v| v.as_str())
245 .map(str::to_string),
246 provider_type: Some(name),
247 })
248 }));
249 }
250
251 out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
256 let api_key = inst.api_key.trim().to_string();
257 if api_key.is_empty() {
258 return None;
259 }
260 Some(bamboo_subagent::provision::ScopedCredential {
261 provider: id.clone(),
262 api_key,
263 base_url: inst.base_url.clone(),
264 provider_type: Some(inst.provider_type.clone()),
265 })
266 }));
267
268 out
269}