bamboo_server/external_agents/
runtime.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bamboo_agent_core::{AgentError, AgentEvent};
5use bamboo_engine::runtime::execution::{ExternalChildRunner, SpawnJob};
6use bamboo_infrastructure::a2a::A2AJsonRpcClient;
7use bamboo_infrastructure::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::config::{parse_external_agents, ExternalAgentProtocol};
13
14pub struct CompositeExternalChildRunner {
16 runners: Vec<Arc<dyn ExternalChildRunner>>,
17}
18
19impl CompositeExternalChildRunner {
20 pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
21 Self { runners }
22 }
23}
24
25#[async_trait]
26impl ExternalChildRunner for CompositeExternalChildRunner {
27 async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
28 for runner in &self.runners {
29 if runner.should_handle(session).await {
30 return true;
31 }
32 }
33 false
34 }
35
36 async fn execute_external_child(
37 &self,
38 session: &mut bamboo_agent_core::Session,
39 job: &SpawnJob,
40 event_tx: mpsc::Sender<AgentEvent>,
41 cancel_token: CancellationToken,
42 ) -> bamboo_engine::runtime::runner::Result<()> {
43 for runner in &self.runners {
44 if runner.should_handle(session).await {
45 return runner
46 .execute_external_child(session, job, event_tx, cancel_token)
47 .await;
48 }
49 }
50 Err(AgentError::LLM(
51 "No matching external child runner found for session metadata".to_string(),
52 ))
53 }
54}
55
56pub fn build_external_child_runner(config: &Config) -> Option<Arc<dyn ExternalChildRunner>> {
62 let agents = parse_external_agents(config);
63 if agents.is_empty() {
64 return None;
65 }
66
67 let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
68
69 for (_agent_id, profile) in agents {
70 if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
71 tracing::warn!(
72 "External agent profile {} uses unsupported protocol {:?}",
73 profile.agent_id,
74 profile.protocol
75 );
76 continue;
77 }
78
79 let auth_token = match profile.auth_ref.as_ref() {
80 Some(ref_name) => match std::env::var(ref_name) {
81 Ok(token) => Some(token),
82 Err(_) => {
83 tracing::error!(
84 "External agent profile {} auth_ref env var {} is not set",
85 profile.agent_id,
86 ref_name
87 );
88 continue;
89 }
90 },
91 None => None,
92 };
93
94 let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
95 {
96 Ok(cfg) => cfg,
97 Err(e) => {
98 tracing::error!(
99 "Failed to build A2A client config for profile {}: {}",
100 profile.agent_id,
101 e
102 );
103 continue;
104 }
105 };
106
107 let client = match A2AJsonRpcClient::new(client_config) {
108 Ok(c) => c,
109 Err(e) => {
110 tracing::error!(
111 "Failed to create A2A JSON-RPC client for profile {}: {}",
112 profile.agent_id,
113 e
114 );
115 continue;
116 }
117 };
118
119 runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
120 }
121
122 if runners.is_empty() {
123 None
124 } else {
125 Some(Arc::new(CompositeExternalChildRunner::new(runners)))
126 }
127}