Skip to main content

bamboo_engine/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15/// Composite router that delegates to the first matching external child runner.
16pub struct CompositeExternalChildRunner {
17    runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22        Self { runners }
23    }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29        for runner in &self.runners {
30            if runner.should_handle(session).await {
31                return true;
32            }
33        }
34        false
35    }
36
37    async fn execute_external_child(
38        &self,
39        session: &mut bamboo_agent_core::Session,
40        job: &SpawnJob,
41        event_tx: mpsc::Sender<AgentEvent>,
42        cancel_token: CancellationToken,
43    ) -> crate::runtime::runner::Result<()> {
44        for runner in &self.runners {
45            if runner.should_handle(session).await {
46                return runner
47                    .execute_external_child(session, job, event_tx, cancel_token)
48                    .await;
49            }
50        }
51        Err(AgentError::LLM(
52            "No matching external child runner found for session metadata".to_string(),
53        ))
54    }
55
56    /// #68: fan the per-run escalation bridge out to every inner runner. The
57    /// composite is what `build_external_child_runner` returns and what the
58    /// worker retains, so without this forward the bind would hit the trait's
59    /// no-op default and the wrapped `ActorChildRunner`s would never see it.
60    fn set_escalation_bridge(&self, bridge: Option<bamboo_subagent::executor::HostBridge>) {
61        for runner in &self.runners {
62            runner.set_escalation_bridge(bridge.clone());
63        }
64    }
65}
66
67/// Build the child runner from the application config.
68///
69/// Sub-agents always run as actors (the in-process runtime was removed), so the
70/// built-in **local actor** worker is always part of the composite — its worker
71/// binary, arguments, and discovery dir are all derived; no expert tables
72/// needed. Expert `externalAgents` profiles add extra routers so
73/// `external.agent_id` metadata can pin specific roles to other agents. Returns
74/// a composite router that delegates to the first matching runner.
75pub fn build_external_child_runner(config: &Config) -> Arc<dyn ExternalChildRunner> {
76    let agents = parse_external_agents(config);
77
78    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
79
80    // The built-in local actor worker is the default runtime for every
81    // sub-agent. Always build it; a build failure here is logged and leaves the
82    // composite without a default handler (dispatch then errors clearly).
83    match build_local_actor_runner(config) {
84        Ok(runner) => runners.push(runner),
85        Err(e) => tracing::error!("local actor sub-agent runner unavailable: {e}"),
86    }
87
88    for (_agent_id, profile) in agents {
89        // Actor protocol: spawn a local worker binary over the bamboo-subagent WS protocol.
90        if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
91            let Some(worker_bin) = profile.worker_bin.as_ref() else {
92                tracing::error!(
93                    "Actor agent profile {} has no worker_bin; skipping",
94                    profile.agent_id
95                );
96                continue;
97            };
98            let fabric_dir = profile
99                .fabric_dir
100                .clone()
101                .map(std::path::PathBuf::from)
102                .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
103            let executor = match profile.executor.as_deref() {
104                Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
105                Some("bamboo_runtime") | None => {
106                    bamboo_subagent::provision::ExecutorSpec::BambooRuntime
107                }
108                Some(other) => {
109                    tracing::error!(
110                        "Actor agent profile {} has unknown executor '{}'; skipping",
111                        profile.agent_id,
112                        other
113                    );
114                    continue;
115                }
116            };
117            runners.push(Arc::new(ActorChildRunner::new(
118                profile.agent_id.clone(),
119                std::path::PathBuf::from(worker_bin),
120                profile.worker_args.clone(),
121                fabric_dir,
122                executor,
123                extract_provider_credentials(config),
124                config.provider.clone(),
125                config
126                    .subagents
127                    .max_concurrent
128                    .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
129            )));
130            continue;
131        }
132
133        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
134            tracing::warn!(
135                "External agent profile {} uses unsupported protocol {:?}",
136                profile.agent_id,
137                profile.protocol
138            );
139            continue;
140        }
141
142        let auth_token = match profile.auth_ref.as_ref() {
143            Some(ref_name) => match std::env::var(ref_name) {
144                Ok(token) => Some(token),
145                Err(_) => {
146                    tracing::error!(
147                        "External agent profile {} auth_ref env var {} is not set",
148                        profile.agent_id,
149                        ref_name
150                    );
151                    continue;
152                }
153            },
154            None => None,
155        };
156
157        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
158        {
159            Ok(cfg) => cfg,
160            Err(e) => {
161                tracing::error!(
162                    "Failed to build A2A client config for profile {}: {}",
163                    profile.agent_id,
164                    e
165                );
166                continue;
167            }
168        };
169
170        let client = match A2AJsonRpcClient::new(client_config) {
171            Ok(c) => c,
172            Err(e) => {
173                tracing::error!(
174                    "Failed to create A2A JSON-RPC client for profile {}: {}",
175                    profile.agent_id,
176                    e
177                );
178                continue;
179            }
180        };
181
182        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
183    }
184
185    Arc::new(CompositeExternalChildRunner::new(runners))
186}
187
188/// Build the built-in local actor runner from the typed `subagents`
189/// config. Everything is derived: worker = the current bamboo executable +
190/// `subagent-worker`, fabric = per-user temp dir — unless expert fields
191/// override them.
192fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
193    let sub = &config.subagents;
194
195    let (worker_bin, worker_args) = match &sub.worker_bin {
196        Some(custom) => (
197            std::path::PathBuf::from(custom),
198            sub.worker_args.clone().unwrap_or_default(),
199        ),
200        None => (
201            std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
202            sub.worker_args
203                .clone()
204                .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
205        ),
206    };
207
208    let fabric_dir = sub
209        .fabric_dir
210        .clone()
211        .map(std::path::PathBuf::from)
212        .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
213
214    let executor = match sub.executor.as_deref() {
215        Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
216        Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
217        Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
218    };
219
220    Ok(Arc::new(ActorChildRunner::new(
221        super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
222        worker_bin,
223        worker_args,
224        fabric_dir,
225        executor,
226        extract_provider_credentials(config),
227        config.provider.clone(),
228        sub.max_concurrent
229            .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
230    )))
231}
232
233/// Snapshot per-provider credentials from the parent config for actor
234/// provisioning. Serialized generically so this code does not chase the
235/// per-provider config struct shapes — any slot with a non-empty `api_key`
236/// yields a scoped credential (plus `base_url` when present).
237pub fn extract_provider_credentials(
238    config: &Config,
239) -> Vec<bamboo_subagent::provision::ScopedCredential> {
240    let mut out = Vec::new();
241
242    // Legacy single-instance slots: providers.anthropic / openai / …
243    if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
244        out.extend(providers.into_iter().filter_map(|(name, slot)| {
245            let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
246            if api_key.is_empty() {
247                return None;
248            }
249            Some(bamboo_subagent::provision::ScopedCredential {
250                provider: name.clone(),
251                api_key,
252                base_url: slot
253                    .get("base_url")
254                    .and_then(|v| v.as_str())
255                    .map(str::to_string),
256                provider_type: Some(name),
257            })
258        }));
259    }
260
261    // Multi-instance providers: provider_instances keyed by instance id; the
262    // child routes by instance id, the worker constructs by provider_type.
263    // Read the typed struct directly — `api_key` is hydrated in memory but
264    // deliberately `skip_serializing`, so a serde projection would miss it.
265    out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
266        let api_key = inst.api_key.trim().to_string();
267        if api_key.is_empty() {
268            return None;
269        }
270        Some(bamboo_subagent::provision::ScopedCredential {
271            provider: id.clone(),
272            api_key,
273            base_url: inst.base_url.clone(),
274            provider_type: Some(inst.provider_type.clone()),
275        })
276    }));
277
278    out
279}