bamboo_engine/runtime/execution/
spawn.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_llm::ProviderModelRouter;
18
19use crate::runtime::Agent;
20
21use super::child_completion::{ChildCompletion, ChildCompletionHandler};
22use super::runner_state::AgentRunner;
23
/// A child-spawn request, as carried on the [`SpawnScheduler`] job queue.
#[derive(Clone, Debug)]
pub struct SpawnJob {
    /// Session id of the parent on whose behalf the child is spawned.
    pub parent_session_id: String,
    /// Session id of the child to run.
    pub child_session_id: String,
    /// Model identifier passed along with the job.
    // NOTE(review): presumably the model the child runs on — confirm in `run_child_spawn`.
    pub model: String,
    /// Optional list of tool names; `None` when no list was supplied.
    // NOTE(review): presumably tools withheld from the child — not interpreted in this file.
    pub disabled_tools: Option<Vec<String>>,
}
33
34#[async_trait::async_trait]
39pub trait ExternalChildRunner: Send + Sync {
40 async fn should_handle(&self, session: &Session) -> bool;
42
43 async fn execute_external_child(
45 &self,
46 session: &mut Session,
47 job: &SpawnJob,
48 event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
49 cancel_token: CancellationToken,
50 ) -> crate::runtime::runner::Result<()>;
51
52 fn set_escalation_bridge(&self, _bridge: Option<bamboo_subagent::executor::HostBridge>) {}
59}
60
61#[derive(Clone)]
62pub struct SpawnContext {
63 pub agent: Arc<Agent>,
64 pub tools: Arc<dyn ToolExecutor>,
65 pub sessions_cache: crate::SessionCache,
66 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
67 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
68 pub external_child_runner: Arc<dyn ExternalChildRunner>,
69 pub provider_router: Option<Arc<ProviderModelRouter>>,
70 pub app_data_dir: Option<std::path::PathBuf>,
71 pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
76 pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
80}
81
82#[derive(Clone)]
83pub struct SpawnScheduler {
84 tx: mpsc::Sender<SpawnJob>,
85}
86
87impl SpawnScheduler {
88 pub fn new(ctx: SpawnContext) -> Self {
89 let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
90
91 tokio::spawn(async move {
92 while let Some(job) = rx.recv().await {
93 if let Err(err) = run_spawn_job(ctx.clone(), job).await {
94 tracing::warn!("spawn job failed: {}", err);
95 }
96 }
97 });
98
99 Self { tx }
100 }
101
102 pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
103 self.tx
104 .send(job)
105 .await
106 .map_err(|_| "spawn scheduler is not running".to_string())
107 }
108}
109
/// Limits used by the child watchdog (`watch_child_liveness`); every value is
/// a number of seconds.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ChildWatchdogPolicy {
    /// Seconds between two watchdog checks.
    check_interval_secs: i64,
    /// Longest allowed time since the runner's `started_at`.
    max_total_secs: i64,
    /// Longest allowed time since the runner's last event.
    max_idle_secs: i64,
}
116
117impl Default for ChildWatchdogPolicy {
118 fn default() -> Self {
119 Self {
120 check_interval_secs: 15,
121 max_total_secs: 60 * 60,
125 max_idle_secs: 15 * 60,
127 }
128 }
129}
130
131fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
132 session
133 .metadata
134 .get(key)
135 .and_then(|value| value.trim().parse::<i64>().ok())
136 .filter(|value| *value > 0)
137}
138
139pub(crate) fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
140 let mut policy = ChildWatchdogPolicy::default();
141 if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
142 policy.max_total_secs = value;
143 }
144 if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
145 policy.max_idle_secs = value;
146 }
147 if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
148 policy.check_interval_secs = value;
149 }
150 policy
151}
152
153async fn publish_child_completion(
154 parent_tx: &broadcast::Sender<AgentEvent>,
155 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
156 completion: ChildCompletion,
157) {
158 let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
159 parent_session_id: completion.parent_session_id.clone(),
160 child_session_id: completion.child_session_id.clone(),
161 status: completion.status.clone(),
162 error: completion.error.clone(),
163 });
164
165 if let Some(handler) = completion_handler {
166 handler.on_child_completed(completion).await;
167 }
168}
169
170pub(crate) async fn publish_child_completion_parts(
171 parent_tx: &broadcast::Sender<AgentEvent>,
172 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
173 parent_session_id: String,
174 child_session_id: String,
175 status: String,
176 error: Option<String>,
177) {
178 publish_child_completion(
179 parent_tx,
180 completion_handler,
181 ChildCompletion {
182 parent_session_id,
183 child_session_id,
184 status,
185 error,
186 completed_at: Utc::now(),
187 },
188 )
189 .await;
190}
191
192pub(crate) async fn watch_child_liveness(
193 parent_session_id: String,
194 child_session_id: String,
195 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
196 cancel_token: CancellationToken,
197 timeout_reason: Arc<RwLock<Option<String>>>,
198 done: CancellationToken,
199 policy: ChildWatchdogPolicy,
200) {
201 let mut ticker =
202 tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
203 ticker.tick().await;
205
206 loop {
207 tokio::select! {
208 _ = done.cancelled() => return,
209 _ = ticker.tick() => {
210 if cancel_token.is_cancelled() {
211 return;
212 }
213
214 let snapshot = {
215 let guard = runners.read().await;
216 guard.get(&child_session_id).cloned()
217 };
218 let Some(runner) = snapshot else {
219 return;
220 };
221 if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
222 return;
223 }
224
225 let now = Utc::now();
226 let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
227 if total_secs >= policy.max_total_secs {
228 let reason = format!(
229 "Child session timed out after {} seconds (max_total_secs={})",
230 total_secs, policy.max_total_secs
231 );
232 tracing::warn!(
233 parent_session_id = %parent_session_id,
234 child_session_id = %child_session_id,
235 reason = %reason,
236 "child session total timeout; cancelling child runner"
237 );
238 *timeout_reason.write().await = Some(reason);
239 cancel_token.cancel();
240 return;
241 }
242
243 let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
244 let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
245 if idle_secs >= policy.max_idle_secs {
246 let reason = format!(
247 "Child session idle timeout after {} seconds without events (max_idle_secs={})",
248 idle_secs, policy.max_idle_secs
249 );
250 tracing::warn!(
251 parent_session_id = %parent_session_id,
252 child_session_id = %child_session_id,
253 reason = %reason,
254 last_tool_name = ?runner.last_tool_name,
255 last_tool_phase = ?runner.last_tool_phase,
256 round_count = runner.round_count,
257 "child session idle timeout; cancelling child runner"
258 );
259 *timeout_reason.write().await = Some(reason);
260 cancel_token.cancel();
261 return;
262 }
263 }
264 }
265 }
266}
267
268async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
275 crate::sdk::spawn::run_child_spawn(ctx, job).await
276}