1use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27 ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Default cap on concurrently executing actor runs (presumably the value callers pass as
/// `max_concurrent` to `ActorChildRunner::new`; not referenced in this file's visible code).
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Most idle workers retained per pool fingerprint; a worker released into a full bucket is
/// retired instead of pooled.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Idle timeout, in seconds (five minutes), applied to pooled worker specs that do not
/// already carry one.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
42
43struct PooledActor {
46 worker: SpawnedChild,
47 endpoint: String,
48 agent_id: String,
49}
50
51pub struct ActorChildRunner {
53 agent_id: String,
54 worker_bin: PathBuf,
55 worker_args: Vec<String>,
56 fabric_dir: PathBuf,
57 executor: ExecutorSpec,
58 credentials: Vec<ScopedCredential>,
61 default_provider: String,
63 concurrency: std::sync::Arc<tokio::sync::Semaphore>,
67 spawn_timeout: Duration,
68 pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
73 max_idle_per_key: usize,
74}
75
76impl ActorChildRunner {
77 #[allow(clippy::too_many_arguments)]
78 pub fn new(
79 agent_id: String,
80 worker_bin: PathBuf,
81 worker_args: Vec<String>,
82 fabric_dir: PathBuf,
83 executor: ExecutorSpec,
84 credentials: Vec<ScopedCredential>,
85 default_provider: String,
86 max_concurrent: usize,
87 ) -> Self {
88 Self {
89 agent_id,
90 worker_bin,
91 worker_args,
92 fabric_dir,
93 executor,
94 credentials,
95 default_provider,
96 concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
97 spawn_timeout: Duration::from_secs(30),
98 pool: Arc::new(Mutex::new(HashMap::new())),
99 max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
100 }
101 }
102
103 fn fingerprint(spec: &ProvisionSpec) -> String {
108 let role = spec.identity.role.as_str();
109 let (provider, model) = spec
110 .model
111 .as_ref()
112 .map(|m| (m.provider.as_str(), m.model.as_str()))
113 .unwrap_or(("", ""));
114 let workspace = spec.workspace.as_deref().unwrap_or("");
115 let mut tools = spec.disabled_tools.clone().unwrap_or_default();
116 tools.sort();
117 format!(
118 "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}",
119 tools.join(",")
120 )
121 }
122
123 async fn acquire_worker(
126 &self,
127 key: &str,
128 spec: &ProvisionSpec,
129 ) -> crate::runtime::runner::Result<PooledActor> {
130 loop {
133 let candidate = {
134 let mut pool = self.pool.lock().await;
135 pool.get_mut(key).and_then(|bucket| bucket.pop())
136 };
137 let Some(candidate) = candidate else { break };
138 let alive = Fabric::at(&self.fabric_dir)
139 .resolve(&candidate.agent_id)
140 .await
141 .ok()
142 .flatten()
143 .is_some();
144 if alive {
145 return Ok(candidate);
146 }
147 candidate.worker.kill().await;
148 }
149
150 let spawned = spawn_worker(
151 &self.worker_bin,
152 &self.worker_args,
153 spec,
154 self.spawn_timeout,
155 )
156 .await
157 .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
158 let endpoint = spawned.record.endpoint.clone();
159 let agent_id = spawned.record.agent_id.clone();
160 Ok(PooledActor {
161 worker: spawned,
162 endpoint,
163 agent_id,
164 })
165 }
166
167 async fn release_worker(&self, key: &str, actor: PooledActor) {
169 let mut pool = self.pool.lock().await;
170 let bucket = pool.entry(key.to_string()).or_default();
171 if bucket.len() >= self.max_idle_per_key {
172 drop(pool);
173 self.retire_worker(actor).await;
174 return;
175 }
176 bucket.push(actor);
177 }
178
179 async fn retire_worker(&self, actor: PooledActor) {
181 let agent_id = actor.agent_id.clone();
182 actor.worker.kill().await;
183 let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
184 }
185
186 fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
188 let mut spec = ProvisionSpec::new(
189 ChildIdentity {
190 child_id: job.child_session_id.clone(),
191 parent_id: Some(job.parent_session_id.clone()),
192 project_key: None,
193 role: session
194 .metadata
195 .get("subagent_type")
196 .cloned()
197 .unwrap_or_else(|| "worker".to_string()),
198 },
199 self.executor.clone(),
200 self.fabric_dir.to_string_lossy().into_owned(),
201 );
202 spec.workspace = session.workspace.clone();
203 spec.model = session
206 .model_ref
207 .as_ref()
208 .map(|r| ModelRefSpec {
209 provider: r.provider.clone(),
210 model: r.model.clone(),
211 })
212 .or_else(|| {
213 let m = job.model.trim();
214 (!m.is_empty()).then(|| ModelRefSpec {
215 provider: self.default_provider.clone(),
216 model: m.to_string(),
217 })
218 });
219 spec.disabled_tools = job.disabled_tools.clone();
220 let provider = spec
222 .model
223 .as_ref()
224 .map(|m| m.provider.as_str())
225 .filter(|p| !p.trim().is_empty())
226 .unwrap_or(&self.default_provider);
227 if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
228 spec.secrets.provider_credentials.push(cred.clone());
229 } else {
230 tracing::warn!(
231 "actor child {}: no credential found for provider '{}'",
232 job.child_session_id,
233 provider
234 );
235 }
236 spec
237 }
238}
239
240#[async_trait]
241impl ExternalChildRunner for ActorChildRunner {
242 async fn should_handle(&self, session: &Session) -> bool {
243 session.metadata.get("runtime.kind") == Some(&"external".to_string())
244 && session.metadata.get("external.protocol") == Some(&"actor".to_string())
245 && session.metadata.get("external.agent_id") == Some(&self.agent_id)
246 }
247
248 async fn execute_external_child(
249 &self,
250 session: &mut Session,
251 job: &SpawnJob,
252 event_tx: mpsc::Sender<AgentEvent>,
253 cancel_token: CancellationToken,
254 ) -> crate::runtime::runner::Result<()> {
255 let assignment = extract_assignment(session);
256 let mut spec = self.build_spec(session, job);
257 spec.reusable = true;
260 if spec.limits.idle_timeout_secs.is_none() {
261 spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
262 }
263 let pool_key = Self::fingerprint(&spec);
264 let messages: Vec<serde_json::Value> = session
270 .messages
271 .iter()
272 .filter_map(|m| serde_json::to_value(m).ok())
273 .collect();
274
275 let _slot = self
280 .concurrency
281 .acquire()
282 .await
283 .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
284
285 let mut actor = self.acquire_worker(&pool_key, &spec).await?;
287
288 let mut client = match ChildClient::connect(&actor.endpoint).await {
289 Ok(client) => client,
290 Err(e) => {
291 self.retire_worker(actor).await;
294 let spawned = spawn_worker(
295 &self.worker_bin,
296 &self.worker_args,
297 &spec,
298 self.spawn_timeout,
299 )
300 .await
301 .map_err(|e2| {
302 AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
303 })?;
304 let endpoint = spawned.record.endpoint.clone();
305 let agent_id = spawned.record.agent_id.clone();
306 let client = ChildClient::connect(&endpoint)
307 .await
308 .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
309 actor = PooledActor {
310 worker: spawned,
311 endpoint,
312 agent_id,
313 };
314 client
315 }
316 };
317
318 client
319 .send(ParentFrame::Run(RunSpec {
320 assignment,
321 reasoning_effort: None,
322 messages,
323 }))
324 .await
325 .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
326
327 let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
331 let live_guard = super::live::register(&job.child_session_id, live_tx);
332
333 let result = drive(&mut client, &event_tx, &cancel_token, &mut live_rx).await;
334 drop(live_guard);
340
341 let _ = client.close().await;
345 match &result {
346 Ok(_) => self.release_worker(&pool_key, actor).await,
347 Err(_) => self.retire_worker(actor).await,
348 }
349
350 match result {
354 Ok(Some(text)) => {
355 if !text.is_empty() {
356 session.add_message(bamboo_agent_core::Message::assistant(text, None));
357 }
358 Ok(())
359 }
360 Ok(None) => Ok(()),
361 Err(e) => Err(e),
362 }
363 }
364}
365
366async fn drive(
370 client: &mut ChildClient,
371 event_tx: &mpsc::Sender<AgentEvent>,
372 cancel_token: &CancellationToken,
373 live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
374) -> crate::runtime::runner::Result<Option<String>> {
375 loop {
376 tokio::select! {
377 _ = cancel_token.cancelled() => {
378 break;
380 }
381 Some(frame) = live_rx.recv() => {
382 if client.send(frame).await.is_err() {
384 tracing::warn!("live steering frame could not be sent; connection failing");
385 }
386 }
387 frame = client.next_frame() => {
388 match frame {
389 Ok(Some(ChildFrame::Event { event })) => {
390 if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
392 let _ = event_tx.send(ev).await;
393 }
394 }
395 Ok(Some(ChildFrame::Terminal { status, result, error })) => {
396 return match status {
397 TerminalStatus::Completed => Ok(result),
398 TerminalStatus::Cancelled => Err(AgentError::Cancelled),
399 TerminalStatus::Error => Err(AgentError::LLM(
400 error.unwrap_or_else(|| "actor child errored".to_string()),
401 )),
402 };
403 }
404 Ok(None) => {
405 return Err(AgentError::LLM(
406 "actor child closed before terminal".to_string(),
407 ));
408 }
409 Err(e) => {
410 return Err(AgentError::LLM(format!("actor transport error: {e}")));
411 }
412 }
413 }
414 }
415 }
416
417 let _ = client.send(ParentFrame::Cancel).await;
419 Err(AgentError::Cancelled)
420}
421
422fn extract_assignment(session: &Session) -> String {
424 session
425 .messages
426 .iter()
427 .rev()
428 .find(|m| matches!(m.role, Role::User))
429 .map(|m| m.content.clone())
430 .unwrap_or_else(|| {
431 session
432 .metadata
433 .get("title")
434 .cloned()
435 .unwrap_or_else(|| "Execute task".to_string())
436 })
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a spec that varies only in the fields `ActorChildRunner::fingerprint` reads.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let mut spec = ProvisionSpec::new(
            ChildIdentity {
                child_id: "c".into(),
                parent_id: None,
                project_key: None,
                role: role.into(),
            },
            ExecutorSpec::Echo,
            "/tmp/fab".into(),
        );
        spec.workspace = workspace.map(|w| w.to_string());
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools = disabled.map(|d| d.into_iter().map(String::from).collect());
        spec
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Child id and disabled-tool order must not split the pool.
        let a = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Bash", "Edit"]),
        );
        let mut b = spec_with(
            "explorer",
            "p",
            "m",
            Some("/ws"),
            Some(vec!["Edit", "Bash"]),
        );
        b.identity.child_id = "other".into();
        assert_eq!(
            ActorChildRunner::fingerprint(&a),
            ActorChildRunner::fingerprint(&b)
        );
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base = ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
        let variants = [
            ("role", spec_with("writer", "p", "m", Some("/ws"), None)),
            ("provider", spec_with("explorer", "p2", "m", Some("/ws"), None)),
            ("model", spec_with("explorer", "p", "m2", Some("/ws"), None)),
            ("workspace", spec_with("explorer", "p", "m", Some("/ws2"), None)),
            (
                "disabled tools",
                spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
            ),
        ];
        for (field, spec) in &variants {
            assert_ne!(
                base,
                ActorChildRunner::fingerprint(spec),
                "changing the {field} must change the fingerprint"
            );
        }
    }

    #[test]
    fn fingerprint_treats_absent_and_empty_disabled_tools_alike() {
        let absent = spec_with("explorer", "p", "m", Some("/ws"), None);
        let empty = spec_with("explorer", "p", "m", Some("/ws"), Some(vec![]));
        assert_eq!(
            ActorChildRunner::fingerprint(&absent),
            ActorChildRunner::fingerprint(&empty)
        );
    }

    #[test]
    fn fingerprint_tolerates_missing_model_and_workspace() {
        let mut bare = spec_with("explorer", "p", "m", None, None);
        bare.model = None;
        let fp = ActorChildRunner::fingerprint(&bare);
        assert!(fp.starts_with("explorer"));
        assert_ne!(
            fp,
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", None, None))
        );
    }
}