// bamboo_engine/external_agents/actor_adapter.rs

//! Actor external child runner.
//!
//! Runs a child session as an independent **actor**: a separate OS process with its own
//! isolated context, speaking the `bamboo-subagent` WebSocket protocol. This is the
//! engine-side adapter on the `wants_external` seam: it checks out a warm pooled worker
//! (or spawns the worker binary and waits for it to self-register into the Tier-1 file
//! fabric), connects, sends the assignment, and forwards the child's `AgentEvent`s back
//! onto the parent's `event_tx`.
//!
//! The built-in **local actor** instance of this runner is the default runtime for
//! every sub-agent (the in-process runtime was removed). The expert `externalAgents`
//! tables can additionally route specific roles to other actor/a2a agents.
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use bamboo_agent_core::{AgentError, AgentEvent, Role, Session};
20use tokio::sync::{mpsc, Mutex};
21use tokio_util::sync::CancellationToken;
22
23use bamboo_subagent::discovery::Fabric;
24use bamboo_subagent::fleet::{spawn_worker, SpawnedChild};
25use bamboo_subagent::proto::{ChildFrame, ParentFrame, RunSpec, TerminalStatus};
26use bamboo_subagent::provision::{
27    ChildIdentity, ExecutorSpec, ModelRefSpec, ProvisionSpec, ScopedCredential,
28};
29use bamboo_subagent::transport::ChildClient;
30
31use crate::runtime::execution::{ExternalChildRunner, SpawnJob};
32
/// Upper bound on actor worker processes that may be *running* at the same time.
pub const DEFAULT_MAX_CONCURRENT_ACTORS: usize = 8;

/// Upper bound on warm, reusable workers parked idle under one reuse fingerprint.
const DEFAULT_MAX_IDLE_PER_KEY: usize = 4;

/// Seconds a pooled worker waits for its next assignment before reclaiming itself.
/// Kept well above the gap between sibling spawns so that a worker parked by one
/// sibling is still alive when the next sibling asks for it.
const POOLED_IDLE_TIMEOUT_SECS: u64 = 5 * 60;
42
43/// A warm worker parked for reuse: its process handle (killed on drop), the WS
44/// endpoint to reconnect to, and the id it registered under in the fabric.
45struct PooledActor {
46    worker: SpawnedChild,
47    endpoint: String,
48    agent_id: String,
49}
50
51/// Spawns and drives a child session as an independent actor: a `bamboo-subagent` worker process.
52pub struct ActorChildRunner {
53    agent_id: String,
54    worker_bin: PathBuf,
55    worker_args: Vec<String>,
56    fabric_dir: PathBuf,
57    executor: ExecutorSpec,
58    /// Per-provider credentials snapshotted from the parent config at build
59    /// time; the spec carries only the ONE the child's provider needs.
60    credentials: Vec<ScopedCredential>,
61    /// Parent's default provider (used when the child has no explicit one).
62    default_provider: String,
63    /// Backpressure: bounds the number of concurrently *running* actors; further
64    /// runs wait for a slot instead of exploding the process table. (Idle pooled
65    /// workers do not hold a slot.)
66    concurrency: std::sync::Arc<tokio::sync::Semaphore>,
67    spawn_timeout: Duration,
68    /// Warm-worker pool keyed by a reuse fingerprint
69    /// (role/provider/model/workspace/disabled-tools). A finished run parks its
70    /// worker here so the next matching child reuses it instead of spawning a
71    /// fresh process — collapsing N sibling sub-agents onto a few processes.
72    pool: Arc<Mutex<HashMap<String, Vec<PooledActor>>>>,
73    max_idle_per_key: usize,
74}
75
76impl ActorChildRunner {
77    #[allow(clippy::too_many_arguments)]
78    pub fn new(
79        agent_id: String,
80        worker_bin: PathBuf,
81        worker_args: Vec<String>,
82        fabric_dir: PathBuf,
83        executor: ExecutorSpec,
84        credentials: Vec<ScopedCredential>,
85        default_provider: String,
86        max_concurrent: usize,
87    ) -> Self {
88        Self {
89            agent_id,
90            worker_bin,
91            worker_args,
92            fabric_dir,
93            executor,
94            credentials,
95            default_provider,
96            concurrency: std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1))),
97            spawn_timeout: Duration::from_secs(30),
98            pool: Arc::new(Mutex::new(HashMap::new())),
99            max_idle_per_key: DEFAULT_MAX_IDLE_PER_KEY,
100        }
101    }
102
103    /// Reuse fingerprint: two children are interchangeable on one warm worker iff
104    /// they share role, provider, model, workspace, and disabled-tool set (those
105    /// are baked into the worker at provision time; everything else — assignment,
106    /// history — is shipped per-run in the `RunSpec`).
107    fn fingerprint(spec: &ProvisionSpec) -> String {
108        let role = spec.identity.role.as_str();
109        let (provider, model) = spec
110            .model
111            .as_ref()
112            .map(|m| (m.provider.as_str(), m.model.as_str()))
113            .unwrap_or(("", ""));
114        let workspace = spec.workspace.as_deref().unwrap_or("");
115        let mut tools = spec.disabled_tools.clone().unwrap_or_default();
116        tools.sort();
117        format!(
118            "{role}\u{1}{provider}\u{1}{model}\u{1}{workspace}\u{1}{}",
119            tools.join(",")
120        )
121    }
122
123    /// Check out a worker for this assignment: reuse a live pooled one matching
124    /// `key`, else spawn a fresh reusable worker.
125    async fn acquire_worker(
126        &self,
127        key: &str,
128        spec: &ProvisionSpec,
129    ) -> crate::runtime::runner::Result<PooledActor> {
130        // Drain the pool bucket, validating liveness; a worker that hit its idle
131        // timeout has exited and withdrawn its fabric record — skip and reap it.
132        loop {
133            let candidate = {
134                let mut pool = self.pool.lock().await;
135                pool.get_mut(key).and_then(|bucket| bucket.pop())
136            };
137            let Some(candidate) = candidate else { break };
138            let alive = Fabric::at(&self.fabric_dir)
139                .resolve(&candidate.agent_id)
140                .await
141                .ok()
142                .flatten()
143                .is_some();
144            if alive {
145                return Ok(candidate);
146            }
147            candidate.worker.kill().await;
148        }
149
150        let spawned = spawn_worker(
151            &self.worker_bin,
152            &self.worker_args,
153            spec,
154            self.spawn_timeout,
155        )
156        .await
157        .map_err(|e| AgentError::LLM(format!("actor spawn/register failed: {e}")))?;
158        let endpoint = spawned.record.endpoint.clone();
159        let agent_id = spawned.record.agent_id.clone();
160        Ok(PooledActor {
161            worker: spawned,
162            endpoint,
163            agent_id,
164        })
165    }
166
167    /// Park a worker for reuse after a clean run; if the bucket is full, retire it.
168    async fn release_worker(&self, key: &str, actor: PooledActor) {
169        let mut pool = self.pool.lock().await;
170        let bucket = pool.entry(key.to_string()).or_default();
171        if bucket.len() >= self.max_idle_per_key {
172            drop(pool);
173            self.retire_worker(actor).await;
174            return;
175        }
176        bucket.push(actor);
177    }
178
179    /// Forcefully stop a worker and clean its discovery record.
180    async fn retire_worker(&self, actor: PooledActor) {
181        let agent_id = actor.agent_id.clone();
182        actor.worker.kill().await;
183        let _ = Fabric::at(&self.fabric_dir).withdraw(&agent_id).await;
184    }
185
186    /// Assemble the parent-resolved provisioning document for this child.
187    fn build_spec(&self, session: &Session, job: &SpawnJob) -> ProvisionSpec {
188        let mut spec = ProvisionSpec::new(
189            ChildIdentity {
190                child_id: job.child_session_id.clone(),
191                parent_id: Some(job.parent_session_id.clone()),
192                project_key: None,
193                role: session
194                    .metadata
195                    .get("subagent_type")
196                    .cloned()
197                    .unwrap_or_else(|| "worker".to_string()),
198            },
199            self.executor.clone(),
200            self.fabric_dir.to_string_lossy().into_owned(),
201        );
202        spec.workspace = session.workspace.clone();
203        // Final model: the session's pinned model_ref (create.model / routing already applied),
204        // falling back to the job's bare model on the parent's default provider.
205        spec.model = session
206            .model_ref
207            .as_ref()
208            .map(|r| ModelRefSpec {
209                provider: r.provider.clone(),
210                model: r.model.clone(),
211            })
212            .or_else(|| {
213                let m = job.model.trim();
214                (!m.is_empty()).then(|| ModelRefSpec {
215                    provider: self.default_provider.clone(),
216                    model: m.to_string(),
217                })
218            });
219        spec.disabled_tools = job.disabled_tools.clone();
220        // Least-privilege secrets: only the credential for the child's provider.
221        let provider = spec
222            .model
223            .as_ref()
224            .map(|m| m.provider.as_str())
225            .filter(|p| !p.trim().is_empty())
226            .unwrap_or(&self.default_provider);
227        if let Some(cred) = self.credentials.iter().find(|c| c.provider == provider) {
228            spec.secrets.provider_credentials.push(cred.clone());
229        } else {
230            tracing::warn!(
231                "actor child {}: no credential found for provider '{}'",
232                job.child_session_id,
233                provider
234            );
235        }
236        spec
237    }
238}
239
240#[async_trait]
241impl ExternalChildRunner for ActorChildRunner {
242    async fn should_handle(&self, session: &Session) -> bool {
243        session.metadata.get("runtime.kind") == Some(&"external".to_string())
244            && session.metadata.get("external.protocol") == Some(&"actor".to_string())
245            && session.metadata.get("external.agent_id") == Some(&self.agent_id)
246    }
247
248    async fn execute_external_child(
249        &self,
250        session: &mut Session,
251        job: &SpawnJob,
252        event_tx: mpsc::Sender<AgentEvent>,
253        cancel_token: CancellationToken,
254    ) -> crate::runtime::runner::Result<()> {
255        let assignment = extract_assignment(session);
256        let mut spec = self.build_spec(session, job);
257        // Make every actor a warm, reusable worker so the pool can recycle it for
258        // the next sibling with a matching fingerprint.
259        spec.reusable = true;
260        if spec.limits.idle_timeout_secs.is_none() {
261            spec.limits.idle_timeout_secs = Some(POOLED_IDLE_TIMEOUT_SECS);
262        }
263        let pool_key = Self::fingerprint(&spec);
264        // Rehydration: the child session in the parent's store is the actor's
265        // durable state. Ship the full conversation so a reactivation
266        // (send_message / update / rerun) carries its history. A reused worker is
267        // stateless between runs, so this is also what isolates each child's
268        // context on a shared process.
269        let messages: Vec<serde_json::Value> = session
270            .messages
271            .iter()
272            .filter_map(|m| serde_json::to_value(m).ok())
273            .collect();
274
275        // Backpressure: hold a concurrency slot for the lifetime of the *run*
276        // (cancellation still proceeds — the cancel branch in drive() runs while
277        // we hold the permit). Released when this fn returns, i.e. once the worker
278        // is parked back into the pool, so idle workers don't pin slots.
279        let _slot = self
280            .concurrency
281            .acquire()
282            .await
283            .map_err(|_| AgentError::LLM("actor concurrency limiter closed".to_string()))?;
284
285        // Check out a warm worker (reuse-or-spawn).
286        let mut actor = self.acquire_worker(&pool_key, &spec).await?;
287
288        let mut client = match ChildClient::connect(&actor.endpoint).await {
289            Ok(client) => client,
290            Err(e) => {
291                // The pooled worker may have died between checkout and connect;
292                // retire it and spawn one fresh, once.
293                self.retire_worker(actor).await;
294                let spawned = spawn_worker(
295                    &self.worker_bin,
296                    &self.worker_args,
297                    &spec,
298                    self.spawn_timeout,
299                )
300                .await
301                .map_err(|e2| {
302                    AgentError::LLM(format!("actor respawn after reuse miss ({e}): {e2}"))
303                })?;
304                let endpoint = spawned.record.endpoint.clone();
305                let agent_id = spawned.record.agent_id.clone();
306                let client = ChildClient::connect(&endpoint)
307                    .await
308                    .map_err(|e2| AgentError::LLM(format!("actor connect failed: {e2}")))?;
309                actor = PooledActor {
310                    worker: spawned,
311                    endpoint,
312                    agent_id,
313                };
314                client
315            }
316        };
317
318        client
319            .send(ParentFrame::Run(RunSpec {
320                assignment,
321                reasoning_effort: None,
322                messages,
323            }))
324            .await
325            .map_err(|e| AgentError::LLM(format!("actor run dispatch failed: {e}")))?;
326
327        // Register as a live actor so send_message (running, no interrupt) can
328        // steer this child in-band over the existing WS connection. The guard
329        // unregisters on every exit path.
330        let (live_tx, mut live_rx) = mpsc::unbounded_channel::<ParentFrame>();
331        let live_guard = super::live::register(&job.child_session_id, live_tx);
332
333        let result = drive(&mut client, &event_tx, &cancel_token, &mut live_rx).await;
334        // Unregister IMMEDIATELY: after drive returns nobody consumes live_rx,
335        // so a send_message landing in the close/park window below must see
336        // "not live" and take the durable-queue fallback instead of vanishing.
337        // (Even if one slipped in earlier, send_message also appends it to the
338        // durable transcript, so the next activation still rehydrates it.)
339        drop(live_guard);
340
341        // Close the connection: the worker's serve loop then accepts the next
342        // assignment (reuse) or idles out. Park the worker on a clean run; retire
343        // it on error/cancel (a wedged worker must not be reused).
344        let _ = client.close().await;
345        match &result {
346            Ok(_) => self.release_worker(&pool_key, actor).await,
347            Err(_) => self.retire_worker(actor).await,
348        }
349
350        // Write-back: persist the actor's final reply onto the child session so
351        // the transcript survives and the NEXT activation sees it as history.
352        // (run_child_spawn saves the session right after we return.)
353        match result {
354            Ok(Some(text)) => {
355                if !text.is_empty() {
356                    session.add_message(bamboo_agent_core::Message::assistant(text, None));
357                }
358                Ok(())
359            }
360            Ok(None) => Ok(()),
361            Err(e) => Err(e),
362        }
363    }
364}
365
366/// Pump child frames -> parent events until a terminal frame (or cancellation).
367/// On success, yields the actor's final result text (for session write-back).
368/// `live_rx` carries in-band frames (steering messages) from the live registry.
369async fn drive(
370    client: &mut ChildClient,
371    event_tx: &mpsc::Sender<AgentEvent>,
372    cancel_token: &CancellationToken,
373    live_rx: &mut mpsc::UnboundedReceiver<ParentFrame>,
374) -> crate::runtime::runner::Result<Option<String>> {
375    loop {
376        tokio::select! {
377            _ = cancel_token.cancelled() => {
378                // fall through to the cancel handling below
379                break;
380            }
381            Some(frame) = live_rx.recv() => {
382                // Forward in-band steering to the worker over the existing WS.
383                if client.send(frame).await.is_err() {
384                    tracing::warn!("live steering frame could not be sent; connection failing");
385                }
386            }
387            frame = client.next_frame() => {
388                match frame {
389                    Ok(Some(ChildFrame::Event { event })) => {
390                        // AgentEvent is serialized verbatim on the wire (zero mapping).
391                        if let Ok(ev) = serde_json::from_value::<AgentEvent>(event) {
392                            let _ = event_tx.send(ev).await;
393                        }
394                    }
395                    Ok(Some(ChildFrame::Terminal { status, result, error })) => {
396                        return match status {
397                            TerminalStatus::Completed => Ok(result),
398                            TerminalStatus::Cancelled => Err(AgentError::Cancelled),
399                            TerminalStatus::Error => Err(AgentError::LLM(
400                                error.unwrap_or_else(|| "actor child errored".to_string()),
401                            )),
402                        };
403                    }
404                    Ok(None) => {
405                        return Err(AgentError::LLM(
406                            "actor child closed before terminal".to_string(),
407                        ));
408                    }
409                    Err(e) => {
410                        return Err(AgentError::LLM(format!("actor transport error: {e}")));
411                    }
412                }
413            }
414        }
415    }
416
417    // Only reached on cancellation: ask the child to stop (best-effort), then report cancelled.
418    let _ = client.send(ParentFrame::Cancel).await;
419    Err(AgentError::Cancelled)
420}
421
422/// The assignment text = the child session's latest user message (falls back to its title).
423fn extract_assignment(session: &Session) -> String {
424    session
425        .messages
426        .iter()
427        .rev()
428        .find(|m| matches!(m.role, Role::User))
429        .map(|m| m.content.clone())
430        .unwrap_or_else(|| {
431            session
432                .metadata
433                .get("title")
434                .cloned()
435                .unwrap_or_else(|| "Execute task".to_string())
436        })
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a provisioning spec that varies only along the fingerprint axes.
    fn spec_with(
        role: &str,
        provider: &str,
        model: &str,
        workspace: Option<&str>,
        disabled: Option<Vec<&str>>,
    ) -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c".into(),
            parent_id: None,
            project_key: None,
            role: role.into(),
        };
        let mut spec = ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fab".into());
        spec.workspace = workspace.map(str::to_string);
        spec.model = Some(ModelRefSpec {
            provider: provider.into(),
            model: model.into(),
        });
        spec.disabled_tools =
            disabled.map(|names| names.into_iter().map(str::to_string).collect());
        spec
    }

    #[test]
    fn fingerprint_matches_interchangeable_children() {
        // Same role/provider/model/workspace plus the same disabled tools listed in a
        // different order share one bucket, even though the child ids differ.
        let first = spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash", "Edit"]));
        let mut second =
            spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Edit", "Bash"]));
        second.identity.child_id = "other".into();
        assert_eq!(
            ActorChildRunner::fingerprint(&first),
            ActorChildRunner::fingerprint(&second)
        );
    }

    #[test]
    fn fingerprint_separates_distinct_runtimes() {
        let base_fp =
            ActorChildRunner::fingerprint(&spec_with("explorer", "p", "m", Some("/ws"), None));
        // Every axis baked into the worker at provision time must split the bucket.
        let variants = [
            ("role", spec_with("writer", "p", "m", Some("/ws"), None)),
            ("provider", spec_with("explorer", "p2", "m", Some("/ws"), None)),
            ("model", spec_with("explorer", "p", "m2", Some("/ws"), None)),
            ("workspace", spec_with("explorer", "p", "m", Some("/ws2"), None)),
            (
                "disabled tools",
                spec_with("explorer", "p", "m", Some("/ws"), Some(vec!["Bash"])),
            ),
        ];
        for (axis, variant) in &variants {
            assert_ne!(
                base_fp,
                ActorChildRunner::fingerprint(variant),
                "{axis} should change the fingerprint"
            );
        }
    }
}