Skip to main content

bamboo_engine/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15/// Composite router that delegates to the first matching external child runner.
16pub struct CompositeExternalChildRunner {
17    runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22        Self { runners }
23    }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29        for runner in &self.runners {
30            if runner.should_handle(session).await {
31                return true;
32            }
33        }
34        false
35    }
36
37    async fn execute_external_child(
38        &self,
39        session: &mut bamboo_agent_core::Session,
40        job: &SpawnJob,
41        event_tx: mpsc::Sender<AgentEvent>,
42        cancel_token: CancellationToken,
43    ) -> crate::runtime::runner::Result<()> {
44        for runner in &self.runners {
45            if runner.should_handle(session).await {
46                return runner
47                    .execute_external_child(session, job, event_tx, cancel_token)
48                    .await;
49            }
50        }
51        Err(AgentError::LLM(
52            "No matching external child runner found for session metadata".to_string(),
53        ))
54    }
55}
56
57/// Build the child runner from the application config.
58///
59/// Sub-agents always run as actors (the in-process runtime was removed), so the
60/// built-in **local actor** worker is always part of the composite — its worker
61/// binary, arguments, and discovery dir are all derived; no expert tables
62/// needed. Expert `externalAgents` profiles add extra routers so
63/// `external.agent_id` metadata can pin specific roles to other agents. Returns
64/// a composite router that delegates to the first matching runner.
65pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
66    let agents = parse_external_agents(config);
67
68    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
69
70    // The built-in local actor worker is the default runtime for every
71    // sub-agent. Always build it; a build failure here is logged and leaves the
72    // composite without a default handler (dispatch then errors clearly).
73    match build_local_actor_runner(config) {
74        Ok(runner) => runners.push(runner),
75        Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
76    }
77
78    for (_agent_id, profile) in agents {
79        // Actor protocol: spawn a local worker binary over the bamboo-subagent WS protocol.
80        if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
81            let Some(worker_bin) = profile.worker_bin.as_ref() else {
82                tracing::error!(
83                    "Actor agent profile {} has no worker_bin; skipping",
84                    profile.agent_id
85                );
86                continue;
87            };
88            let fabric_dir = profile
89                .fabric_dir
90                .clone()
91                .map(std::path::PathBuf::from)
92                .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
93            let executor = match profile.executor.as_deref() {
94                Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
95                Some("bamboo_runtime") | None => {
96                    bamboo_subagent::provision::ExecutorSpec::BambooRuntime
97                }
98                Some(other) => {
99                    tracing::error!(
100                        "Actor agent profile {} has unknown executor '{}'; skipping",
101                        profile.agent_id,
102                        other
103                    );
104                    continue;
105                }
106            };
107            runners.push(Arc::new(ActorChildRunner::new(
108                profile.agent_id.clone(),
109                std::path::PathBuf::from(worker_bin),
110                profile.worker_args.clone(),
111                fabric_dir,
112                executor,
113                extract_provider_credentials(config),
114                config.provider.clone(),
115                config
116                    .subagents
117                    .max_concurrent
118                    .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
119            )));
120            continue;
121        }
122
123        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
124            tracing::warn!(
125                "External agent profile {} uses unsupported protocol {:?}",
126                profile.agent_id,
127                profile.protocol
128            );
129            continue;
130        }
131
132        let auth_token = match profile.auth_ref.as_ref() {
133            Some(ref_name) => match std::env::var(ref_name) {
134                Ok(token) => Some(token),
135                Err(_) => {
136                    tracing::error!(
137                        "External agent profile {} auth_ref env var {} is not set",
138                        profile.agent_id,
139                        ref_name
140                    );
141                    continue;
142                }
143            },
144            None => None,
145        };
146
147        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
148        {
149            Ok(cfg) => cfg,
150            Err(e) => {
151                tracing::error!(
152                    "Failed to build A2A client config for profile {}: {}",
153                    profile.agent_id,
154                    e
155                );
156                continue;
157            }
158        };
159
160        let client = match A2AJsonRpcClient::new(client_config) {
161            Ok(c) => c,
162            Err(e) => {
163                tracing::error!(
164                    "Failed to create A2A JSON-RPC client for profile {}: {}",
165                    profile.agent_id,
166                    e
167                );
168                continue;
169            }
170        };
171
172        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
173    }
174
175    Arc::new(CompositeExternalChildRunner::new(runners))
176}
177
178/// Build the built-in local actor runner from the typed `subagents`
179/// config. Everything is derived: worker = the current bamboo executable +
180/// `subagent-worker`, fabric = per-user temp dir — unless expert fields
181/// override them.
182fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
183    let sub = &config.subagents;
184
185    let (worker_bin, worker_args) = match &sub.worker_bin {
186        Some(custom) => (
187            std::path::PathBuf::from(custom),
188            sub.worker_args.clone().unwrap_or_default(),
189        ),
190        None => (
191            std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
192            sub.worker_args
193                .clone()
194                .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
195        ),
196    };
197
198    let fabric_dir = sub
199        .fabric_dir
200        .clone()
201        .map(std::path::PathBuf::from)
202        .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
203
204    let executor = match sub.executor.as_deref() {
205        Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
206        Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
207        Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
208    };
209
210    Ok(Arc::new(ActorChildRunner::new(
211        super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
212        worker_bin,
213        worker_args,
214        fabric_dir,
215        executor,
216        extract_provider_credentials(config),
217        config.provider.clone(),
218        sub.max_concurrent
219            .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
220    )))
221}
222
223/// Snapshot per-provider credentials from the parent config for actor
224/// provisioning. Serialized generically so this code does not chase the
225/// per-provider config struct shapes — any slot with a non-empty `api_key`
226/// yields a scoped credential (plus `base_url` when present).
227pub fn extract_provider_credentials(
228    config: &Config,
229) -> Vec<bamboo_subagent::provision::ScopedCredential> {
230    let mut out = Vec::new();
231
232    // Legacy single-instance slots: providers.anthropic / openai / …
233    if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
234        out.extend(providers.into_iter().filter_map(|(name, slot)| {
235            let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
236            if api_key.is_empty() {
237                return None;
238            }
239            Some(bamboo_subagent::provision::ScopedCredential {
240                provider: name.clone(),
241                api_key,
242                base_url: slot
243                    .get("base_url")
244                    .and_then(|v| v.as_str())
245                    .map(str::to_string),
246                provider_type: Some(name),
247            })
248        }));
249    }
250
251    // Multi-instance providers: provider_instances keyed by instance id; the
252    // child routes by instance id, the worker constructs by provider_type.
253    // Read the typed struct directly — `api_key` is hydrated in memory but
254    // deliberately `skip_serializing`, so a serde projection would miss it.
255    out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
256        let api_key = inst.api_key.trim().to_string();
257        if api_key.is_empty() {
258            return None;
259        }
260        Some(bamboo_subagent::provision::ScopedCredential {
261            provider: id.clone(),
262            api_key,
263            base_url: inst.base_url.clone(),
264            provider_type: Some(inst.provider_type.clone()),
265        })
266    }));
267
268    out
269}