1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15pub struct CompositeExternalChildRunner {
17 runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21 pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22 Self { runners }
23 }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28 async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29 for runner in &self.runners {
30 if runner.should_handle(session).await {
31 return true;
32 }
33 }
34 false
35 }
36
37 async fn execute_external_child(
38 &self,
39 session: &mut bamboo_agent_core::Session,
40 job: &SpawnJob,
41 event_tx: mpsc::Sender<AgentEvent>,
42 cancel_token: CancellationToken,
43 ) -> crate::runtime::runner::Result<()> {
44 for runner in &self.runners {
45 if runner.should_handle(session).await {
46 return runner
47 .execute_external_child(session, job, event_tx, cancel_token)
48 .await;
49 }
50 }
51 Err(AgentError::LLM(
52 "No matching external child runner found for session metadata".to_string(),
53 ))
54 }
55
56 fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
61 for runner in &self.runners {
62 runner.set_escalation_bridge(bridge.clone());
63 }
64 }
65}
66
67pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
76 let agents = parse_external_agents(config);
77
78 let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
79
80 match build_local_actor_runner(config) {
84 Ok(runner) => runners.push(runner),
85 Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
86 }
87
88 for (_agent_id, profile) in agents {
89 if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
91 let Some(worker_bin) = profile.worker_bin.as_ref() else {
92 tracing::error!(
93 "Actor agent profile {} has no worker_bin; skipping",
94 profile.agent_id
95 );
96 continue;
97 };
98 let fabric_dir = profile
99 .fabric_dir
100 .clone()
101 .map(std::path::PathBuf::from)
102 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
103 let executor = match profile.executor.as_deref() {
104 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
105 Some("bamboo_runtime") | None => {
106 bamboo_subagent::provision::ExecutorSpec::BambooRuntime
107 }
108 Some(other) => {
109 tracing::error!(
110 "Actor agent profile {} has unknown executor '{}'; skipping",
111 profile.agent_id,
112 other
113 );
114 continue;
115 }
116 };
117 runners.push(Arc::new(ActorChildRunner::new(
118 profile.agent_id.clone(),
119 std::path::PathBuf::from(worker_bin),
120 profile.worker_args.clone(),
121 fabric_dir,
122 executor,
123 extract_provider_credentials(config),
124 config.provider.clone(),
125 config
126 .subagents
127 .max_concurrent
128 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
129 )));
130 continue;
131 }
132
133 if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
134 tracing::warn!(
135 "External agent profile {} uses unsupported protocol {:?}",
136 profile.agent_id,
137 profile.protocol
138 );
139 continue;
140 }
141
142 let auth_token = match profile.auth_ref.as_ref() {
143 Some(ref_name) => match std::env::var(ref_name) {
144 Ok(token) => Some(token),
145 Err(_) => {
146 tracing::error!(
147 "External agent profile {} auth_ref env var {} is not set",
148 profile.agent_id,
149 ref_name
150 );
151 continue;
152 }
153 },
154 None => None,
155 };
156
157 let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
158 {
159 Ok(cfg) => cfg,
160 Err(e) => {
161 tracing::error!(
162 "Failed to build A2A client config for profile {}: {}",
163 profile.agent_id,
164 e
165 );
166 continue;
167 }
168 };
169
170 let client = match A2AJsonRpcClient::new(client_config) {
171 Ok(c) => c,
172 Err(e) => {
173 tracing::error!(
174 "Failed to create A2A JSON-RPC client for profile {}: {}",
175 profile.agent_id,
176 e
177 );
178 continue;
179 }
180 };
181
182 runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
183 }
184
185 Arc::new(CompositeExternalChildRunner::new(runners))
186}
187
188fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
193 let sub = &config.subagents;
194
195 let (worker_bin, worker_args) = match &sub.worker_bin {
196 Some(custom) => (
197 std::path::PathBuf::from(custom),
198 sub.worker_args.clone().unwrap_or_default(),
199 ),
200 None => (
201 std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
202 sub.worker_args
203 .clone()
204 .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
205 ),
206 };
207
208 let fabric_dir = sub
209 .fabric_dir
210 .clone()
211 .map(std::path::PathBuf::from)
212 .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
213
214 let executor = match sub.executor.as_deref() {
215 Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
216 Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
217 Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
218 };
219
220 Ok(Arc::new(
221 ActorChildRunner::new(
222 super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
223 worker_bin,
224 worker_args,
225 fabric_dir,
226 executor,
227 extract_provider_credentials(config),
228 config.provider.clone(),
229 sub.max_concurrent
230 .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
231 )
232 .with_remote_placements(resolve_remote_placements(
233 &sub.remote_placements,
234 &config.cluster_fabric.nodes,
235 ))
236 .with_schedulable_placements(resolve_schedulable_placements(
237 &sub.schedulable_placements,
238 &config.cluster_fabric.nodes,
239 ))
240 .with_bus(sub.broker.as_ref().map(|b| bamboo_subagent::BusEndpoint {
241 endpoint: b.endpoint.clone(),
242 token: b.token.clone(),
243 })),
244 ))
245}
246
247fn resolve_schedulable_placements(
256 placements: &[bamboo_config::SchedulablePlacement],
257 nodes: &[bamboo_config::cluster_fabric::Node],
258) -> std::collections::HashMap<String, super::actor_adapter::ResolvedSchedulablePlacement> {
259 placements
262 .iter()
263 .map(|p| {
264 (
265 p.role.clone(),
266 super::actor_adapter::ResolvedSchedulablePlacement {
267 pool: p.pool.clone(),
268 host_label: node_label_for_role(nodes, &p.pool),
271 },
272 )
273 })
274 .collect()
275}
276
277fn node_label_for_role(
281 nodes: &[bamboo_config::cluster_fabric::Node],
282 role: &str,
283) -> Option<String> {
284 nodes
285 .iter()
286 .find(|n| n.deploy.default_role.as_deref() == Some(role))
287 .map(node_display_name)
288}
289
290fn node_label_for_endpoint(
294 nodes: &[bamboo_config::cluster_fabric::Node],
295 endpoint: &str,
296) -> Option<String> {
297 let host = endpoint
298 .trim()
299 .trim_start_matches("wss://")
300 .trim_start_matches("ws://")
301 .split(['/', ':'])
302 .next()
303 .unwrap_or("");
304 if host.is_empty() {
305 return None;
306 }
307 nodes
308 .iter()
309 .find(|n| match &n.placement {
310 bamboo_config::cluster_fabric::NodePlacement::Ssh(t) => t.host == host,
311 bamboo_config::cluster_fabric::NodePlacement::Local => false,
312 })
313 .map(node_display_name)
314}
315
316fn node_display_name(n: &bamboo_config::cluster_fabric::Node) -> String {
317 if !n.label.trim().is_empty() {
318 return n.label.clone();
319 }
320 match &n.placement {
321 bamboo_config::cluster_fabric::NodePlacement::Ssh(t) => t.host.clone(),
322 bamboo_config::cluster_fabric::NodePlacement::Local => "local".to_string(),
323 }
324}
325
/// Heuristic: does `endpoint` look like it targets something other than this machine?
///
/// Rules, in order:
///
/// 1. Anything starting with `wss://` is treated as public, whatever its host.
/// 2. Otherwise an optional `ws://` prefix is dropped and the host is taken from
///    what precedes the first `/`. A bracketed IPv6 literal (`[::1]:8899`)
///    yields the text inside the brackets; anything else yields the text before
///    the first `:`.
/// 3. An empty host, `localhost` (any ASCII case), or any loopback IP address
///    (`127.0.0.0/8`, `::1`) is *not* public. Everything else is.
fn endpoint_looks_public(endpoint: &str) -> bool {
    if endpoint.starts_with("wss://") {
        return true;
    }

    let authority = endpoint
        .strip_prefix("ws://")
        .unwrap_or(endpoint)
        .split('/')
        .next()
        .unwrap_or("");

    // Brackets must be handled before splitting on ':'; otherwise an IPv6
    // literal such as "[::1]:8899" collapses to "[" and looks public.
    let host = match authority.strip_prefix('[') {
        Some(v6) => v6.split(']').next().unwrap_or(""),
        None => authority.split(':').next().unwrap_or(""),
    };

    if host.is_empty() || host.eq_ignore_ascii_case("localhost") {
        return false;
    }

    let is_loopback_ip = host
        .parse::<std::net::IpAddr>()
        .map(|ip| ip.is_loopback())
        .unwrap_or(false);
    !is_loopback_ip
}
348
349fn resolve_remote_placements(
350 placements: &[bamboo_config::RemoteActorPlacement],
351 nodes: &[bamboo_config::cluster_fabric::Node],
352) -> std::collections::HashMap<String, super::actor_adapter::ResolvedRemotePlacement> {
353 let mut out = std::collections::HashMap::new();
354 for p in placements {
355 let token = match p.token_env.as_deref() {
356 Some(env_var) => match std::env::var(env_var) {
357 Ok(token) => Some(token),
358 Err(_) => {
359 tracing::error!(
360 "remote placement for role '{}' token_env '{}' is not set; \
361 skipping (role falls back to local, NOT unauthenticated remote)",
362 p.role,
363 env_var
364 );
365 continue;
366 }
367 },
368 None => {
369 if endpoint_looks_public(&p.endpoint) {
373 tracing::warn!(
374 "remote placement for role '{}' has no token_env but targets a \
375 public-looking endpoint '{}'; work will be dispatched with NO bearer. \
376 Set token_env (and use wss://) for any non-loopback worker.",
377 p.role,
378 p.endpoint
379 );
380 }
381 None
382 }
383 };
384 out.insert(
385 p.role.clone(),
386 super::actor_adapter::ResolvedRemotePlacement {
387 endpoint: p.endpoint.clone(),
388 token,
389 ca_cert_file: p.ca_cert_file.as_ref().map(std::path::PathBuf::from),
390 host_label: node_label_for_endpoint(nodes, &p.endpoint),
393 },
394 );
395 }
396 out
397}
398
399pub fn extract_provider_credentials(
404 config: &Config,
405) -> Vec<bamboo_subagent::provision::ScopedCredential> {
406 let mut out = Vec::new();
407
408 if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
410 out.extend(providers.into_iter().filter_map(|(name, slot)| {
411 let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
412 if api_key.is_empty() {
413 return None;
414 }
415 Some(bamboo_subagent::provision::ScopedCredential {
416 provider: name.clone(),
417 api_key,
418 base_url: slot
419 .get("base_url")
420 .and_then(|v| v.as_str())
421 .map(str::to_string),
422 provider_type: Some(name),
423 })
424 }));
425 }
426
427 out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
432 let api_key = inst.api_key.trim().to_string();
433 if api_key.is_empty() {
434 return None;
435 }
436 Some(bamboo_subagent::provision::ScopedCredential {
437 provider: id.clone(),
438 api_key,
439 base_url: inst.base_url.clone(),
440 provider_type: Some(inst.provider_type.clone()),
441 })
442 }));
443
444 out
445}
446
#[cfg(test)]
mod placement_resolver_tests {
    use super::{node_display_name, resolve_remote_placements, resolve_schedulable_placements};
    use bamboo_config::cluster_fabric::{
        DeployProfile, Node, NodePlacement, SshAuth, SshTarget, TrustLevel,
    };
    use bamboo_config::{RemoteActorPlacement, SchedulablePlacement};

    /// Builds an enabled SSH node with fixed port/user and the given identity fields.
    fn ssh_node(id: &str, label: &str, host: &str, default_role: Option<&str>) -> Node {
        let target = SshTarget {
            host: host.into(),
            port: 22,
            username: "u".into(),
            auth: SshAuth::SystemSshConfig,
            host_key_fingerprint: None,
        };
        let deploy = DeployProfile {
            default_role: default_role.map(String::from),
            ..Default::default()
        };
        Node {
            id: id.into(),
            label: label.into(),
            placement: NodePlacement::Ssh(target),
            trust_level: TrustLevel::default(),
            deploy,
            state: None,
            enabled: true,
        }
    }

    #[test]
    fn node_display_name_prefers_label_then_ssh_host() {
        let labeled = ssh_node("n1", "mini", "mini.local", None);
        assert_eq!(node_display_name(&labeled), "mini");

        // Same node with an empty label falls back to the SSH host.
        let unlabeled = ssh_node("n1", "", "mini.local", None);
        assert_eq!(node_display_name(&unlabeled), "mini.local");
    }

    #[test]
    fn schedulable_placement_takes_host_label_from_node_by_default_role() {
        let nodes = [ssh_node("n1", "mini", "mini.local", Some("mac-mini-monitor"))];
        let placements = [SchedulablePlacement {
            role: "mac-mini-monitor".into(),
            pool: "mac-mini-monitor".into(),
            ..Default::default()
        }];

        let resolved = resolve_schedulable_placements(&placements, &nodes);

        let entry = resolved.get("mac-mini-monitor").expect("role resolved");
        assert_eq!(entry.pool, "mac-mini-monitor");
        assert_eq!(entry.host_label.as_deref(), Some("mini"));
    }

    #[test]
    fn remote_placement_takes_host_label_from_node_by_ssh_host() {
        let nodes = [ssh_node("n1", "mini", "mini.local", None)];
        let placements = [RemoteActorPlacement {
            role: "explorer".into(),
            endpoint: "ws://mini.local:8899".into(),
            ..Default::default()
        }];

        let resolved = resolve_remote_placements(&placements, &nodes);

        let entry = resolved.get("explorer").unwrap();
        assert_eq!(entry.host_label.as_deref(), Some("mini"));
    }

    #[test]
    fn no_host_label_when_no_node_matches() {
        let nodes = [ssh_node("n1", "mini", "mini.local", Some("other-role"))];

        // Pool does not equal any node's default role.
        let sched = [SchedulablePlacement {
            role: "x".into(),
            pool: "unmatched".into(),
            ..Default::default()
        }];
        let sched_resolved = resolve_schedulable_placements(&sched, &nodes);
        assert_eq!(sched_resolved.get("x").unwrap().host_label, None);

        // Endpoint host does not equal any node's SSH host.
        let remote = [RemoteActorPlacement {
            role: "y".into(),
            endpoint: "ws://other-host:9000".into(),
            ..Default::default()
        }];
        let remote_resolved = resolve_remote_placements(&remote, &nodes);
        assert_eq!(remote_resolved.get("y").unwrap().host_label, None);
    }
}
539}