Skip to main content

bamboo_engine/runtime/execution/
spawn.rs

//! Sub-session spawn scheduler.
//!
//! Provides a background queue for spawning child sessions. Spawning is
//! asynchronous (the tool call returns immediately), but the UI can observe
//! child progress via events forwarded to the parent session stream.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_llm::ProviderModelRouter;
18
19use crate::runtime::Agent;
20
21use super::child_completion::{ChildCompletion, ChildCompletionHandler};
22use super::runner_state::AgentRunner;
23
/// One queued request to run a child session on behalf of a parent session.
#[derive(Clone, Debug)]
pub struct SpawnJob {
    /// Identifier of the parent session this job belongs to.
    pub parent_session_id: String,
    /// Identifier of the child session this job refers to.
    pub child_session_id: String,
    /// Model identifier associated with this job.
    pub model: String,
    /// Names of the tools that are hidden from the LLM schema for this child
    /// session. Derived from the policy of the child's `subagent_type` profile.
    pub disabled_tools: Option<Vec<String>>,
}
33
34/// Trait for external child session runtimes (e.g. A2A, CLI adapters).
35///
36/// Implementors are responsible for emitting AgentEvents via `event_tx`
37/// and respecting the `cancel_token`.
38#[async_trait::async_trait]
39pub trait ExternalChildRunner: Send + Sync {
40    /// Returns true if this runner should handle the given child session.
41    async fn should_handle(&self, session: &Session) -> bool;
42
43    /// Execute the child session using an external runtime.
44    async fn execute_external_child(
45        &self,
46        session: &mut Session,
47        job: &SpawnJob,
48        event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
49        cancel_token: CancellationToken,
50    ) -> crate::runtime::runner::Result<()>;
51
52    /// Bind this runner's per-run escalation host bridge (#68). A nested worker's
53    /// `run()` installs its OWN host bridge here so the runner can hand it to each
54    /// grandchild's `drive()` AT SPAWN time (captured into the drive task, not read
55    /// later), letting the grandchild re-proxy a non-bypass approval request UP to
56    /// its parent run for its whole lifetime — even when it outlives the run that
57    /// spawned it. Default no-op for runners that don't escalate (e.g. A2A).
58    fn set_escalation_bridge(&self, _bridge: Option<bamboo_subagent::executor::HostBridge>) {}
59}
60
61#[derive(Clone)]
62pub struct SpawnContext {
63    pub agent: Arc<Agent>,
64    pub tools: Arc<dyn ToolExecutor>,
65    pub sessions_cache: crate::SessionCache,
66    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
67    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
68    pub external_child_runner: Arc<dyn ExternalChildRunner>,
69    pub provider_router: Option<Arc<ProviderModelRouter>>,
70    pub app_data_dir: Option<std::path::PathBuf>,
71    /// Optional application-layer completion hook. The engine still emits
72    /// `SubAgentCompleted` to the parent stream itself; this hook lets the
73    /// server persist parent wait state and resume the parent runner without
74    /// introducing an engine -> AppState dependency.
75    pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
76    /// Optional inbox to the account-wide change feed. When present, durable
77    /// change events from child-session execution are mirrored onto the feed
78    /// for resumable multi-client sync.
79    pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
80}
81
82#[derive(Clone)]
83pub struct SpawnScheduler {
84    tx: mpsc::Sender<SpawnJob>,
85}
86
87impl SpawnScheduler {
88    pub fn new(ctx: SpawnContext) -> Self {
89        let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
90
91        tokio::spawn(async move {
92            while let Some(job) = rx.recv().await {
93                if let Err(err) = run_spawn_job(ctx.clone(), job).await {
94                    tracing::warn!("spawn job failed: {}", err);
95                }
96            }
97        });
98
99        Self { tx }
100    }
101
102    pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
103        self.tx
104            .send(job)
105            .await
106            .map_err(|_| "spawn scheduler is not running".to_string())
107    }
108}
109
/// Time limits the child watchdog enforces. All values are in seconds.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ChildWatchdogPolicy {
    /// How often the watchdog wakes up to inspect the child runner.
    check_interval_secs: i64,
    /// Upper bound on a child's total running time.
    max_total_secs: i64,
    /// Upper bound on the time a child may go without emitting an event.
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    /// Checks every 15 seconds, with a one hour total cap and a 15 minute
    /// idle cap.
    fn default() -> Self {
        const SECS_PER_MINUTE: i64 = 60;

        Self {
            check_interval_secs: 15,
            // A parent may wait longer than this, but liveness is owned by the
            // child execution itself. Capping the total at one hour keeps
            // sub-session runners from staying orphaned forever.
            max_total_secs: 60 * SECS_PER_MINUTE,
            // A child that emitted nothing for 15 minutes counts as stalled.
            max_idle_secs: 15 * SECS_PER_MINUTE,
        }
    }
}
130
131fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
132    session
133        .metadata
134        .get(key)
135        .and_then(|value| value.trim().parse::<i64>().ok())
136        .filter(|value| *value > 0)
137}
138
139pub(crate) fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
140    let mut policy = ChildWatchdogPolicy::default();
141    if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
142        policy.max_total_secs = value;
143    }
144    if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
145        policy.max_idle_secs = value;
146    }
147    if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
148        policy.check_interval_secs = value;
149    }
150    policy
151}
152
153async fn publish_child_completion(
154    parent_tx: &broadcast::Sender<AgentEvent>,
155    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
156    completion: ChildCompletion,
157) {
158    let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
159        parent_session_id: completion.parent_session_id.clone(),
160        child_session_id: completion.child_session_id.clone(),
161        status: completion.status.clone(),
162        error: completion.error.clone(),
163    });
164
165    if let Some(handler) = completion_handler {
166        handler.on_child_completed(completion).await;
167    }
168}
169
170pub(crate) async fn publish_child_completion_parts(
171    parent_tx: &broadcast::Sender<AgentEvent>,
172    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
173    parent_session_id: String,
174    child_session_id: String,
175    status: String,
176    error: Option<String>,
177) {
178    publish_child_completion(
179        parent_tx,
180        completion_handler,
181        ChildCompletion {
182            parent_session_id,
183            child_session_id,
184            status,
185            error,
186            completed_at: Utc::now(),
187        },
188    )
189    .await;
190}
191
192pub(crate) async fn watch_child_liveness(
193    parent_session_id: String,
194    child_session_id: String,
195    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
196    cancel_token: CancellationToken,
197    timeout_reason: Arc<RwLock<Option<String>>>,
198    done: CancellationToken,
199    policy: ChildWatchdogPolicy,
200) {
201    let mut ticker =
202        tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
203    // Skip the immediate tick.
204    ticker.tick().await;
205
206    loop {
207        tokio::select! {
208            _ = done.cancelled() => return,
209            _ = ticker.tick() => {
210                if cancel_token.is_cancelled() {
211                    return;
212                }
213
214                let snapshot = {
215                    let guard = runners.read().await;
216                    guard.get(&child_session_id).cloned()
217                };
218                let Some(runner) = snapshot else {
219                    return;
220                };
221                if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
222                    return;
223                }
224
225                let now = Utc::now();
226                let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
227                if total_secs >= policy.max_total_secs {
228                    let reason = format!(
229                        "Child session timed out after {} seconds (max_total_secs={})",
230                        total_secs, policy.max_total_secs
231                    );
232                    tracing::warn!(
233                        parent_session_id = %parent_session_id,
234                        child_session_id = %child_session_id,
235                        reason = %reason,
236                        "child session total timeout; cancelling child runner"
237                    );
238                    *timeout_reason.write().await = Some(reason);
239                    cancel_token.cancel();
240                    return;
241                }
242
243                let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
244                let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
245                if idle_secs >= policy.max_idle_secs {
246                    let reason = format!(
247                        "Child session idle timeout after {} seconds without events (max_idle_secs={})",
248                        idle_secs, policy.max_idle_secs
249                    );
250                    tracing::warn!(
251                        parent_session_id = %parent_session_id,
252                        child_session_id = %child_session_id,
253                        reason = %reason,
254                        last_tool_name = ?runner.last_tool_name,
255                        last_tool_phase = ?runner.last_tool_phase,
256                        round_count = runner.round_count,
257                        "child session idle timeout; cancelling child runner"
258                    );
259                    *timeout_reason.write().await = Some(reason);
260                    cancel_token.cancel();
261                    return;
262                }
263            }
264        }
265    }
266}
267
268/// Drive a single queued spawn job through the canonical child-spawn path.
269///
270/// ANTI-FORK: this is a 1-line delegator to [`crate::sdk::spawn::run_child_spawn`],
271/// which is the single implementation of the spawn/execute/finalize logic. The
272/// `SpawnScheduler` queue mechanics (above) remain here; the body lives in the SDK
273/// core so both the scheduler and the ergonomic `ChildRunner` funnel into it.
274async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
275    crate::sdk::spawn::run_child_spawn(ctx, job).await
276}