Skip to main content

bamboo_engine/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15/// Composite router that delegates to the first matching external child runner.
16pub struct CompositeExternalChildRunner {
17    runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22        Self { runners }
23    }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29        for runner in &self.runners {
30            if runner.should_handle(session).await {
31                return true;
32            }
33        }
34        false
35    }
36
37    async fn execute_external_child(
38        &self,
39        session: &mut bamboo_agent_core::Session,
40        job: &SpawnJob,
41        event_tx: mpsc::Sender<AgentEvent>,
42        cancel_token: CancellationToken,
43    ) -> crate::runtime::runner::Result<()> {
44        for runner in &self.runners {
45            if runner.should_handle(session).await {
46                return runner
47                    .execute_external_child(session, job, event_tx, cancel_token)
48                    .await;
49            }
50        }
51        Err(AgentError::LLM(
52            "No matching external child runner found for session metadata".to_string(),
53        ))
54    }
55
56    /// #68: fan the per-run escalation bridge out to every inner runner. The
57    /// composite is what `build_external_child_runner` returns and what the
58    /// worker retains, so without this forward the bind would hit the trait's
59    /// no-op default and the wrapped `ActorChildRunner`s would never see it.
60    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
61        for runner in &self.runners {
62            runner.set_escalation_bridge(bridge.clone());
63        }
64    }
65}
66
67/// Build the child runner from the application config.
68///
69/// Sub-agents always run as actors (the in-process runtime was removed), so the
70/// built-in **local actor** worker is always part of the composite — its worker
71/// binary, arguments, and discovery dir are all derived; no expert tables
72/// needed. Expert `externalAgents` profiles add extra routers so
73/// `external.agent_id` metadata can pin specific roles to other agents. Returns
74/// a composite router that delegates to the first matching runner.
75pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
76    let agents = parse_external_agents(config);
77
78    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
79
80    // The built-in local actor worker is the default runtime for every
81    // sub-agent. Always build it; a build failure here is logged and leaves the
82    // composite without a default handler (dispatch then errors clearly).
83    match build_local_actor_runner(config) {
84        Ok(runner) => runners.push(runner),
85        Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
86    }
87
88    for (_agent_id, profile) in agents {
89        // Actor protocol: spawn a local worker binary over the bamboo-subagent WS protocol.
90        if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
91            let Some(worker_bin) = profile.worker_bin.as_ref() else {
92                tracing::error!(
93                    "Actor agent profile {} has no worker_bin; skipping",
94                    profile.agent_id
95                );
96                continue;
97            };
98            let fabric_dir = profile
99                .fabric_dir
100                .clone()
101                .map(std::path::PathBuf::from)
102                .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
103            let executor = match profile.executor.as_deref() {
104                Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
105                Some("bamboo_runtime") | None => {
106                    bamboo_subagent::provision::ExecutorSpec::BambooRuntime
107                }
108                Some(other) => {
109                    tracing::error!(
110                        "Actor agent profile {} has unknown executor '{}'; skipping",
111                        profile.agent_id,
112                        other
113                    );
114                    continue;
115                }
116            };
117            runners.push(Arc::new(ActorChildRunner::new(
118                profile.agent_id.clone(),
119                std::path::PathBuf::from(worker_bin),
120                profile.worker_args.clone(),
121                fabric_dir,
122                executor,
123                extract_provider_credentials(config),
124                config.provider.clone(),
125                config
126                    .subagents
127                    .max_concurrent
128                    .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
129            )));
130            continue;
131        }
132
133        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
134            tracing::warn!(
135                "External agent profile {} uses unsupported protocol {:?}",
136                profile.agent_id,
137                profile.protocol
138            );
139            continue;
140        }
141
142        let auth_token = match profile.auth_ref.as_ref() {
143            Some(ref_name) => match std::env::var(ref_name) {
144                Ok(token) => Some(token),
145                Err(_) => {
146                    tracing::error!(
147                        "External agent profile {} auth_ref env var {} is not set",
148                        profile.agent_id,
149                        ref_name
150                    );
151                    continue;
152                }
153            },
154            None => None,
155        };
156
157        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
158        {
159            Ok(cfg) => cfg,
160            Err(e) => {
161                tracing::error!(
162                    "Failed to build A2A client config for profile {}: {}",
163                    profile.agent_id,
164                    e
165                );
166                continue;
167            }
168        };
169
170        let client = match A2AJsonRpcClient::new(client_config) {
171            Ok(c) => c,
172            Err(e) => {
173                tracing::error!(
174                    "Failed to create A2A JSON-RPC client for profile {}: {}",
175                    profile.agent_id,
176                    e
177                );
178                continue;
179            }
180        };
181
182        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
183    }
184
185    Arc::new(CompositeExternalChildRunner::new(runners))
186}
187
188/// Build the built-in local actor runner from the typed `subagents`
189/// config. Everything is derived: worker = the current bamboo executable +
190/// `subagent-worker`, fabric = per-user temp dir — unless expert fields
191/// override them.
192fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
193    let sub = &config.subagents;
194
195    let (worker_bin, worker_args) = match &sub.worker_bin {
196        Some(custom) => (
197            std::path::PathBuf::from(custom),
198            sub.worker_args.clone().unwrap_or_default(),
199        ),
200        None => (
201            std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
202            sub.worker_args
203                .clone()
204                .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
205        ),
206    };
207
208    let fabric_dir = sub
209        .fabric_dir
210        .clone()
211        .map(std::path::PathBuf::from)
212        .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
213
214    let executor = match sub.executor.as_deref() {
215        Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
216        Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
217        Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
218    };
219
220    Ok(Arc::new(
221        ActorChildRunner::new(
222            super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
223            worker_bin,
224            worker_args,
225            fabric_dir,
226            executor,
227            extract_provider_credentials(config),
228            config.provider.clone(),
229            sub.max_concurrent
230                .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
231        )
232        .with_remote_placements(resolve_remote_placements(&sub.remote_placements))
233        .with_schedulable_placements(resolve_schedulable_placements(&sub.schedulable_placements)),
234    ))
235}
236
237/// Resolve config `schedulable_placements` into runner-ready handles (#181, P2b),
238/// keyed by role. Mirrors `resolve_remote_placements`: the bearer is read from
239/// `token_env` HERE (the raw token never rides the config) and is used for BOTH
240/// the registry query and the chosen worker's connect. If `token_env` is `Some`
241/// but the env var is UNSET, log an error and SKIP that placement so a misconfig
242/// fails SAFE to the local path rather than querying/connecting with no bearer. A
243/// placement with no `token_env` is tokenless (trusted/loopback link only).
244/// Duplicate roles: last one wins.
245fn resolve_schedulable_placements(
246    placements: &[bamboo_config::SchedulablePlacement],
247) -> std::collections::HashMap<String, super::actor_adapter::ResolvedSchedulablePlacement> {
248    let mut out = std::collections::HashMap::new();
249    for p in placements {
250        let token = match p.token_env.as_deref() {
251            Some(env_var) => match std::env::var(env_var) {
252                Ok(token) => Some(token),
253                Err(_) => {
254                    tracing::error!(
255                        "schedulable placement for role '{}' token_env '{}' is not set; \
256                         skipping (role falls back to local, NOT an unauthenticated registry query)",
257                        p.role,
258                        env_var
259                    );
260                    continue;
261                }
262            },
263            None => {
264                // A tokenless schedulable placement only makes sense on a trusted
265                // link. Warn if it targets what looks like a public registry so an
266                // operator footgun is visible in logs.
267                if registry_url_looks_public(&p.registry_url) {
268                    tracing::warn!(
269                        "schedulable placement for role '{}' has no token_env but targets a \
270                         public-looking registry '{}'; the registry query AND the worker connect \
271                         will carry NO bearer. Set token_env for any non-loopback control plane.",
272                        p.role,
273                        p.registry_url
274                    );
275                }
276                None
277            }
278        };
279        out.insert(
280            p.role.clone(),
281            super::actor_adapter::ResolvedSchedulablePlacement {
282                pool: p.pool.clone(),
283                registry_url: p.registry_url.clone(),
284                token,
285                ca_cert_file: p.ca_cert_file.as_ref().map(std::path::PathBuf::from),
286            },
287        );
288    }
289    out
290}
291
292/// Resolve config `remote_placements` into runner-ready handles (#193), keyed by
293/// role. The bearer is read from `token_env` HERE (mirroring the A2A `auth_ref`
294/// handling at ~runtime.rs:142): if the env var is set use it; if `token_env` is
295/// `Some` but the var is UNSET, log an error and SKIP that placement so a
296/// misconfig fails SAFE to the local path rather than connecting to a remote
297/// worker with no bearer. A placement with no `token_env` connects tokenless
298/// (trusted/loopback link only). Duplicate roles: last one wins.
299/// Heuristic: does this endpoint reach off-box (so a missing bearer is a real
300/// exposure)? `wss://` is always public-grade; for `ws://` we flag any host that
301/// is not loopback/localhost.
302fn endpoint_looks_public(endpoint: &str) -> bool {
303    if endpoint.starts_with("wss://") {
304        return true;
305    }
306    let host = endpoint
307        .strip_prefix("ws://")
308        .unwrap_or(endpoint)
309        .split(['/', ':'])
310        .next()
311        .unwrap_or("");
312    !(host == "localhost" || host == "127.0.0.1" || host == "::1" || host.is_empty())
313}
314
315/// Like `endpoint_looks_public` but for a registry URL (`http://` / `https://`).
316/// `https://` is always public-grade; for `http://` we flag any non-loopback
317/// host. Used to surface a tokenless-schedulable-placement footgun in logs.
318fn registry_url_looks_public(url: &str) -> bool {
319    if url.starts_with("https://") {
320        return true;
321    }
322    let host = url
323        .strip_prefix("http://")
324        .unwrap_or(url)
325        .split(['/', ':'])
326        .next()
327        .unwrap_or("");
328    !(host == "localhost" || host == "127.0.0.1" || host == "::1" || host.is_empty())
329}
330
331fn resolve_remote_placements(
332    placements: &[bamboo_config::RemoteActorPlacement],
333) -> std::collections::HashMap<String, super::actor_adapter::ResolvedRemotePlacement> {
334    let mut out = std::collections::HashMap::new();
335    for p in placements {
336        let token = match p.token_env.as_deref() {
337            Some(env_var) => match std::env::var(env_var) {
338                Ok(token) => Some(token),
339                Err(_) => {
340                    tracing::error!(
341                        "remote placement for role '{}' token_env '{}' is not set; \
342                         skipping (role falls back to local, NOT unauthenticated remote)",
343                        p.role,
344                        env_var
345                    );
346                    continue;
347                }
348            },
349            None => {
350                // A tokenless placement is only safe on a trusted link. Warn if
351                // it targets what looks like a public endpoint (wss:// or a
352                // non-loopback host) so an operator footgun is visible in logs.
353                if endpoint_looks_public(&p.endpoint) {
354                    tracing::warn!(
355                        "remote placement for role '{}' has no token_env but targets a \
356                         public-looking endpoint '{}'; work will be dispatched with NO bearer. \
357                         Set token_env (and use wss://) for any non-loopback worker.",
358                        p.role,
359                        p.endpoint
360                    );
361                }
362                None
363            }
364        };
365        out.insert(
366            p.role.clone(),
367            super::actor_adapter::ResolvedRemotePlacement {
368                endpoint: p.endpoint.clone(),
369                token,
370                ca_cert_file: p.ca_cert_file.as_ref().map(std::path::PathBuf::from),
371            },
372        );
373    }
374    out
375}
376
377/// Snapshot per-provider credentials from the parent config for actor
378/// provisioning. Serialized generically so this code does not chase the
379/// per-provider config struct shapes — any slot with a non-empty `api_key`
380/// yields a scoped credential (plus `base_url` when present).
381pub fn extract_provider_credentials(
382    config: &Config,
383) -> Vec<bamboo_subagent::provision::ScopedCredential> {
384    let mut out = Vec::new();
385
386    // Legacy single-instance slots: providers.anthropic / openai / …
387    if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
388        out.extend(providers.into_iter().filter_map(|(name, slot)| {
389            let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
390            if api_key.is_empty() {
391                return None;
392            }
393            Some(bamboo_subagent::provision::ScopedCredential {
394                provider: name.clone(),
395                api_key,
396                base_url: slot
397                    .get("base_url")
398                    .and_then(|v| v.as_str())
399                    .map(str::to_string),
400                provider_type: Some(name),
401            })
402        }));
403    }
404
405    // Multi-instance providers: provider_instances keyed by instance id; the
406    // child routes by instance id, the worker constructs by provider_type.
407    // Read the typed struct directly — `api_key` is hydrated in memory but
408    // deliberately `skip_serializing`, so a serde projection would miss it.
409    out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
410        let api_key = inst.api_key.trim().to_string();
411        if api_key.is_empty() {
412            return None;
413        }
414        Some(bamboo_subagent::provision::ScopedCredential {
415            provider: id.clone(),
416            api_key,
417            base_url: inst.base_url.clone(),
418            provider_type: Some(inst.provider_type.clone()),
419        })
420    }));
421
422    out
423}