Skip to main content

bamboo_engine/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15/// Composite router that delegates to the first matching external child runner.
16pub struct CompositeExternalChildRunner {
17    runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22        Self { runners }
23    }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29        for runner in &self.runners {
30            if runner.should_handle(session).await {
31                return true;
32            }
33        }
34        false
35    }
36
37    async fn execute_external_child(
38        &self,
39        session: &mut bamboo_agent_core::Session,
40        job: &SpawnJob,
41        event_tx: mpsc::Sender<AgentEvent>,
42        cancel_token: CancellationToken,
43    ) -> crate::runtime::runner::Result<()> {
44        for runner in &self.runners {
45            if runner.should_handle(session).await {
46                return runner
47                    .execute_external_child(session, job, event_tx, cancel_token)
48                    .await;
49            }
50        }
51        Err(AgentError::LLM(
52            "No matching external child runner found for session metadata".to_string(),
53        ))
54    }
55
56    /// #68: fan the per-run escalation bridge out to every inner runner. The
57    /// composite is what `build_external_child_runner` returns and what the
58    /// worker retains, so without this forward the bind would hit the trait's
59    /// no-op default and the wrapped `ActorChildRunner`s would never see it.
60    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
61        for runner in &self.runners {
62            runner.set_escalation_bridge(bridge.clone());
63        }
64    }
65}
66
67/// Build the child runner from the application config.
68///
69/// Sub-agents always run as actors (the in-process runtime was removed), so the
70/// built-in **local actor** worker is always part of the composite — its worker
71/// binary, arguments, and discovery dir are all derived; no expert tables
72/// needed. Expert `externalAgents` profiles add extra routers so
73/// `external.agent_id` metadata can pin specific roles to other agents. Returns
74/// a composite router that delegates to the first matching runner.
75pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
76    let agents = parse_external_agents(config);
77
78    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
79
80    // The built-in local actor worker is the default runtime for every
81    // sub-agent. Always build it; a build failure here is logged and leaves the
82    // composite without a default handler (dispatch then errors clearly).
83    match build_local_actor_runner(config) {
84        Ok(runner) => runners.push(runner),
85        Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
86    }
87
88    for (_agent_id, profile) in agents {
89        // Actor protocol: spawn a local worker binary over the bamboo-subagent WS protocol.
90        if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
91            let Some(worker_bin) = profile.worker_bin.as_ref() else {
92                tracing::error!(
93                    "Actor agent profile {} has no worker_bin; skipping",
94                    profile.agent_id
95                );
96                continue;
97            };
98            let fabric_dir = profile
99                .fabric_dir
100                .clone()
101                .map(std::path::PathBuf::from)
102                .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
103            let executor = match profile.executor.as_deref() {
104                Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
105                Some("bamboo_runtime") | None => {
106                    bamboo_subagent::provision::ExecutorSpec::BambooRuntime
107                }
108                Some(other) => {
109                    tracing::error!(
110                        "Actor agent profile {} has unknown executor '{}'; skipping",
111                        profile.agent_id,
112                        other
113                    );
114                    continue;
115                }
116            };
117            runners.push(Arc::new(ActorChildRunner::new(
118                profile.agent_id.clone(),
119                std::path::PathBuf::from(worker_bin),
120                profile.worker_args.clone(),
121                fabric_dir,
122                executor,
123                extract_provider_credentials(config),
124                config.provider.clone(),
125                config
126                    .subagents
127                    .max_concurrent
128                    .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
129            )));
130            continue;
131        }
132
133        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
134            tracing::warn!(
135                "External agent profile {} uses unsupported protocol {:?}",
136                profile.agent_id,
137                profile.protocol
138            );
139            continue;
140        }
141
142        let auth_token = match profile.auth_ref.as_ref() {
143            Some(ref_name) => match std::env::var(ref_name) {
144                Ok(token) => Some(token),
145                Err(_) => {
146                    tracing::error!(
147                        "External agent profile {} auth_ref env var {} is not set",
148                        profile.agent_id,
149                        ref_name
150                    );
151                    continue;
152                }
153            },
154            None => None,
155        };
156
157        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
158        {
159            Ok(cfg) => cfg,
160            Err(e) => {
161                tracing::error!(
162                    "Failed to build A2A client config for profile {}: {}",
163                    profile.agent_id,
164                    e
165                );
166                continue;
167            }
168        };
169
170        let client = match A2AJsonRpcClient::new(client_config) {
171            Ok(c) => c,
172            Err(e) => {
173                tracing::error!(
174                    "Failed to create A2A JSON-RPC client for profile {}: {}",
175                    profile.agent_id,
176                    e
177                );
178                continue;
179            }
180        };
181
182        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
183    }
184
185    Arc::new(CompositeExternalChildRunner::new(runners))
186}
187
188/// Build the built-in local actor runner from the typed `subagents`
189/// config. Everything is derived: worker = the current bamboo executable +
190/// `subagent-worker`, fabric = per-user temp dir — unless expert fields
191/// override them.
192fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
193    let sub = &config.subagents;
194
195    let (worker_bin, worker_args) = match &sub.worker_bin {
196        Some(custom) => (
197            std::path::PathBuf::from(custom),
198            sub.worker_args.clone().unwrap_or_default(),
199        ),
200        None => (
201            std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
202            sub.worker_args
203                .clone()
204                .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
205        ),
206    };
207
208    let fabric_dir = sub
209        .fabric_dir
210        .clone()
211        .map(std::path::PathBuf::from)
212        .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
213
214    let executor = match sub.executor.as_deref() {
215        Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
216        Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
217        Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
218    };
219
220    Ok(Arc::new(
221        ActorChildRunner::new(
222            super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
223            worker_bin,
224            worker_args,
225            fabric_dir,
226            executor,
227            extract_provider_credentials(config),
228            config.provider.clone(),
229            sub.max_concurrent
230                .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
231        )
232        .with_remote_placements(resolve_remote_placements(
233            &sub.remote_placements,
234            &config.cluster_fabric.nodes,
235        ))
236        .with_schedulable_placements(resolve_schedulable_placements(
237            &sub.schedulable_placements,
238            &config.cluster_fabric.nodes,
239        ))
240        .with_bus(sub.broker.as_ref().map(|b| bamboo_subagent::BusEndpoint {
241            endpoint: b.endpoint.clone(),
242            token: b.token.clone(),
243        })),
244    ))
245}
246
247/// Resolve config `schedulable_placements` into runner-ready handles (#181, P2b),
248/// keyed by role. Mirrors `resolve_remote_placements`: the bearer is read from
249/// `token_env` HERE (the raw token never rides the config) and is used for BOTH
250/// the registry query and the chosen worker's connect. If `token_env` is `Some`
251/// but the env var is UNSET, log an error and SKIP that placement so a misconfig
252/// fails SAFE to the local path rather than querying/connecting with no bearer. A
253/// placement with no `token_env` is tokenless (trusted/loopback link only).
254/// Duplicate roles: last one wins.
255fn resolve_schedulable_placements(
256    placements: &[bamboo_config::SchedulablePlacement],
257    nodes: &[bamboo_config::cluster_fabric::Node],
258) -> std::collections::HashMap<String, super::actor_adapter::ResolvedSchedulablePlacement> {
259    // Phase 3: a pool is just a bus role. The runner picks a live connected worker
260    // of that role via the bus presence query — no registry url / token / cert.
261    placements
262        .iter()
263        .map(|p| {
264            (
265                p.role.clone(),
266                super::actor_adapter::ResolvedSchedulablePlacement {
267                    pool: p.pool.clone(),
268                    // The badge shows the cluster node's own metadata: a node
269                    // deployed to serve this pool (its `deploy.default_role`).
270                    host_label: node_label_for_role(nodes, &p.pool),
271                },
272            )
273        })
274        .collect()
275}
276
277/// Friendly display name for a cluster node whose worker serves `role`
278/// (`deploy.default_role`) — the operator `label`, else its ssh host. Used to
279/// stamp the UI placement badge from the node's own metadata.
280fn node_label_for_role(
281    nodes: &[bamboo_config::cluster_fabric::Node],
282    role: &str,
283) -> Option<String> {
284    nodes
285        .iter()
286        .find(|n| n.deploy.default_role.as_deref() == Some(role))
287        .map(node_display_name)
288}
289
290/// Friendly display name for a cluster node whose ssh host matches `endpoint`'s
291/// host — so a `remote_placements` endpoint pointing at a known node shows the
292/// node's label rather than a bare IP.
293fn node_label_for_endpoint(
294    nodes: &[bamboo_config::cluster_fabric::Node],
295    endpoint: &str,
296) -> Option<String> {
297    let host = endpoint
298        .trim()
299        .trim_start_matches("wss://")
300        .trim_start_matches("ws://")
301        .split(['/', ':'])
302        .next()
303        .unwrap_or("");
304    if host.is_empty() {
305        return None;
306    }
307    nodes
308        .iter()
309        .find(|n| match &n.placement {
310            bamboo_config::cluster_fabric::NodePlacement::Ssh(t) => t.host == host,
311            bamboo_config::cluster_fabric::NodePlacement::Local => false,
312        })
313        .map(node_display_name)
314}
315
316fn node_display_name(n: &bamboo_config::cluster_fabric::Node) -> String {
317    if !n.label.trim().is_empty() {
318        return n.label.clone();
319    }
320    match &n.placement {
321        bamboo_config::cluster_fabric::NodePlacement::Ssh(t) => t.host.clone(),
322        bamboo_config::cluster_fabric::NodePlacement::Local => "local".to_string(),
323    }
324}
325
/// Heuristic: does this endpoint reach off-box (so a missing bearer is a real
/// exposure)? `resolve_remote_placements` uses the answer to decide whether a
/// tokenless placement deserves a warning.
///
/// * `wss://` (any letter case) is always treated as public-grade.
/// * Otherwise the host is extracted — after trimming whitespace, dropping an
///   optional `ws://`, cutting at the first `/`, `?` or `#`, skipping
///   `user:pass@` userinfo and unwrapping a bracketed IPv6 literal — and the
///   endpoint is public unless that host is `localhost` (case-insensitive) or
///   a loopback IP address (`127.0.0.0/8`, `::1`).
/// * An endpoint with no host at all is reported as not public.
fn endpoint_looks_public(endpoint: &str) -> bool {
    // Strip `scheme` from the front of `s`, ignoring ASCII case.
    fn strip_scheme<'a>(s: &'a str, scheme: &str) -> Option<&'a str> {
        let head = s.get(..scheme.len())?;
        if head.eq_ignore_ascii_case(scheme) {
            s.get(scheme.len()..)
        } else {
            None
        }
    }

    let endpoint = endpoint.trim();
    if strip_scheme(endpoint, "wss://").is_some() {
        return true;
    }
    let rest = strip_scheme(endpoint, "ws://").unwrap_or(endpoint);

    // Authority = everything before the path, query or fragment.
    let authority = rest.split(['/', '?', '#']).next().unwrap_or("");
    // Anything before the last '@' is userinfo; treating it as the host would
    // let `ws://localhost:x@remote` pass as loopback.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    // Bracketed IPv6 literals contain ':' themselves, so they cannot be cut at
    // the first colon like a name or IPv4 address.
    let host = match host_port.strip_prefix('[') {
        Some(bracketed) => bracketed.split(']').next().unwrap_or(""),
        None => host_port.split(':').next().unwrap_or(""),
    };
    if host.is_empty() {
        return false;
    }

    let is_loopback = host.eq_ignore_ascii_case("localhost")
        || host
            .parse::<std::net::IpAddr>()
            .map_or(false, |ip| ip.is_loopback());
    !is_loopback
}
348
349fn resolve_remote_placements(
350    placements: &[bamboo_config::RemoteActorPlacement],
351    nodes: &[bamboo_config::cluster_fabric::Node],
352) -> std::collections::HashMap<String, super::actor_adapter::ResolvedRemotePlacement> {
353    let mut out = std::collections::HashMap::new();
354    for p in placements {
355        let token = match p.token_env.as_deref() {
356            Some(env_var) => match std::env::var(env_var) {
357                Ok(token) => Some(token),
358                Err(_) => {
359                    tracing::error!(
360                        "remote placement for role '{}' token_env '{}' is not set; \
361                         skipping (role falls back to local, NOT unauthenticated remote)",
362                        p.role,
363                        env_var
364                    );
365                    continue;
366                }
367            },
368            None => {
369                // A tokenless placement is only safe on a trusted link. Warn if
370                // it targets what looks like a public endpoint (wss:// or a
371                // non-loopback host) so an operator footgun is visible in logs.
372                if endpoint_looks_public(&p.endpoint) {
373                    tracing::warn!(
374                        "remote placement for role '{}' has no token_env but targets a \
375                         public-looking endpoint '{}'; work will be dispatched with NO bearer. \
376                         Set token_env (and use wss://) for any non-loopback worker.",
377                        p.role,
378                        p.endpoint
379                    );
380                }
381                None
382            }
383        };
384        out.insert(
385            p.role.clone(),
386            super::actor_adapter::ResolvedRemotePlacement {
387                endpoint: p.endpoint.clone(),
388                token,
389                ca_cert_file: p.ca_cert_file.as_ref().map(std::path::PathBuf::from),
390                // Badge from the node's own metadata when the endpoint points at
391                // a known cluster node; else the endpoint host is used downstream.
392                host_label: node_label_for_endpoint(nodes, &p.endpoint),
393            },
394        );
395    }
396    out
397}
398
399/// Snapshot per-provider credentials from the parent config for actor
400/// provisioning. Serialized generically so this code does not chase the
401/// per-provider config struct shapes — any slot with a non-empty `api_key`
402/// yields a scoped credential (plus `base_url` when present).
403pub fn extract_provider_credentials(
404    config: &Config,
405) -> Vec<bamboo_subagent::provision::ScopedCredential> {
406    let mut out = Vec::new();
407
408    // Legacy single-instance slots: providers.anthropic / openai / …
409    if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
410        out.extend(providers.into_iter().filter_map(|(name, slot)| {
411            let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
412            if api_key.is_empty() {
413                return None;
414            }
415            Some(bamboo_subagent::provision::ScopedCredential {
416                provider: name.clone(),
417                api_key,
418                base_url: slot
419                    .get("base_url")
420                    .and_then(|v| v.as_str())
421                    .map(str::to_string),
422                provider_type: Some(name),
423            })
424        }));
425    }
426
427    // Multi-instance providers: provider_instances keyed by instance id; the
428    // child routes by instance id, the worker constructs by provider_type.
429    // Read the typed struct directly — `api_key` is hydrated in memory but
430    // deliberately `skip_serializing`, so a serde projection would miss it.
431    out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
432        let api_key = inst.api_key.trim().to_string();
433        if api_key.is_empty() {
434            return None;
435        }
436        Some(bamboo_subagent::provision::ScopedCredential {
437            provider: id.clone(),
438            api_key,
439            base_url: inst.base_url.clone(),
440            provider_type: Some(inst.provider_type.clone()),
441        })
442    }));
443
444    out
445}
446
#[cfg(test)]
mod placement_resolver_tests {
    //! Host-label resolution for schedulable and remote placements.

    use super::{node_display_name, resolve_remote_placements, resolve_schedulable_placements};
    use bamboo_config::cluster_fabric::{
        DeployProfile, Node, NodePlacement, SshAuth, SshTarget, TrustLevel,
    };
    use bamboo_config::{RemoteActorPlacement, SchedulablePlacement};

    /// Enabled ssh-placed node fixture; only the identifying fields vary.
    fn ssh_node(id: &str, label: &str, host: &str, default_role: Option<&str>) -> Node {
        let target = SshTarget {
            host: host.into(),
            port: 22,
            username: "u".into(),
            auth: SshAuth::SystemSshConfig,
            host_key_fingerprint: None,
        };
        let deploy = DeployProfile {
            default_role: default_role.map(String::from),
            ..Default::default()
        };
        Node {
            id: id.into(),
            label: label.into(),
            placement: NodePlacement::Ssh(target),
            trust_level: TrustLevel::default(),
            deploy,
            state: None,
            enabled: true,
        }
    }

    #[test]
    fn node_display_name_prefers_label_then_ssh_host() {
        let labeled = ssh_node("n1", "mini", "mini.local", None);
        assert_eq!(node_display_name(&labeled), "mini");

        // Same node with the label blanked out falls back to the ssh host.
        let unlabeled = ssh_node("n1", "", "mini.local", None);
        assert_eq!(node_display_name(&unlabeled), "mini.local");
    }

    #[test]
    fn schedulable_placement_takes_host_label_from_node_by_default_role() {
        let nodes = [ssh_node("n1", "mini", "mini.local", Some("mac-mini-monitor"))];
        let placement = SchedulablePlacement {
            role: "mac-mini-monitor".into(),
            pool: "mac-mini-monitor".into(),
            ..Default::default()
        };

        let resolved = resolve_schedulable_placements(&[placement], &nodes);

        let handle = resolved.get("mac-mini-monitor").expect("role resolved");
        assert_eq!(handle.pool, "mac-mini-monitor");
        assert_eq!(handle.host_label.as_deref(), Some("mini"));
    }

    #[test]
    fn remote_placement_takes_host_label_from_node_by_ssh_host() {
        let nodes = [ssh_node("n1", "mini", "mini.local", None)];
        let placement = RemoteActorPlacement {
            role: "explorer".into(),
            endpoint: "ws://mini.local:8899".into(),
            ..Default::default()
        };

        let resolved = resolve_remote_placements(&[placement], &nodes);

        let handle = resolved.get("explorer").unwrap();
        assert_eq!(handle.host_label.as_deref(), Some("mini"));
    }

    #[test]
    fn no_host_label_when_no_node_matches() {
        let nodes = [ssh_node("n1", "mini", "mini.local", Some("other-role"))];

        // Pool does not equal any node's default role.
        let sched = SchedulablePlacement {
            role: "x".into(),
            pool: "unmatched".into(),
            ..Default::default()
        };
        let sched_out = resolve_schedulable_placements(&[sched], &nodes);
        assert!(sched_out.get("x").unwrap().host_label.is_none());

        // Endpoint host does not equal any node's ssh host.
        let remote = RemoteActorPlacement {
            role: "y".into(),
            endpoint: "ws://other-host:9000".into(),
            ..Default::default()
        };
        let remote_out = resolve_remote_placements(&[remote], &nodes);
        assert!(remote_out.get("y").unwrap().host_label.is_none());
    }
}