Skip to main content

bamboo_engine/external_agents/
runtime.rs

1use std::sync::Arc;
2
3use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
4use async_trait::async_trait;
5use bamboo_a2a::A2AJsonRpcClient;
6use bamboo_agent_core::{AgentError, AgentEvent};
7use bamboo_llm::Config;
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use super::a2a_adapter::A2AExternalChildRunner;
12use super::actor_adapter::ActorChildRunner;
13use super::config::{parse_external_agents, ExternalAgentProtocol};
14
15/// Composite router that delegates to the first matching external child runner.
16pub struct CompositeExternalChildRunner {
17    runners: Vec<Arc<dyn ExternalChildRunner>>,
18}
19
20impl CompositeExternalChildRunner {
21    pub fn new(runners: Vec<Arc<dyn ExternalChildRunner>>) -> Self {
22        Self { runners }
23    }
24}
25
26#[async_trait]
27impl ExternalChildRunner for CompositeExternalChildRunner {
28    async fn should_handle(&self, session: &bamboo_agent_core::Session) -> bool {
29        for runner in &self.runners {
30            if runner.should_handle(session).await {
31                return true;
32            }
33        }
34        false
35    }
36
37    async fn execute_external_child(
38        &self,
39        session: &mut bamboo_agent_core::Session,
40        job: &SpawnJob,
41        event_tx: mpsc::Sender<AgentEvent>,
42        cancel_token: CancellationToken,
43    ) -> crate::runtime::runner::Result<()> {
44        for runner in &self.runners {
45            if runner.should_handle(session).await {
46                return runner
47                    .execute_external_child(session, job, event_tx, cancel_token)
48                    .await;
49            }
50        }
51        Err(AgentError::LLM(
52            "No matching external child runner found for session metadata".to_string(),
53        ))
54    }
55}
56
57/// Build an external child runner from the application config.
58///
59/// Returns `None` when nothing routes externally. Builds a composite router
60/// over all configured runners so `external.agent_id` metadata selects the
61/// right one. The friendly `subagents.runtime = "actor"` switch
62/// synthesizes a local actor runner automatically — worker binary,
63/// arguments, and discovery dir are all derived; no expert tables needed.
64pub fn build_external_child_runner(config: &Config) -> Option<Arc<dyn ExternalChildRunner>> {
65    let agents = parse_external_agents(config);
66
67    let mut runners: Vec<Arc<dyn ExternalChildRunner>> = Vec::new();
68
69    // Friendly path: one switch in typed config -> built-in local worker.
70    if config.subagents.any_actor() {
71        match build_local_actor_runner(config) {
72            Ok(runner) => runners.push(runner),
73            Err(e) => tracing::error!("subagents.runtime=actor unavailable: {e}"),
74        }
75    }
76
77    if agents.is_empty() && runners.is_empty() {
78        return None;
79    }
80
81    for (_agent_id, profile) in agents {
82        // Actor protocol: spawn a local worker binary over the bamboo-subagent WS protocol.
83        if matches!(profile.protocol, ExternalAgentProtocol::Actor) {
84            let Some(worker_bin) = profile.worker_bin.as_ref() else {
85                tracing::error!(
86                    "Actor agent profile {} has no worker_bin; skipping",
87                    profile.agent_id
88                );
89                continue;
90            };
91            let fabric_dir = profile
92                .fabric_dir
93                .clone()
94                .map(std::path::PathBuf::from)
95                .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
96            let executor = match profile.executor.as_deref() {
97                Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
98                Some("bamboo_runtime") | None => {
99                    bamboo_subagent::provision::ExecutorSpec::BambooRuntime
100                }
101                Some(other) => {
102                    tracing::error!(
103                        "Actor agent profile {} has unknown executor '{}'; skipping",
104                        profile.agent_id,
105                        other
106                    );
107                    continue;
108                }
109            };
110            runners.push(Arc::new(ActorChildRunner::new(
111                profile.agent_id.clone(),
112                std::path::PathBuf::from(worker_bin),
113                profile.worker_args.clone(),
114                fabric_dir,
115                executor,
116                extract_provider_credentials(config),
117                config.provider.clone(),
118                config
119                    .subagents
120                    .max_concurrent
121                    .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
122            )));
123            continue;
124        }
125
126        if !matches!(profile.protocol, ExternalAgentProtocol::A2aJsonRpc) {
127            tracing::warn!(
128                "External agent profile {} uses unsupported protocol {:?}",
129                profile.agent_id,
130                profile.protocol
131            );
132            continue;
133        }
134
135        let auth_token = match profile.auth_ref.as_ref() {
136            Some(ref_name) => match std::env::var(ref_name) {
137                Ok(token) => Some(token),
138                Err(_) => {
139                    tracing::error!(
140                        "External agent profile {} auth_ref env var {} is not set",
141                        profile.agent_id,
142                        ref_name
143                    );
144                    continue;
145                }
146            },
147            None => None,
148        };
149
150        let client_config = match A2AExternalChildRunner::build_client_config(&profile, auth_token)
151        {
152            Ok(cfg) => cfg,
153            Err(e) => {
154                tracing::error!(
155                    "Failed to build A2A client config for profile {}: {}",
156                    profile.agent_id,
157                    e
158                );
159                continue;
160            }
161        };
162
163        let client = match A2AJsonRpcClient::new(client_config) {
164            Ok(c) => c,
165            Err(e) => {
166                tracing::error!(
167                    "Failed to create A2A JSON-RPC client for profile {}: {}",
168                    profile.agent_id,
169                    e
170                );
171                continue;
172            }
173        };
174
175        runners.push(Arc::new(A2AExternalChildRunner::new(client, profile)));
176    }
177
178    if runners.is_empty() {
179        None
180    } else {
181        Some(Arc::new(CompositeExternalChildRunner::new(runners)))
182    }
183}
184
185/// Build the built-in local actor runner from the typed `subagents`
186/// config. Everything is derived: worker = the current bamboo executable +
187/// `subagent-worker`, fabric = per-user temp dir — unless expert fields
188/// override them.
189fn build_local_actor_runner(config: &Config) -> Result<Arc<dyn ExternalChildRunner>, String> {
190    let sub = &config.subagents;
191
192    let (worker_bin, worker_args) = match &sub.worker_bin {
193        Some(custom) => (
194            std::path::PathBuf::from(custom),
195            sub.worker_args.clone().unwrap_or_default(),
196        ),
197        None => (
198            std::env::current_exe().map_err(|e| format!("cannot locate own executable: {e}"))?,
199            sub.worker_args
200                .clone()
201                .unwrap_or_else(|| vec!["subagent-worker".to_string()]),
202        ),
203    };
204
205    let fabric_dir = sub
206        .fabric_dir
207        .clone()
208        .map(std::path::PathBuf::from)
209        .unwrap_or_else(|| std::env::temp_dir().join("bamboo-subagents"));
210
211    let executor = match sub.executor.as_deref() {
212        Some("echo") => bamboo_subagent::provision::ExecutorSpec::Echo,
213        Some("bamboo_runtime") | None => bamboo_subagent::provision::ExecutorSpec::BambooRuntime,
214        Some(other) => return Err(format!("unknown subagents.executor '{other}'")),
215    };
216
217    Ok(Arc::new(ActorChildRunner::new(
218        super::config::LOCAL_ACTOR_AGENT_ID.to_string(),
219        worker_bin,
220        worker_args,
221        fabric_dir,
222        executor,
223        extract_provider_credentials(config),
224        config.provider.clone(),
225        sub.max_concurrent
226            .unwrap_or(super::actor_adapter::DEFAULT_MAX_CONCURRENT_ACTORS),
227    )))
228}
229
230/// Snapshot per-provider credentials from the parent config for actor
231/// provisioning. Serialized generically so this code does not chase the
232/// per-provider config struct shapes — any slot with a non-empty `api_key`
233/// yields a scoped credential (plus `base_url` when present).
234pub fn extract_provider_credentials(
235    config: &Config,
236) -> Vec<bamboo_subagent::provision::ScopedCredential> {
237    let mut out = Vec::new();
238
239    // Legacy single-instance slots: providers.anthropic / openai / …
240    if let Ok(serde_json::Value::Object(providers)) = serde_json::to_value(&config.providers) {
241        out.extend(providers.into_iter().filter_map(|(name, slot)| {
242            let api_key = slot.get("api_key")?.as_str()?.trim().to_string();
243            if api_key.is_empty() {
244                return None;
245            }
246            Some(bamboo_subagent::provision::ScopedCredential {
247                provider: name.clone(),
248                api_key,
249                base_url: slot
250                    .get("base_url")
251                    .and_then(|v| v.as_str())
252                    .map(str::to_string),
253                provider_type: Some(name),
254            })
255        }));
256    }
257
258    // Multi-instance providers: provider_instances keyed by instance id; the
259    // child routes by instance id, the worker constructs by provider_type.
260    // Read the typed struct directly — `api_key` is hydrated in memory but
261    // deliberately `skip_serializing`, so a serde projection would miss it.
262    out.extend(config.provider_instances.iter().filter_map(|(id, inst)| {
263        let api_key = inst.api_key.trim().to_string();
264        if api_key.is_empty() {
265            return None;
266        }
267        Some(bamboo_subagent::provision::ScopedCredential {
268            provider: id.clone(),
269            api_key,
270            base_url: inst.base_url.clone(),
271            provider_type: Some(inst.provider_type.clone()),
272        })
273    }));
274
275    out
276}