Skip to main content

bamboo_engine/runtime/execution/
spawn.rs

1//! Sub-session spawn scheduler.
2//!
3//! Provides a background queue for spawning child sessions. Spawn is async
4//! (tool returns immediately), but the UI can observe child progress via
5//! events forwarded to the parent session stream.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_domain::ProviderModelRef;
18use bamboo_infrastructure::{LLMProvider, ProviderModelRouter};
19
20use crate::runtime::Agent;
21
22use super::child_completion::{ChildCompletion, ChildCompletionHandler};
23use super::runner_state::AgentRunner;
24
/// A request, queued on [`SpawnScheduler`], to run one child session on
/// behalf of a parent session.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Session that requested the spawn.
    pub parent_session_id: String,
    /// Session to be executed as the child.
    pub child_session_id: String,
    /// Model name requested for the child run.
    pub model: String,
    /// Tool names hidden from the LLM schema for this child session; derived
    /// from the child's `subagent_type` profile policy. `None` hides nothing
    /// extra.
    pub disabled_tools: Option<Vec<String>>,
}
34
35/// Trait for external child session runtimes (e.g. A2A, CLI adapters).
36///
37/// Implementors are responsible for emitting AgentEvents via `event_tx`
38/// and respecting the `cancel_token`.
39#[async_trait::async_trait]
40pub trait ExternalChildRunner: Send + Sync {
41    /// Returns true if this runner should handle the given child session.
42    async fn should_handle(&self, session: &Session) -> bool;
43
44    /// Execute the child session using an external runtime.
45    async fn execute_external_child(
46        &self,
47        session: &mut Session,
48        job: &SpawnJob,
49        event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
50        cancel_token: CancellationToken,
51    ) -> crate::runtime::runner::Result<()>;
52}
53
54#[derive(Clone)]
55pub struct SpawnContext {
56    pub agent: Arc<Agent>,
57    pub tools: Arc<dyn ToolExecutor>,
58    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
59    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
60    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
61    pub external_child_runner: Option<Arc<dyn ExternalChildRunner>>,
62    pub provider_router: Option<Arc<ProviderModelRouter>>,
63    pub app_data_dir: Option<std::path::PathBuf>,
64    /// Optional application-layer completion hook. The engine still emits
65    /// `SubAgentCompleted` to the parent stream itself; this hook lets the
66    /// server persist parent wait state and resume the parent runner without
67    /// introducing an engine -> AppState dependency.
68    pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
69    /// Optional inbox to the account-wide change feed. When present, durable
70    /// change events from child-session execution are mirrored onto the feed
71    /// for resumable multi-client sync.
72    pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
73}
74
75#[derive(Clone)]
76pub struct SpawnScheduler {
77    tx: mpsc::Sender<SpawnJob>,
78}
79
80impl SpawnScheduler {
81    pub fn new(ctx: SpawnContext) -> Self {
82        let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
83
84        tokio::spawn(async move {
85            while let Some(job) = rx.recv().await {
86                if let Err(err) = run_spawn_job(ctx.clone(), job).await {
87                    tracing::warn!("spawn job failed: {}", err);
88                }
89            }
90        });
91
92        Self { tx }
93    }
94
95    pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
96        self.tx
97            .send(job)
98            .await
99            .map_err(|_| "spawn scheduler is not running".to_string())
100    }
101}
102
103pub(crate) fn child_model_ref(session: &Session, model: &str) -> Option<ProviderModelRef> {
104    if let Some(model_ref) = session.model_ref.clone() {
105        let provider = model_ref.provider.trim();
106        let model_name = model_ref.model.trim();
107        if !provider.is_empty() && !model_name.is_empty() {
108            return Some(ProviderModelRef::new(provider, model_name));
109        }
110    }
111
112    let provider = session
113        .metadata
114        .get("provider_name")
115        .map(String::as_str)
116        .map(str::trim)
117        .filter(|value| !value.is_empty())?;
118    let model_name = model.trim();
119    if model_name.is_empty() {
120        return None;
121    }
122    Some(ProviderModelRef::new(provider, model_name))
123}
124
/// Time limits enforced by `watch_child_liveness` on a running child session.
///
/// All values are whole seconds.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ChildWatchdogPolicy {
    /// How often the watchdog samples the child's runner state.
    check_interval_secs: i64,
    /// Wall-clock budget measured from the runner's start.
    max_total_secs: i64,
    /// Longest tolerated gap since the runner's last event (or its start).
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    fn default() -> Self {
        const MINUTE: i64 = 60;
        Self {
            check_interval_secs: 15,
            // A parent may wait longer, but the child owns its own liveness;
            // capping total runtime at one hour prevents sub-session runners
            // from being orphaned indefinitely.
            max_total_secs: 60 * MINUTE,
            // Fifteen minutes without any child event counts as stalled.
            max_idle_secs: 15 * MINUTE,
        }
    }
}
145
146fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
147    session
148        .metadata
149        .get(key)
150        .and_then(|value| value.trim().parse::<i64>().ok())
151        .filter(|value| *value > 0)
152}
153
154pub(crate) fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
155    let mut policy = ChildWatchdogPolicy::default();
156    if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
157        policy.max_total_secs = value;
158    }
159    if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
160        policy.max_idle_secs = value;
161    }
162    if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
163        policy.check_interval_secs = value;
164    }
165    policy
166}
167
168async fn publish_child_completion(
169    parent_tx: &broadcast::Sender<AgentEvent>,
170    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
171    completion: ChildCompletion,
172) {
173    let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
174        parent_session_id: completion.parent_session_id.clone(),
175        child_session_id: completion.child_session_id.clone(),
176        status: completion.status.clone(),
177        error: completion.error.clone(),
178    });
179
180    if let Some(handler) = completion_handler {
181        handler.on_child_completed(completion).await;
182    }
183}
184
185pub(crate) async fn publish_child_completion_parts(
186    parent_tx: &broadcast::Sender<AgentEvent>,
187    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
188    parent_session_id: String,
189    child_session_id: String,
190    status: String,
191    error: Option<String>,
192) {
193    publish_child_completion(
194        parent_tx,
195        completion_handler,
196        ChildCompletion {
197            parent_session_id,
198            child_session_id,
199            status,
200            error,
201            completed_at: Utc::now(),
202        },
203    )
204    .await;
205}
206
207pub(crate) async fn watch_child_liveness(
208    parent_session_id: String,
209    child_session_id: String,
210    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
211    cancel_token: CancellationToken,
212    timeout_reason: Arc<RwLock<Option<String>>>,
213    done: CancellationToken,
214    policy: ChildWatchdogPolicy,
215) {
216    let mut ticker =
217        tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
218    // Skip the immediate tick.
219    ticker.tick().await;
220
221    loop {
222        tokio::select! {
223            _ = done.cancelled() => return,
224            _ = ticker.tick() => {
225                if cancel_token.is_cancelled() {
226                    return;
227                }
228
229                let snapshot = {
230                    let guard = runners.read().await;
231                    guard.get(&child_session_id).cloned()
232                };
233                let Some(runner) = snapshot else {
234                    return;
235                };
236                if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
237                    return;
238                }
239
240                let now = Utc::now();
241                let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
242                if total_secs >= policy.max_total_secs {
243                    let reason = format!(
244                        "Child session timed out after {} seconds (max_total_secs={})",
245                        total_secs, policy.max_total_secs
246                    );
247                    tracing::warn!(
248                        parent_session_id = %parent_session_id,
249                        child_session_id = %child_session_id,
250                        reason = %reason,
251                        "child session total timeout; cancelling child runner"
252                    );
253                    *timeout_reason.write().await = Some(reason);
254                    cancel_token.cancel();
255                    return;
256                }
257
258                let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
259                let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
260                if idle_secs >= policy.max_idle_secs {
261                    let reason = format!(
262                        "Child session idle timeout after {} seconds without events (max_idle_secs={})",
263                        idle_secs, policy.max_idle_secs
264                    );
265                    tracing::warn!(
266                        parent_session_id = %parent_session_id,
267                        child_session_id = %child_session_id,
268                        reason = %reason,
269                        last_tool_name = ?runner.last_tool_name,
270                        last_tool_phase = ?runner.last_tool_phase,
271                        round_count = runner.round_count,
272                        "child session idle timeout; cancelling child runner"
273                    );
274                    *timeout_reason.write().await = Some(reason);
275                    cancel_token.cancel();
276                    return;
277                }
278            }
279        }
280    }
281}
282
283pub(crate) fn resolve_child_provider_override(
284    router: Option<&Arc<ProviderModelRouter>>,
285    session: &Session,
286    model: &str,
287) -> (Option<Arc<dyn LLMProvider>>, Option<String>, Option<String>) {
288    let model_ref = child_model_ref(session, model);
289    let provider_name = model_ref
290        .as_ref()
291        .map(|model_ref| model_ref.provider.clone());
292    let provider_type = if let (Some(router), Some(model_ref)) = (router, model_ref.as_ref()) {
293        router.provider_type_for(model_ref)
294    } else {
295        provider_name.clone()
296    };
297    let provider = router.and_then(|router| {
298        let model_ref = model_ref.as_ref()?;
299        match router.route(model_ref) {
300            Ok(provider) => Some(provider),
301            Err(error) => {
302                tracing::warn!(
303                    session_id = %session.id,
304                    provider = %model_ref.provider,
305                    model = %model_ref.model,
306                    error = %error,
307                    "failed to resolve provider override for child session; falling back to runtime provider"
308                );
309                None
310            }
311        }
312    });
313    (provider, provider_name, provider_type)
314}
315
316/// Drive a single queued spawn job through the canonical child-spawn path.
317///
318/// ANTI-FORK: this is a 1-line delegator to [`crate::sdk::spawn::run_child_spawn`],
319/// which is the single implementation of the spawn/execute/finalize logic. The
320/// `SpawnScheduler` queue mechanics (above) remain here; the body lives in the SDK
321/// core so both the scheduler and the ergonomic `ProfileRunner` funnel into it.
322async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
323    crate::sdk::spawn::run_child_spawn(ctx, job).await
324}