bamboo_engine/external_agents/
runtime.rs1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15pub struct CompositeExternalChildRunner {
17 runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21 pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22 Self { runners }
23 }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28 async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29 for runner in &self.runners {
30 if runner.should_handle(session).await {
31 return true;
32 }
33 }
34 false
35 }
36
37 async fn execute_external_child(
38 &self,
39 session: &mut bamboo_agent_core::Session,
40 job: &SpawnJob,
41 event_tx: mpsc::Sender<AgentEvent>,
42 cancel_token: CancellationToken,
43 ) -> crate::runtime::runner::Result<()> {
44 for runner in &self.runners {
45 if runner.should_handle(session).await {
46 return runner
47 .execute_external_child(session, job, event_tx, cancel_token)
48 .await;
49 }
50 }
51 Err(AgentError::LLM(
52 "No matching external child runner found for session metadata".to_string(),
53 ))
54 }
55
56 fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
61 for runner in &self.runners {
62 runner.set_escalation_bridge(bridge.clone());
63 }
64 }
65}
66
67pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
76 let agents = parse_external_agents(config);
77
78 let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
79
80 match build_local_actor_runner(config) {
84 Ok(runner) => runners.push(runner),
85 Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
86 }
87
88 for (_agent_id, profile) in agents {
89 if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
91 let Some(worker_bin) = profile.worker_bin.as_ref() else {
92 tracing::error!(
93 "Actor agent profile {} has no worker_bin; skipping",
94 profile.agent_id
95 );
96 continue;
97 };
98 let fabric_dir = profile
99 .fabric_dir
100 .clone()
101 .map(std::path::PathBuf::from)
102 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
103 let executor = match profile.executor.as_deref() {
104 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
105 Some("bamboo_runtime") | None => {
106 bamboo_subagent::provision::ExecutorSpec::BambooRuntime
107 }
108 Some(other) => {
109 tracing::error!(
110 "Actor agent profile {} has unknown executor '{}'; skipping",
111 profile.agent_id,
112 other
113 );
114 continue;
115 }
116 };
117 runners.push(Arc::new(ActorChildRunner::new(
118 profile.agent_id.clone(),
119 std::path::PathBuf::from(worker_bin),
120 profile.worker_args.clone(),
121 fabric_dir,
122 executor,
123 extract_provider_credentials(config),
124 config.provider.clone(),
125 config
126 .subagents
127 .max_concurrent
128 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
129 )));
130 continue;
131 }
132
133 if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
134 tracing::warn!(
135 "External agent profile {} uses unsupported protocol {:?}",
136 profile.agent_id,
137 profile.protocol
138 );
139 continue;
140 }
141
142 let auth_token = match profile.auth_ref.as_ref() {
143 Some(ref_name) => match std::env::var(ref_name) {
144 Ok(token) => Some(token),
145 Err(_) => {
146 tracing::error!(
147 "External agent profile {} auth_ref env var {} is not set",
148 profile.agent_id,
149 ref_name
150 );
151 continue;
152 }
153 },
154 None => None,
155 };
156
157 let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
158 {
159 Ok(cfg) => cfg,
160 Err(e) => {
161 tracing::error!(
162 "Failed to build A2A client config for profile {}: {}",
163 profile.agent_id,
164 e
165 );
166 continue;
167 }
168 };
169
170 let client = match A2AJsonRpcClient::new(client_config) {
171 Ok(c) => c,
172 Err(e) => {
173 tracing::error!(
174 "Failed to create A2A JSON-RPC client for profile {}: {}",
175 profile.agent_id,
176 e
177 );
178 continue;
179 }
180 };
181
182 runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
183 }
184
185 Arc::new(CompositeExternalChildRunner::new(runners))
186}
187
188fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
193 let sub = &config.subagents;
194
195 let (worker_bin, worker_args) = match &sub.worker_bin {
196 Some(custom) => (
197 std::path::PathBuf::from(custom),
198 sub.worker_args.clone().unwrap_or_default(),
199 ),
200 None => (
201 std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
202 sub.worker_args
203 .clone()
204 .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
205 ),
206 };
207
208 let fabric_dir = sub
209 .fabric_dir
210 .clone()
211 .map(std::path::PathBuf::from)
212 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
213
214 let executor = match sub.executor.as_deref() {
215 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
216 Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
217 Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
218 };
219
220 Ok(Arc::new(ActorChildRunner::new(
221 super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
222 worker_bin,
223 worker_args,
224 fabric_dir,
225 executor,
226 extract_provider_credentials(config),
227 config.provider.clone(),
228 sub.max_concurrent
229 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
230 )))
231}
232
233pub fn extract_provider_credentials(
238 config: &Config,
239) -> Vec<bamboo_subagent::provision::ScopedCredential> {
240 let mut out = Vec::new();
241
242 if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
244 out.extend(providers.into_iter().filter_map(|(name, slot)| {
245 let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
246 if api_key.is_empty() {
247 return None;
248 }
249 Some(bamboo_subagent::provision::ScopedCredential {
250 provider: name.clone(),
251 api_key,
252 base_url: slot
253 .get("base_url")
254 .and_then(|v| v.as_str())
255 .map(str::to_string),
256 provider_type: Some(name),
257 })
258 }));
259 }
260
261 out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
266 let api_key = inst.api_key.trim().to_string();
267 if api_key.is_empty() {
268 return None;
269 }
270 Some(bamboo_subagent::provision::ScopedCredential {
271 provider: id.clone(),
272 api_key,
273 base_url: inst.base_url.clone(),
274 provider_type: Some(inst.provider_type.clone()),
275 })
276 }));
277
278 out
279}