bamboo_engine/external_agents/
runtime.rs1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15pub struct CompositeExternalChildRunner {
17 runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21 pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22 Self { runners }
23 }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28 async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29 for runner in &self.runners {
30 if runner.should_handle(session).await {
31 return true;
32 }
33 }
34 false
35 }
36
37 async fn execute_external_child(
38 &self,
39 session: &mut bamboo_agent_core::Session,
40 job: &SpawnJob,
41 event_tx: mpsc::Sender<AgentEvent>,
42 cancel_token: CancellationToken,
43 ) -> crate::runtime::runner::Result<()> {
44 for runner in &self.runners {
45 if runner.should_handle(session).await {
46 return runner
47 .execute_external_child(session, job, event_tx, cancel_token)
48 .await;
49 }
50 }
51 Err(AgentError::LLM(
52 "No matching external child runner found for session metadata".to_string(),
53 ))
54 }
55}
56
57pub fn build_external_child_runner(config: &Config) -> Option<Arc<dyn ExternalChildRunner>> {
65 let agents = parse_external_agents(config);
66
67 let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
68
69 if config.subagents.any_actor() {
71 match build_local_actor_runner(config) {
72 Ok(runner) => runners.push(runner),
73 Err(e) => tracing::error!("subagents.runtime=actor unavailable: {e}"),
74 }
75 }
76
77 if agents.is_empty() && runners.is_empty() {
78 return None;
79 }
80
81 for (_agent_id, profile) in agents {
82 if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
84 let Some(worker_bin) = profile.worker_bin.as_ref() else {
85 tracing::error!(
86 "Actor agent profile {} has no worker_bin; skipping",
87 profile.agent_id
88 );
89 continue;
90 };
91 let fabric_dir = profile
92 .fabric_dir
93 .clone()
94 .map(std::path::PathBuf::from)
95 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
96 let executor = match profile.executor.as_deref() {
97 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
98 Some("bamboo_runtime") | None => {
99 bamboo_subagent::provision::ExecutorSpec::BambooRuntime
100 }
101 Some(other) => {
102 tracing::error!(
103 "Actor agent profile {} has unknown executor '{}'; skipping",
104 profile.agent_id,
105 other
106 );
107 continue;
108 }
109 };
110 runners.push(Arc::new(ActorChildRunner::new(
111 profile.agent_id.clone(),
112 std::path::PathBuf::from(worker_bin),
113 profile.worker_args.clone(),
114 fabric_dir,
115 executor,
116 extract_provider_credentials(config),
117 config.provider.clone(),
118 config
119 .subagents
120 .max_concurrent
121 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
122 )));
123 continue;
124 }
125
126 if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
127 tracing::warn!(
128 "External agent profile {} uses unsupported protocol {:?}",
129 profile.agent_id,
130 profile.protocol
131 );
132 continue;
133 }
134
135 let auth_token = match profile.auth_ref.as_ref() {
136 Some(ref_name) => match std::env::var(ref_name) {
137 Ok(token) => Some(token),
138 Err(_) => {
139 tracing::error!(
140 "External agent profile {} auth_ref env var {} is not set",
141 profile.agent_id,
142 ref_name
143 );
144 continue;
145 }
146 },
147 None => None,
148 };
149
150 let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
151 {
152 Ok(cfg) => cfg,
153 Err(e) => {
154 tracing::error!(
155 "Failed to build A2A client config for profile {}: {}",
156 profile.agent_id,
157 e
158 );
159 continue;
160 }
161 };
162
163 let client = match A2AJsonRpcClient::new(client_config) {
164 Ok(c) => c,
165 Err(e) => {
166 tracing::error!(
167 "Failed to create A2A JSON-RPC client for profile {}: {}",
168 profile.agent_id,
169 e
170 );
171 continue;
172 }
173 };
174
175 runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
176 }
177
178 if runners.is_empty() {
179 None
180 } else {
181 Some(Arc::new(CompositeExternalChildRunner::new(runners)))
182 }
183}
184
185fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
190 let sub = &config.subagents;
191
192 let (worker_bin, worker_args) = match &sub.worker_bin {
193 Some(custom) => (
194 std::path::PathBuf::from(custom),
195 sub.worker_args.clone().unwrap_or_default(),
196 ),
197 None => (
198 std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
199 sub.worker_args
200 .clone()
201 .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
202 ),
203 };
204
205 let fabric_dir = sub
206 .fabric_dir
207 .clone()
208 .map(std::path::PathBuf::from)
209 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
210
211 let executor = match sub.executor.as_deref() {
212 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
213 Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
214 Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
215 };
216
217 Ok(Arc::new(ActorChildRunner::new(
218 super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
219 worker_bin,
220 worker_args,
221 fabric_dir,
222 executor,
223 extract_provider_credentials(config),
224 config.provider.clone(),
225 sub.max_concurrent
226 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
227 )))
228}
229
230pub fn extract_provider_credentials(
235 config: &Config,
236) -> Vec<bamboo_subagent::provision::ScopedCredential> {
237 let mut out = Vec::new();
238
239 if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
241 out.extend(providers.into_iter().filter_map(|(name, slot)| {
242 let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
243 if api_key.is_empty() {
244 return None;
245 }
246 Some(bamboo_subagent::provision::ScopedCredential {
247 provider: name.clone(),
248 api_key,
249 base_url: slot
250 .get("base_url")
251 .and_then(|v| v.as_str())
252 .map(str::to_string),
253 provider_type: Some(name),
254 })
255 }));
256 }
257
258 out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
263 let api_key = inst.api_key.trim().to_string();
264 if api_key.is_empty() {
265 return None;
266 }
267 Some(bamboo_subagent::provision::ScopedCredential {
268 provider: id.clone(),
269 api_key,
270 base_url: inst.base_url.clone(),
271 provider_type: Some(inst.provider_type.clone()),
272 })
273 }));
274
275 out
276}