bamboo_engine/runtime/execution/
spawn.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_llm::ProviderModelRouter;
18
19use crate::runtime::Agent;
20
21use super::child_completion::{ChildCompletion, ChildCompletionHandler};
22use super::runner_state::AgentRunner;
23
/// A request to run one child (sub-agent) session for a parent session.
///
/// Jobs are handed to `SpawnScheduler::enqueue` and executed by the
/// scheduler's background worker.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Id of the session that requested the child run.
    pub parent_session_id: String,
    /// Id of the child session to execute.
    pub child_session_id: String,
    /// Model identifier for the child run.
    pub model: String,
    /// Names of tools to withhold from the child, if any.
    /// NOTE(review): interpretation lives in `run_child_spawn` (not shown) — confirm.
    pub disabled_tools: Option<Vec<String>>,
}
33
34#[async_trait::async_trait]
39pub trait ExternalChildRunner: Send + Sync {
40 async fn should_handle(&self, session: &Session) -> bool;
42
43 async fn execute_external_child(
45 &self,
46 session: &mut Session,
47 job: &SpawnJob,
48 event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
49 cancel_token: CancellationToken,
50 ) -> crate::runtime::runner::Result<()>;
51}
52
53#[derive(Clone)]
54pub struct SpawnContext {
55 pub agent: Arc<Agent>,
56 pub tools: Arc<dyn ToolExecutor>,
57 pub sessions_cache: crate::SessionCache,
58 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
59 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
60 pub external_child_runner: Arc<dyn ExternalChildRunner>,
61 pub provider_router: Option<Arc<ProviderModelRouter>>,
62 pub app_data_dir: Option<std::path::PathBuf>,
63 pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
68 pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
72}
73
74#[derive(Clone)]
75pub struct SpawnScheduler {
76 tx: mpsc::Sender<SpawnJob>,
77}
78
79impl SpawnScheduler {
80 pub fn new(ctx: SpawnContext) -> Self {
81 let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
82
83 tokio::spawn(async move {
84 while let Some(job) = rx.recv().await {
85 if let Err(err) = run_spawn_job(ctx.clone(), job).await {
86 tracing::warn!("spawn job failed: {}", err);
87 }
88 }
89 });
90
91 Self { tx }
92 }
93
94 pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
95 self.tx
96 .send(job)
97 .await
98 .map_err(|_| "spawn scheduler is not running".to_string())
99 }
100}
101
/// Timing limits enforced by the child-session liveness watchdog.
///
/// All values are in seconds.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ChildWatchdogPolicy {
    /// How often the watchdog checks the child. The watchdog clamps this to at
    /// least 1 second.
    check_interval_secs: i64,
    /// Maximum wall-clock time since the runner's `started_at`.
    max_total_secs: i64,
    /// Maximum time since the runner's last event, falling back to `started_at`.
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    /// The defaults are a check every 15 s, a 1 h total limit and a 15 min idle limit.
    fn default() -> Self {
        const MINUTE: i64 = 60;
        Self {
            check_interval_secs: 15,
            max_total_secs: 60 * MINUTE,
            max_idle_secs: 15 * MINUTE,
        }
    }
}
122
123fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
124 session
125 .metadata
126 .get(key)
127 .and_then(|value| value.trim().parse::<i64>().ok())
128 .filter(|value| *value > 0)
129}
130
131pub(crate) fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
132 let mut policy = ChildWatchdogPolicy::default();
133 if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
134 policy.max_total_secs = value;
135 }
136 if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
137 policy.max_idle_secs = value;
138 }
139 if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
140 policy.check_interval_secs = value;
141 }
142 policy
143}
144
145async fn publish_child_completion(
146 parent_tx: &broadcast::Sender<AgentEvent>,
147 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
148 completion: ChildCompletion,
149) {
150 let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
151 parent_session_id: completion.parent_session_id.clone(),
152 child_session_id: completion.child_session_id.clone(),
153 status: completion.status.clone(),
154 error: completion.error.clone(),
155 });
156
157 if let Some(handler) = completion_handler {
158 handler.on_child_completed(completion).await;
159 }
160}
161
162pub(crate) async fn publish_child_completion_parts(
163 parent_tx: &broadcast::Sender<AgentEvent>,
164 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
165 parent_session_id: String,
166 child_session_id: String,
167 status: String,
168 error: Option<String>,
169) {
170 publish_child_completion(
171 parent_tx,
172 completion_handler,
173 ChildCompletion {
174 parent_session_id,
175 child_session_id,
176 status,
177 error,
178 completed_at: Utc::now(),
179 },
180 )
181 .await;
182}
183
184pub(crate) async fn watch_child_liveness(
185 parent_session_id: String,
186 child_session_id: String,
187 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
188 cancel_token: CancellationToken,
189 timeout_reason: Arc<RwLock<Option<String>>>,
190 done: CancellationToken,
191 policy: ChildWatchdogPolicy,
192) {
193 let mut ticker =
194 tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
195 ticker.tick().await;
197
198 loop {
199 tokio::select! {
200 _ = done.cancelled() => return,
201 _ = ticker.tick() => {
202 if cancel_token.is_cancelled() {
203 return;
204 }
205
206 let snapshot = {
207 let guard = runners.read().await;
208 guard.get(&child_session_id).cloned()
209 };
210 let Some(runner) = snapshot else {
211 return;
212 };
213 if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
214 return;
215 }
216
217 let now = Utc::now();
218 let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
219 if total_secs >= policy.max_total_secs {
220 let reason = format!(
221 "Child session timed out after {} seconds (max_total_secs={})",
222 total_secs, policy.max_total_secs
223 );
224 tracing::warn!(
225 parent_session_id = %parent_session_id,
226 child_session_id = %child_session_id,
227 reason = %reason,
228 "child session total timeout; cancelling child runner"
229 );
230 *timeout_reason.write().await = Some(reason);
231 cancel_token.cancel();
232 return;
233 }
234
235 let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
236 let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
237 if idle_secs >= policy.max_idle_secs {
238 let reason = format!(
239 "Child session idle timeout after {} seconds without events (max_idle_secs={})",
240 idle_secs, policy.max_idle_secs
241 );
242 tracing::warn!(
243 parent_session_id = %parent_session_id,
244 child_session_id = %child_session_id,
245 reason = %reason,
246 last_tool_name = ?runner.last_tool_name,
247 last_tool_phase = ?runner.last_tool_phase,
248 round_count = runner.round_count,
249 "child session idle timeout; cancelling child runner"
250 );
251 *timeout_reason.write().await = Some(reason);
252 cancel_token.cancel();
253 return;
254 }
255 }
256 }
257 }
258}
259
260async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
267 crate::sdk::spawn::run_child_spawn(ctx, job).await
268}