Skip to main content

bamboo_engine/runtime/execution/
spawn.rs

1//! Sub-session spawn scheduler.
2//!
3//! Provides a background queue for spawning child sessions. Spawn is async
4//! (tool returns immediately), but the UI can observe child progress via
5//! events forwarded to the parent session stream.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_llm::ProviderModelRouter;
18
19use crate::runtime::Agent;
20
21use super::child_completion::{ChildCompletion, ChildCompletionHandler};
22use super::runner_state::AgentRunner;
23
/// One unit of work for the [`SpawnScheduler`] queue: a request to run a
/// single child session on behalf of a parent session.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Id of the parent session this job was spawned from.
    pub parent_session_id: String,
    /// Id of the child session this job should run.
    pub child_session_id: String,
    /// Model to use for the child session.
    /// NOTE(review): presumably a model identifier understood by the LLM
    /// provider layer; confirm against the spawn implementation.
    pub model: String,
    /// Tool names to hide from the LLM schema for this child session.
    /// Computed from the child's `subagent_type` profile policy.
    pub disabled_tools: Option<Vec<String>>,
}
33
/// Trait for external child session runtimes (e.g. A2A, CLI adapters).
///
/// Implementors are responsible for emitting AgentEvents via `event_tx`
/// and respecting the `cancel_token`.
///
/// The `Send + Sync` bound lets a single runner be shared behind an
/// `Arc<dyn ExternalChildRunner>` (see `SpawnContext::external_child_runner`).
#[async_trait::async_trait]
pub trait ExternalChildRunner: Send + Sync {
    /// Returns true if this runner should handle the given child session.
    async fn should_handle(&self, session: &Session) -> bool;

    /// Execute the child session using an external runtime.
    ///
    /// * `session` - the child session, passed mutably so the runner can
    ///   update it in place.
    /// * `job` - the spawn job that triggered this execution.
    /// * `event_tx` - channel on which the implementor emits `AgentEvent`s.
    /// * `cancel_token` - cancellation signal the implementor must respect.
    ///
    /// Returns the runner-level `Result`; the conditions that produce an
    /// error are defined by each implementor.
    async fn execute_external_child(
        &self,
        session: &mut Session,
        job: &SpawnJob,
        event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
        cancel_token: CancellationToken,
    ) -> crate::runtime::runner::Result<()>;
}
52
/// Shared dependencies handed to every spawn job.
///
/// The scheduler worker clones this context once per job, which is why the
/// shared members are held behind `Arc`.
#[derive(Clone)]
pub struct SpawnContext {
    /// Agent runtime shared by all spawned children.
    pub agent: Arc<Agent>,
    /// Tool executor made available to child sessions.
    pub tools: Arc<dyn ToolExecutor>,
    /// Session cache handle.
    /// NOTE(review): presumably where child/parent sessions are loaded from
    /// and saved to; confirm against the spawn implementation.
    pub sessions_cache: crate::SessionCache,
    /// Registry of runner state. `watch_child_liveness` looks entries up in a
    /// map of this type by child session id.
    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Broadcast senders for session event streams.
    /// NOTE(review): presumably keyed by session id; verify against callers.
    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Runner used for child sessions that an external runtime should handle
    /// (see [`ExternalChildRunner::should_handle`]).
    pub external_child_runner: Arc<dyn ExternalChildRunner>,
    /// Optional provider/model router.
    pub provider_router: Option<Arc<ProviderModelRouter>>,
    /// Optional application data directory.
    pub app_data_dir: Option<std::path::PathBuf>,
    /// Optional application-layer completion hook. The engine still emits
    /// `SubAgentCompleted` to the parent stream itself; this hook lets the
    /// server persist parent wait state and resume the parent runner without
    /// introducing an engine -> AppState dependency.
    pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
    /// Optional inbox to the account-wide change feed. When present, durable
    /// change events from child-session execution are mirrored onto the feed
    /// for resumable multi-client sync.
    pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
}
73
74#[derive(Clone)]
75pub struct SpawnScheduler {
76    tx: mpsc::Sender<SpawnJob>,
77}
78
79impl SpawnScheduler {
80    pub fn new(ctx: SpawnContext) -> Self {
81        let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
82
83        tokio::spawn(async move {
84            while let Some(job) = rx.recv().await {
85                if let Err(err) = run_spawn_job(ctx.clone(), job).await {
86                    tracing::warn!("spawn job failed: {}", err);
87                }
88            }
89        });
90
91        Self { tx }
92    }
93
94    pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
95        self.tx
96            .send(job)
97            .await
98            .map_err(|_| "spawn scheduler is not running".to_string())
99    }
100}
101
/// Liveness limits applied to a running child session. All values are in
/// seconds.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ChildWatchdogPolicy {
    /// How often the watchdog inspects the child runner.
    check_interval_secs: i64,
    /// Upper bound on total child run time.
    max_total_secs: i64,
    /// Upper bound on time without any child event.
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    /// Built-in limits: check every 15 seconds, one hour total, 15 minutes
    /// idle.
    fn default() -> Self {
        const SECS_PER_MINUTE: i64 = 60;

        const CHECK_INTERVAL_SECS: i64 = 15;
        // Parent waits may be longer, but child execution owns its own
        // liveness. A one hour total cap avoids indefinitely orphaned
        // sub-session runners.
        const MAX_TOTAL_SECS: i64 = 60 * SECS_PER_MINUTE;
        // No child event for 15 minutes is considered stalled.
        const MAX_IDLE_SECS: i64 = 15 * SECS_PER_MINUTE;

        Self {
            check_interval_secs: CHECK_INTERVAL_SECS,
            max_total_secs: MAX_TOTAL_SECS,
            max_idle_secs: MAX_IDLE_SECS,
        }
    }
}
122
123fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
124    session
125        .metadata
126        .get(key)
127        .and_then(|value| value.trim().parse::<i64>().ok())
128        .filter(|value| *value > 0)
129}
130
131pub(crate) fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
132    let mut policy = ChildWatchdogPolicy::default();
133    if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
134        policy.max_total_secs = value;
135    }
136    if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
137        policy.max_idle_secs = value;
138    }
139    if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
140        policy.check_interval_secs = value;
141    }
142    policy
143}
144
145async fn publish_child_completion(
146    parent_tx: &broadcast::Sender<AgentEvent>,
147    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
148    completion: ChildCompletion,
149) {
150    let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
151        parent_session_id: completion.parent_session_id.clone(),
152        child_session_id: completion.child_session_id.clone(),
153        status: completion.status.clone(),
154        error: completion.error.clone(),
155    });
156
157    if let Some(handler) = completion_handler {
158        handler.on_child_completed(completion).await;
159    }
160}
161
162pub(crate) async fn publish_child_completion_parts(
163    parent_tx: &broadcast::Sender<AgentEvent>,
164    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
165    parent_session_id: String,
166    child_session_id: String,
167    status: String,
168    error: Option<String>,
169) {
170    publish_child_completion(
171        parent_tx,
172        completion_handler,
173        ChildCompletion {
174            parent_session_id,
175            child_session_id,
176            status,
177            error,
178            completed_at: Utc::now(),
179        },
180    )
181    .await;
182}
183
184pub(crate) async fn watch_child_liveness(
185    parent_session_id: String,
186    child_session_id: String,
187    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
188    cancel_token: CancellationToken,
189    timeout_reason: Arc<RwLock<Option<String>>>,
190    done: CancellationToken,
191    policy: ChildWatchdogPolicy,
192) {
193    let mut ticker =
194        tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
195    // Skip the immediate tick.
196    ticker.tick().await;
197
198    loop {
199        tokio::select! {
200            _ = done.cancelled() => return,
201            _ = ticker.tick() => {
202                if cancel_token.is_cancelled() {
203                    return;
204                }
205
206                let snapshot = {
207                    let guard = runners.read().await;
208                    guard.get(&child_session_id).cloned()
209                };
210                let Some(runner) = snapshot else {
211                    return;
212                };
213                if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
214                    return;
215                }
216
217                let now = Utc::now();
218                let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
219                if total_secs >= policy.max_total_secs {
220                    let reason = format!(
221                        "Child session timed out after {} seconds (max_total_secs={})",
222                        total_secs, policy.max_total_secs
223                    );
224                    tracing::warn!(
225                        parent_session_id = %parent_session_id,
226                        child_session_id = %child_session_id,
227                        reason = %reason,
228                        "child session total timeout; cancelling child runner"
229                    );
230                    *timeout_reason.write().await = Some(reason);
231                    cancel_token.cancel();
232                    return;
233                }
234
235                let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
236                let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
237                if idle_secs >= policy.max_idle_secs {
238                    let reason = format!(
239                        "Child session idle timeout after {} seconds without events (max_idle_secs={})",
240                        idle_secs, policy.max_idle_secs
241                    );
242                    tracing::warn!(
243                        parent_session_id = %parent_session_id,
244                        child_session_id = %child_session_id,
245                        reason = %reason,
246                        last_tool_name = ?runner.last_tool_name,
247                        last_tool_phase = ?runner.last_tool_phase,
248                        round_count = runner.round_count,
249                        "child session idle timeout; cancelling child runner"
250                    );
251                    *timeout_reason.write().await = Some(reason);
252                    cancel_token.cancel();
253                    return;
254                }
255            }
256        }
257    }
258}
259
/// Drive a single queued spawn job through the canonical child-spawn path.
///
/// ANTI-FORK: this is a 1-line delegator to [`crate::sdk::spawn::run_child_spawn`],
/// which is the single implementation of the spawn/execute/finalize logic. The
/// `SpawnScheduler` queue mechanics (above) remain here; the body lives in the SDK
/// core so both the scheduler and the ergonomic `ChildRunner` funnel into it.
///
/// # Errors
///
/// Returns whatever error string `run_child_spawn` produces; the scheduler
/// worker logs it and carries on with the next job.
async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
    // Keep this a pure pass-through: any logic added here would fork the spawn path.
    crate::sdk::spawn::run_child_spawn(ctx, job).await
}