1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Role, Session, SessionKind};
17use bamboo_domain::ProviderModelRef;
18use bamboo_infrastructure::{LLMProvider, ProviderModelRouter};
19
20use crate::runtime::Agent;
21use crate::runtime::ExecuteRequest;
22
23use super::child_completion::{ChildCompletion, ChildCompletionHandler};
24use super::event_forwarder::create_event_forwarder;
25use super::runner_lifecycle::{finalize_runner, try_reserve_runner, RunnerReservation};
26use super::runner_state::AgentRunner;
27use super::session_events::get_or_create_event_sender;
28
/// A request to run one child session on behalf of a parent session.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Session that receives the child's forwarded events, heartbeats and completion.
    pub parent_session_id: String,
    /// Session to load and execute; it must be stored with `SessionKind::Child`.
    pub child_session_id: String,
    /// Model name assigned to the child session before it runs.
    pub model: String,
    /// Tool names to disable for a native (non-external) child run; `None` disables nothing
    /// extra.
    pub disabled_tools: Option<Vec<String>>,
}
38
39#[async_trait::async_trait]
44pub trait ExternalChildRunner: Send + Sync {
45 async fn should_handle(&self, session: &Session) -> bool;
47
48 async fn execute_external_child(
50 &self,
51 session: &mut Session,
52 job: &SpawnJob,
53 event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
54 cancel_token: CancellationToken,
55 ) -> crate::runtime::runner::Result<()>;
56}
57
58#[derive(Clone)]
59pub struct SpawnContext {
60 pub agent: Arc<Agent>,
61 pub tools: Arc<dyn ToolExecutor>,
62 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
63 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
64 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
65 pub external_child_runner: Option<Arc<dyn ExternalChildRunner>>,
66 pub provider_router: Option<Arc<ProviderModelRouter>>,
67 pub app_data_dir: Option<std::path::PathBuf>,
68 pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
73 pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
77}
78
79#[derive(Clone)]
80pub struct SpawnScheduler {
81 tx: mpsc::Sender<SpawnJob>,
82}
83
84impl SpawnScheduler {
85 pub fn new(ctx: SpawnContext) -> Self {
86 let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
87
88 tokio::spawn(async move {
89 while let Some(job) = rx.recv().await {
90 if let Err(err) = run_spawn_job(ctx.clone(), job).await {
91 tracing::warn!("spawn job failed: {}", err);
92 }
93 }
94 });
95
96 Self { tx }
97 }
98
99 pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
100 self.tx
101 .send(job)
102 .await
103 .map_err(|_| "spawn scheduler is not running".to_string())
104 }
105}
106
107fn child_model_ref(session: &Session, model: &str) -> Option<ProviderModelRef> {
108 if let Some(model_ref) = session.model_ref.clone() {
109 let provider = model_ref.provider.trim();
110 let model_name = model_ref.model.trim();
111 if !provider.is_empty() && !model_name.is_empty() {
112 return Some(ProviderModelRef::new(provider, model_name));
113 }
114 }
115
116 let provider = session
117 .metadata
118 .get("provider_name")
119 .map(String::as_str)
120 .map(str::trim)
121 .filter(|value| !value.is_empty())?;
122 let model_name = model.trim();
123 if model_name.is_empty() {
124 return None;
125 }
126 Some(ProviderModelRef::new(provider, model_name))
127}
128
/// Limits enforced by the child-session liveness watchdog, all expressed in seconds.
#[derive(Debug, Clone, Copy)]
struct ChildWatchdogPolicy {
    /// Interval between watchdog checks (the watchdog clamps it to at least 1s).
    check_interval_secs: i64,
    /// Maximum wall-clock run time, measured from the runner's `started_at`.
    max_total_secs: i64,
    /// Maximum time since the runner's last recorded event (or its start, if none yet).
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    /// Checks every 15 seconds, with a one-hour total budget and a 15-minute idle budget.
    fn default() -> Self {
        const MINUTE: i64 = 60;
        Self {
            check_interval_secs: 15,
            max_total_secs: 60 * MINUTE,
            max_idle_secs: 15 * MINUTE,
        }
    }
}
149
150fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
151 session
152 .metadata
153 .get(key)
154 .and_then(|value| value.trim().parse::<i64>().ok())
155 .filter(|value| *value > 0)
156}
157
158fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
159 let mut policy = ChildWatchdogPolicy::default();
160 if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
161 policy.max_total_secs = value;
162 }
163 if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
164 policy.max_idle_secs = value;
165 }
166 if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
167 policy.check_interval_secs = value;
168 }
169 policy
170}
171
172async fn publish_child_completion(
173 parent_tx: &broadcast::Sender<AgentEvent>,
174 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
175 completion: ChildCompletion,
176) {
177 let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
178 parent_session_id: completion.parent_session_id.clone(),
179 child_session_id: completion.child_session_id.clone(),
180 status: completion.status.clone(),
181 error: completion.error.clone(),
182 });
183
184 if let Some(handler) = completion_handler {
185 handler.on_child_completed(completion).await;
186 }
187}
188
189async fn publish_child_completion_parts(
190 parent_tx: &broadcast::Sender<AgentEvent>,
191 completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
192 parent_session_id: String,
193 child_session_id: String,
194 status: String,
195 error: Option<String>,
196) {
197 publish_child_completion(
198 parent_tx,
199 completion_handler,
200 ChildCompletion {
201 parent_session_id,
202 child_session_id,
203 status,
204 error,
205 completed_at: Utc::now(),
206 },
207 )
208 .await;
209}
210
211async fn watch_child_liveness(
212 parent_session_id: String,
213 child_session_id: String,
214 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
215 cancel_token: CancellationToken,
216 timeout_reason: Arc<RwLock<Option<String>>>,
217 done: CancellationToken,
218 policy: ChildWatchdogPolicy,
219) {
220 let mut ticker =
221 tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
222 ticker.tick().await;
224
225 loop {
226 tokio::select! {
227 _ = done.cancelled() => return,
228 _ = ticker.tick() => {
229 if cancel_token.is_cancelled() {
230 return;
231 }
232
233 let snapshot = {
234 let guard = runners.read().await;
235 guard.get(&child_session_id).cloned()
236 };
237 let Some(runner) = snapshot else {
238 return;
239 };
240 if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
241 return;
242 }
243
244 let now = Utc::now();
245 let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
246 if total_secs >= policy.max_total_secs {
247 let reason = format!(
248 "Child session timed out after {} seconds (max_total_secs={})",
249 total_secs, policy.max_total_secs
250 );
251 tracing::warn!(
252 parent_session_id = %parent_session_id,
253 child_session_id = %child_session_id,
254 reason = %reason,
255 "child session total timeout; cancelling child runner"
256 );
257 *timeout_reason.write().await = Some(reason);
258 cancel_token.cancel();
259 return;
260 }
261
262 let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
263 let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
264 if idle_secs >= policy.max_idle_secs {
265 let reason = format!(
266 "Child session idle timeout after {} seconds without events (max_idle_secs={})",
267 idle_secs, policy.max_idle_secs
268 );
269 tracing::warn!(
270 parent_session_id = %parent_session_id,
271 child_session_id = %child_session_id,
272 reason = %reason,
273 last_tool_name = ?runner.last_tool_name,
274 last_tool_phase = ?runner.last_tool_phase,
275 round_count = runner.round_count,
276 "child session idle timeout; cancelling child runner"
277 );
278 *timeout_reason.write().await = Some(reason);
279 cancel_token.cancel();
280 return;
281 }
282 }
283 }
284 }
285}
286
287fn resolve_child_provider_override(
288 router: Option<&Arc<ProviderModelRouter>>,
289 session: &Session,
290 model: &str,
291) -> (Option<Arc<dyn LLMProvider>>, Option<String>, Option<String>) {
292 let model_ref = child_model_ref(session, model);
293 let provider_name = model_ref
294 .as_ref()
295 .map(|model_ref| model_ref.provider.clone());
296 let provider_type = if let (Some(router), Some(model_ref)) = (router, model_ref.as_ref()) {
297 router.provider_type_for(model_ref)
298 } else {
299 provider_name.clone()
300 };
301 let provider = router.and_then(|router| {
302 let model_ref = model_ref.as_ref()?;
303 match router.route(model_ref) {
304 Ok(provider) => Some(provider),
305 Err(error) => {
306 tracing::warn!(
307 session_id = %session.id,
308 provider = %model_ref.provider,
309 model = %model_ref.model,
310 error = %error,
311 "failed to resolve provider override for child session; falling back to runtime provider"
312 );
313 None
314 }
315 }
316 });
317 (provider, provider_name, provider_type)
318}
319
320async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
321 let parent_tx =
323 get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
324 let child_tx =
325 get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
326
327 let mut session = match ctx
329 .agent
330 .storage()
331 .load_session(&job.child_session_id)
332 .await
333 {
334 Ok(Some(s)) => s,
335 Ok(None) => {
336 let error = "child session not found".to_string();
337 publish_child_completion_parts(
338 &parent_tx,
339 ctx.completion_handler.clone(),
340 job.parent_session_id.clone(),
341 job.child_session_id.clone(),
342 "error".to_string(),
343 Some(error.clone()),
344 )
345 .await;
346 return Err(error);
347 }
348 Err(e) => {
349 let error = format!("failed to load child session: {e}");
350 publish_child_completion_parts(
351 &parent_tx,
352 ctx.completion_handler.clone(),
353 job.parent_session_id.clone(),
354 job.child_session_id.clone(),
355 "error".to_string(),
356 Some(error.clone()),
357 )
358 .await;
359 return Err(error);
360 }
361 };
362
363 if let Some(ref ws) = session.workspace {
366 bamboo_agent_core::workspace_state::set_workspace(
367 &session.id,
368 std::path::PathBuf::from(ws),
369 );
370 }
371
372 if session.kind != SessionKind::Child {
373 let error = "spawn job child session is not kind=child".to_string();
374 publish_child_completion_parts(
375 &parent_tx,
376 ctx.completion_handler.clone(),
377 job.parent_session_id.clone(),
378 job.child_session_id.clone(),
379 "error".to_string(),
380 Some(error.clone()),
381 )
382 .await;
383 return Err(error);
384 }
385
386 let last_is_user = session
388 .messages
389 .last()
390 .map(|m| matches!(m.role, Role::User))
391 .unwrap_or(false);
392 if !last_is_user {
393 session
394 .metadata
395 .insert("last_run_status".to_string(), "skipped".to_string());
396 session.metadata.insert(
397 "last_run_error".to_string(),
398 "No pending message to execute".to_string(),
399 );
400 let _ = ctx
401 .agent
402 .persistence()
403 .save_runtime_session(&mut session)
404 .await;
405 {
406 let mut sessions = ctx.sessions_cache.write().await;
407 sessions.insert(job.child_session_id.clone(), session);
408 }
409 publish_child_completion_parts(
410 &parent_tx,
411 ctx.completion_handler.clone(),
412 job.parent_session_id.clone(),
413 job.child_session_id.clone(),
414 "skipped".to_string(),
415 Some("No pending message to execute".to_string()),
416 )
417 .await;
418 return Ok(());
419 }
420
421 session
423 .metadata
424 .insert("last_run_status".to_string(), "running".to_string());
425 session.metadata.remove("last_run_error");
426 let _ = ctx
427 .agent
428 .persistence()
429 .save_runtime_session(&mut session)
430 .await;
431
432 let Some(RunnerReservation { cancel_token, .. }) =
434 try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
435 else {
436 return Ok(());
437 };
438
439 let forwarder_done = CancellationToken::new();
441 {
442 let mut rx = child_tx.subscribe();
443 let parent_tx = parent_tx.clone();
444 let job_clone = job.clone();
445 let done = forwarder_done.clone();
446 tokio::spawn(async move {
447 loop {
448 tokio::select! {
449 _ = done.cancelled() => break,
450 evt = rx.recv() => {
451 match evt {
452 Ok(event) => {
453 let _ = parent_tx.send(AgentEvent::SubAgentEvent {
454 parent_session_id: job_clone.parent_session_id.clone(),
455 child_session_id: job_clone.child_session_id.clone(),
456 event: Box::new(event),
457 });
458 }
459 Err(broadcast::error::RecvError::Lagged(_)) => {
460 continue;
461 }
462 Err(_) => break,
463 }
464 }
465 }
466 }
467 });
468 }
469 {
470 let parent_tx = parent_tx.clone();
471 let job_clone = job.clone();
472 let done = forwarder_done.clone();
473 tokio::spawn(async move {
474 let mut ticker = tokio::time::interval(Duration::from_secs(5));
475 loop {
476 tokio::select! {
477 _ = done.cancelled() => break,
478 _ = ticker.tick() => {
479 let _ = parent_tx.send(AgentEvent::SubAgentHeartbeat {
480 parent_session_id: job_clone.parent_session_id.clone(),
481 child_session_id: job_clone.child_session_id.clone(),
482 timestamp: Utc::now(),
483 });
484 }
485 }
486 }
487 });
488 }
489
490 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
492 job.child_session_id.clone(),
493 child_tx.clone(),
494 ctx.agent_runners.clone(),
495 ctx.account_feed_inbox.clone(),
496 );
497
498 let timeout_reason = Arc::new(RwLock::new(None::<String>));
501 let watchdog_policy = watchdog_policy_for_session(&session);
502 tokio::spawn(watch_child_liveness(
503 job.parent_session_id.clone(),
504 job.child_session_id.clone(),
505 ctx.agent_runners.clone(),
506 cancel_token.clone(),
507 timeout_reason.clone(),
508 forwarder_done.clone(),
509 watchdog_policy,
510 ));
511
512 let model = job.model.clone();
514 let session_id_clone = job.child_session_id.clone();
515 let agent_runners_for_status = ctx.agent_runners.clone();
516 let sessions_cache = ctx.sessions_cache.clone();
517 let agent = ctx.agent.clone();
518 let tools = ctx.tools.clone();
519 let external_runner = ctx.external_child_runner.clone();
520 let done = forwarder_done.clone();
521 let parent_tx_for_done = parent_tx.clone();
522 let parent_id_for_done = job.parent_session_id.clone();
523 let child_id_for_done = job.child_session_id.clone();
524 let session_event_senders = ctx.session_event_senders.clone();
525 let provider_router = ctx.provider_router.clone();
526 let completion_handler = ctx.completion_handler.clone();
527
528 tokio::spawn(async move {
529 session.model = model.clone();
530
531 let wants_external = session
532 .metadata
533 .get("runtime.kind")
534 .is_some_and(|v| v == "external");
535
536 let result: crate::runtime::runner::Result<()> = if wants_external {
537 if let Some(runner) = external_runner {
538 if runner.should_handle(&session).await {
539 runner
540 .execute_external_child(&mut session, &job, mpsc_tx, cancel_token.clone())
541 .await
542 } else {
543 Err(bamboo_agent_core::AgentError::LLM(format!(
544 "No external runner matched child session runtime metadata: agent_id={:?}, protocol={:?}",
545 session.metadata.get("external.agent_id"),
546 session.metadata.get("external.protocol"),
547 )))
548 }
549 } else {
550 Err(bamboo_agent_core::AgentError::LLM(
551 "Child session requires external runtime, but no external runner is configured"
552 .to_string(),
553 ))
554 }
555 } else {
556 let (provider_override, provider_name, provider_type) =
557 resolve_child_provider_override(provider_router.as_ref(), &session, &model);
558 let disabled_tools: Option<std::collections::BTreeSet<String>> =
559 job.disabled_tools.map(|v| v.into_iter().collect());
560 agent
561 .execute(
562 &mut session,
563 ExecuteRequest {
564 initial_message: String::new(), event_tx: mpsc_tx,
566 cancel_token: cancel_token.clone(),
567 tools: Some(tools),
568 provider_override,
569 model: Some(model.clone()),
570 provider_name,
571 provider_type,
572 fast_model: None,
573 fast_model_provider: None,
574 background_model: None,
575 background_model_provider: None,
576 summarization_model: None,
577 summarization_model_provider: None,
578 reasoning_effort: None,
579 auxiliary_model_resolver: None,
580 disabled_tools,
581 disabled_skill_ids: None,
582 selected_skill_ids: None,
583 selected_skill_mode: None,
584 image_fallback: None,
585 gold_config: None,
586 app_data_dir: ctx.app_data_dir.clone(),
587 },
588 )
589 .await
590 };
591
592 let timeout_error = timeout_reason.read().await.clone();
593 let (status, error) = if let Some(reason) = timeout_error {
594 ("timeout".to_string(), Some(reason))
595 } else {
596 match &result {
597 Ok(_) => ("completed".to_string(), None),
598 Err(e) if e.to_string().contains("cancelled") => {
599 ("cancelled".to_string(), Some(e.to_string()))
600 }
601 Err(e) => ("error".to_string(), Some(e.to_string())),
602 }
603 };
604
605 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
606
607 crate::runtime::runner::state_bridge::merge_pending_injected_messages(
610 &mut session,
611 Some(agent.storage()),
612 Some(agent.persistence()),
613 )
614 .await;
615
616 session
618 .metadata
619 .insert("last_run_status".to_string(), status.clone());
620 if let Some(err) = &error {
621 session
622 .metadata
623 .insert("last_run_error".to_string(), err.clone());
624 } else {
625 session.metadata.remove("last_run_error");
626 }
627 let _ = agent.persistence().save_runtime_session(&mut session).await;
628 {
629 let mut sessions = sessions_cache.write().await;
630 sessions.insert(session_id_clone.clone(), session);
631 }
632
633 done.cancel();
636 publish_child_completion_parts(
637 &parent_tx_for_done,
638 completion_handler,
639 parent_id_for_done,
640 child_id_for_done,
641 status,
642 error,
643 )
644 .await;
645
646 drop(session_event_senders);
648 });
649
650 Ok(())
651}