Skip to main content

bamboo_server/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bamboo_agent_core::{AgentError, AgentEvent};
5use bamboo_engine::runtime::execution::{ExternalChildRunner, SpawnJob};
6use bamboo_infrastructure::a2a::A2AJsonRpcClient;
7use bamboo_infrastructure::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::config::{parse_external_agents, ExternalAgentProtocol};
13
14/// Composite router that delegates to the first matching external child runner.
15pub struct CompositeExternalChildRunner {
16    runners: Vec<Arc<dyn ExternalChildRunner>>,
17}
18
19impl CompositeExternalChildRunner {
20    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
21        Self { runners }
22    }
23}
24
25#[async_trait]
26impl ExternalChildRunner for CompositeExternalChildRunner {
27    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
28        for runner in &self.runners {
29            if runner.should_handle(session).await {
30                return true;
31            }
32        }
33        false
34    }
35
36    async fn execute_external_child(
37        &self,
38        session: &mut bamboo_agent_core::Session,
39        job: &SpawnJob,
40        event_tx: mpsc::Sender<AgentEvent>,
41        cancel_token: CancellationToken,
42    ) -> bamboo_engine::runtime::runner::Result<()> {
43        for runner in &self.runners {
44            if runner.should_handle(session).await {
45                return runner
46                    .execute_external_child(session, job, event_tx, cancel_token)
47                    .await;
48            }
49        }
50        Err(AgentError::LLM(
51            "No matching external child runner found for session metadata".to_string(),
52        ))
53    }
54}
55
56/// Build an external child runner from the application config.
57///
58/// Returns `None` if no A2A profiles are configured.
59/// Builds a composite router over all configured A2A profiles so that
60/// `external.agent_id` metadata selects the correct runner.
61pub fn build_external_child_runner(config: &Config) -> Option<Arc<dyn ExternalChildRunner>> {
62    let agents = parse_external_agents(config);
63    if agents.is_empty() {
64        return None;
65    }
66
67    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
68
69    for (_agent_id, profile) in agents {
70        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
71            tracing::warn!(
72                "External agent profile {} uses unsupported protocol {:?}",
73                profile.agent_id,
74                profile.protocol
75            );
76            continue;
77        }
78
79        let auth_token = match profile.auth_ref.as_ref() {
80            Some(ref_name) => match std::env::var(ref_name) {
81                Ok(token) => Some(token),
82                Err(_) => {
83                    tracing::error!(
84                        "External agent profile {} auth_ref env var {} is not set",
85                        profile.agent_id,
86                        ref_name
87                    );
88                    continue;
89                }
90            },
91            None => None,
92        };
93
94        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
95        {
96            Ok(cfg) => cfg,
97            Err(e) => {
98                tracing::error!(
99                    "Failed to build A2A client config for profile {}: {}",
100                    profile.agent_id,
101                    e
102                );
103                continue;
104            }
105        };
106
107        let client = match A2AJsonRpcClient::new(client_config) {
108            Ok(c) => c,
109            Err(e) => {
110                tracing::error!(
111                    "Failed to create A2A JSON-RPC client for profile {}: {}",
112                    profile.agent_id,
113                    e
114                );
115                continue;
116            }
117        };
118
119        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
120    }
121
122    if runners.is_empty() {
123        None
124    } else {
125        Some(Arc::new(CompositeExternalChildRunner::new(runners)))
126    }
127}