Skip to main content

bamboo_engine/runtime/execution/
spawn.rs

1//! Sub-session spawn scheduler.
2//!
3//! Provides a background queue for spawning child sessions. Spawn is async
4//! (tool returns immediately), but the UI can observe child progress via
5//! events forwarded to the parent session stream.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Role, Session, SessionKind};
17use bamboo_domain::ProviderModelRef;
18use bamboo_infrastructure::{LLMProvider, ProviderModelRouter};
19
20use crate::runtime::Agent;
21use crate::runtime::ExecuteRequest;
22
23use super::child_completion::{ChildCompletion, ChildCompletionHandler};
24use super::event_forwarder::create_event_forwarder;
25use super::runner_lifecycle::{finalize_runner, try_reserve_runner, RunnerReservation};
26use super::runner_state::AgentRunner;
27use super::session_events::get_or_create_event_sender;
28
/// A request to run one child (sub-agent) session in the background.
///
/// Jobs are queued through [`SpawnScheduler::enqueue`] and drained one at a
/// time by the scheduler's worker task.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Session that spawned the child. It receives the forwarded child
    /// events, heartbeats and the terminal `SubAgentCompleted` event.
    pub parent_session_id: String,
    /// Session to execute. It is loaded from storage and must have
    /// `kind == SessionKind::Child`.
    pub child_session_id: String,
    /// Model name the child session is executed with.
    pub model: String,
    /// Tool names hidden from the LLM schema for this child session,
    /// derived from the child's `subagent_type` profile policy.
    pub disabled_tools: Option<Vec<String>>,
}
38
39/// Trait for external child session runtimes (e.g. A2A, CLI adapters).
40///
41/// Implementors are responsible for emitting AgentEvents via `event_tx`
42/// and respecting the `cancel_token`.
43#[async_trait::async_trait]
44pub trait ExternalChildRunner: Send + Sync {
45    /// Returns true if this runner should handle the given child session.
46    async fn should_handle(&self, session: &Session) -> bool;
47
48    /// Execute the child session using an external runtime.
49    async fn execute_external_child(
50        &self,
51        session: &mut Session,
52        job: &SpawnJob,
53        event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
54        cancel_token: CancellationToken,
55    ) -> crate::runtime::runner::Result<()>;
56}
57
58#[derive(Clone)]
59pub struct SpawnContext {
60    pub agent: Arc<Agent>,
61    pub tools: Arc<dyn ToolExecutor>,
62    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
63    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
64    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
65    pub external_child_runner: Option<Arc<dyn ExternalChildRunner>>,
66    pub provider_router: Option<Arc<ProviderModelRouter>>,
67    pub app_data_dir: Option<std::path::PathBuf>,
68    /// Optional application-layer completion hook. The engine still emits
69    /// `SubAgentCompleted` to the parent stream itself; this hook lets the
70    /// server persist parent wait state and resume the parent runner without
71    /// introducing an engine -> AppState dependency.
72    pub completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
73    /// Optional inbox to the account-wide change feed. When present, durable
74    /// change events from child-session execution are mirrored onto the feed
75    /// for resumable multi-client sync.
76    pub account_feed_inbox: Option<super::event_forwarder::AccountFeedInbox>,
77}
78
79#[derive(Clone)]
80pub struct SpawnScheduler {
81    tx: mpsc::Sender<SpawnJob>,
82}
83
84impl SpawnScheduler {
85    pub fn new(ctx: SpawnContext) -> Self {
86        let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
87
88        tokio::spawn(async move {
89            while let Some(job) = rx.recv().await {
90                if let Err(err) = run_spawn_job(ctx.clone(), job).await {
91                    tracing::warn!("spawn job failed: {}", err);
92                }
93            }
94        });
95
96        Self { tx }
97    }
98
99    pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
100        self.tx
101            .send(job)
102            .await
103            .map_err(|_| "spawn scheduler is not running".to_string())
104    }
105}
106
107fn child_model_ref(session: &Session, model: &str) -> Option<ProviderModelRef> {
108    if let Some(model_ref) = session.model_ref.clone() {
109        let provider = model_ref.provider.trim();
110        let model_name = model_ref.model.trim();
111        if !provider.is_empty() && !model_name.is_empty() {
112            return Some(ProviderModelRef::new(provider, model_name));
113        }
114    }
115
116    let provider = session
117        .metadata
118        .get("provider_name")
119        .map(String::as_str)
120        .map(str::trim)
121        .filter(|value| !value.is_empty())?;
122    let model_name = model.trim();
123    if model_name.is_empty() {
124        return None;
125    }
126    Some(ProviderModelRef::new(provider, model_name))
127}
128
/// Timeouts enforced on a running child session by the liveness watchdog.
///
/// All values are in seconds. Each one can be overridden per session through
/// the `child_watchdog.*` metadata keys.
#[derive(Debug, Clone, Copy)]
struct ChildWatchdogPolicy {
    /// How often the watchdog inspects the child runner.
    check_interval_secs: i64,
    /// Cap on total child runtime, measured from the runner's start.
    max_total_secs: i64,
    /// Longest allowed gap without a child event before the child counts
    /// as stalled.
    max_idle_secs: i64,
}

impl Default for ChildWatchdogPolicy {
    /// Defaults:
    /// - check every 15 s,
    /// - allow one hour in total,
    /// - treat 15 minutes without events as stalled.
    fn default() -> Self {
        const MINUTE: i64 = 60;
        ChildWatchdogPolicy {
            check_interval_secs: 15,
            // Parent waits may be longer, but child execution owns its own
            // liveness. A one hour total cap avoids indefinitely orphaned
            // sub-session runners.
            max_total_secs: 60 * MINUTE,
            max_idle_secs: 15 * MINUTE,
        }
    }
}
149
150fn metadata_i64(session: &Session, key: &str) -> Option<i64> {
151    session
152        .metadata
153        .get(key)
154        .and_then(|value| value.trim().parse::<i64>().ok())
155        .filter(|value| *value > 0)
156}
157
158fn watchdog_policy_for_session(session: &Session) -> ChildWatchdogPolicy {
159    let mut policy = ChildWatchdogPolicy::default();
160    if let Some(value) = metadata_i64(session, "child_watchdog.max_total_secs") {
161        policy.max_total_secs = value;
162    }
163    if let Some(value) = metadata_i64(session, "child_watchdog.max_idle_secs") {
164        policy.max_idle_secs = value;
165    }
166    if let Some(value) = metadata_i64(session, "child_watchdog.check_interval_secs") {
167        policy.check_interval_secs = value;
168    }
169    policy
170}
171
172async fn publish_child_completion(
173    parent_tx: &broadcast::Sender<AgentEvent>,
174    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
175    completion: ChildCompletion,
176) {
177    let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
178        parent_session_id: completion.parent_session_id.clone(),
179        child_session_id: completion.child_session_id.clone(),
180        status: completion.status.clone(),
181        error: completion.error.clone(),
182    });
183
184    if let Some(handler) = completion_handler {
185        handler.on_child_completed(completion).await;
186    }
187}
188
189async fn publish_child_completion_parts(
190    parent_tx: &broadcast::Sender<AgentEvent>,
191    completion_handler: Option<Arc<dyn ChildCompletionHandler>>,
192    parent_session_id: String,
193    child_session_id: String,
194    status: String,
195    error: Option<String>,
196) {
197    publish_child_completion(
198        parent_tx,
199        completion_handler,
200        ChildCompletion {
201            parent_session_id,
202            child_session_id,
203            status,
204            error,
205            completed_at: Utc::now(),
206        },
207    )
208    .await;
209}
210
211async fn watch_child_liveness(
212    parent_session_id: String,
213    child_session_id: String,
214    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
215    cancel_token: CancellationToken,
216    timeout_reason: Arc<RwLock<Option<String>>>,
217    done: CancellationToken,
218    policy: ChildWatchdogPolicy,
219) {
220    let mut ticker =
221        tokio::time::interval(Duration::from_secs(policy.check_interval_secs.max(1) as u64));
222    // Skip the immediate tick.
223    ticker.tick().await;
224
225    loop {
226        tokio::select! {
227            _ = done.cancelled() => return,
228            _ = ticker.tick() => {
229                if cancel_token.is_cancelled() {
230                    return;
231                }
232
233                let snapshot = {
234                    let guard = runners.read().await;
235                    guard.get(&child_session_id).cloned()
236                };
237                let Some(runner) = snapshot else {
238                    return;
239                };
240                if !matches!(runner.status, super::runner_state::AgentStatus::Running) {
241                    return;
242                }
243
244                let now = Utc::now();
245                let total_secs = now.signed_duration_since(runner.started_at).num_seconds();
246                if total_secs >= policy.max_total_secs {
247                    let reason = format!(
248                        "Child session timed out after {} seconds (max_total_secs={})",
249                        total_secs, policy.max_total_secs
250                    );
251                    tracing::warn!(
252                        parent_session_id = %parent_session_id,
253                        child_session_id = %child_session_id,
254                        reason = %reason,
255                        "child session total timeout; cancelling child runner"
256                    );
257                    *timeout_reason.write().await = Some(reason);
258                    cancel_token.cancel();
259                    return;
260                }
261
262                let last_activity_at = runner.last_event_at.unwrap_or(runner.started_at);
263                let idle_secs = now.signed_duration_since(last_activity_at).num_seconds();
264                if idle_secs >= policy.max_idle_secs {
265                    let reason = format!(
266                        "Child session idle timeout after {} seconds without events (max_idle_secs={})",
267                        idle_secs, policy.max_idle_secs
268                    );
269                    tracing::warn!(
270                        parent_session_id = %parent_session_id,
271                        child_session_id = %child_session_id,
272                        reason = %reason,
273                        last_tool_name = ?runner.last_tool_name,
274                        last_tool_phase = ?runner.last_tool_phase,
275                        round_count = runner.round_count,
276                        "child session idle timeout; cancelling child runner"
277                    );
278                    *timeout_reason.write().await = Some(reason);
279                    cancel_token.cancel();
280                    return;
281                }
282            }
283        }
284    }
285}
286
287fn resolve_child_provider_override(
288    router: Option<&Arc<ProviderModelRouter>>,
289    session: &Session,
290    model: &str,
291) -> (Option<Arc<dyn LLMProvider>>, Option<String>, Option<String>) {
292    let model_ref = child_model_ref(session, model);
293    let provider_name = model_ref
294        .as_ref()
295        .map(|model_ref| model_ref.provider.clone());
296    let provider_type = if let (Some(router), Some(model_ref)) = (router, model_ref.as_ref()) {
297        router.provider_type_for(model_ref)
298    } else {
299        provider_name.clone()
300    };
301    let provider = router.and_then(|router| {
302        let model_ref = model_ref.as_ref()?;
303        match router.route(model_ref) {
304            Ok(provider) => Some(provider),
305            Err(error) => {
306                tracing::warn!(
307                    session_id = %session.id,
308                    provider = %model_ref.provider,
309                    model = %model_ref.model,
310                    error = %error,
311                    "failed to resolve provider override for child session; falling back to runtime provider"
312                );
313                None
314            }
315        }
316    });
317    (provider, provider_name, provider_type)
318}
319
320async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
321    // Ensure both session event streams exist.
322    let parent_tx =
323        get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
324    let child_tx =
325        get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
326
327    // Load child session.
328    let mut session = match ctx
329        .agent
330        .storage()
331        .load_session(&job.child_session_id)
332        .await
333    {
334        Ok(Some(s)) => s,
335        Ok(None) => {
336            let error = "child session not found".to_string();
337            publish_child_completion_parts(
338                &parent_tx,
339                ctx.completion_handler.clone(),
340                job.parent_session_id.clone(),
341                job.child_session_id.clone(),
342                "error".to_string(),
343                Some(error.clone()),
344            )
345            .await;
346            return Err(error);
347        }
348        Err(e) => {
349            let error = format!("failed to load child session: {e}");
350            publish_child_completion_parts(
351                &parent_tx,
352                ctx.completion_handler.clone(),
353                job.parent_session_id.clone(),
354                job.child_session_id.clone(),
355                "error".to_string(),
356                Some(error.clone()),
357            )
358            .await;
359            return Err(error);
360        }
361    };
362
363    // Register the child's workspace in the global state so tools
364    // (Read, Glob, Grep, Bash, etc.) can resolve relative paths.
365    if let Some(ref ws) = session.workspace {
366        bamboo_agent_core::workspace_state::set_workspace(
367            &session.id,
368            std::path::PathBuf::from(ws),
369        );
370    }
371
372    if session.kind != SessionKind::Child {
373        let error = "spawn job child session is not kind=child".to_string();
374        publish_child_completion_parts(
375            &parent_tx,
376            ctx.completion_handler.clone(),
377            job.parent_session_id.clone(),
378            job.child_session_id.clone(),
379            "error".to_string(),
380            Some(error.clone()),
381        )
382        .await;
383        return Err(error);
384    }
385
386    // Ensure last message is user (otherwise nothing to do).
387    let last_is_user = session
388        .messages
389        .last()
390        .map(|m| matches!(m.role, Role::User))
391        .unwrap_or(false);
392    if !last_is_user {
393        session
394            .metadata
395            .insert("last_run_status".to_string(), "skipped".to_string());
396        session.metadata.insert(
397            "last_run_error".to_string(),
398            "No pending message to execute".to_string(),
399        );
400        let _ = ctx
401            .agent
402            .persistence()
403            .save_runtime_session(&mut session)
404            .await;
405        {
406            let mut sessions = ctx.sessions_cache.write().await;
407            sessions.insert(job.child_session_id.clone(), session);
408        }
409        publish_child_completion_parts(
410            &parent_tx,
411            ctx.completion_handler.clone(),
412            job.parent_session_id.clone(),
413            job.child_session_id.clone(),
414            "skipped".to_string(),
415            Some("No pending message to execute".to_string()),
416        )
417        .await;
418        return Ok(());
419    }
420
421    // Persist a running marker early so list_sessions can reconstruct status.
422    session
423        .metadata
424        .insert("last_run_status".to_string(), "running".to_string());
425    session.metadata.remove("last_run_error");
426    let _ = ctx
427        .agent
428        .persistence()
429        .save_runtime_session(&mut session)
430        .await;
431
432    // Insert runner status.
433    let Some(RunnerReservation { cancel_token, .. }) =
434        try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
435    else {
436        return Ok(());
437    };
438
439    // Forward ALL child events to parent.
440    let forwarder_done = CancellationToken::new();
441    {
442        let mut rx = child_tx.subscribe();
443        let parent_tx = parent_tx.clone();
444        let job_clone = job.clone();
445        let done = forwarder_done.clone();
446        tokio::spawn(async move {
447            loop {
448                tokio::select! {
449                    _ = done.cancelled() => break,
450                    evt = rx.recv() => {
451                        match evt {
452                            Ok(event) => {
453                                let _ = parent_tx.send(AgentEvent::SubAgentEvent {
454                                    parent_session_id: job_clone.parent_session_id.clone(),
455                                    child_session_id: job_clone.child_session_id.clone(),
456                                    event: Box::new(event),
457                                });
458                            }
459                            Err(broadcast::error::RecvError::Lagged(_)) => {
460                                continue;
461                            }
462                            Err(_) => break,
463                        }
464                    }
465                }
466            }
467        });
468    }
469    {
470        let parent_tx = parent_tx.clone();
471        let job_clone = job.clone();
472        let done = forwarder_done.clone();
473        tokio::spawn(async move {
474            let mut ticker = tokio::time::interval(Duration::from_secs(5));
475            loop {
476                tokio::select! {
477                    _ = done.cancelled() => break,
478                    _ = ticker.tick() => {
479                        let _ = parent_tx.send(AgentEvent::SubAgentHeartbeat {
480                            parent_session_id: job_clone.parent_session_id.clone(),
481                            child_session_id: job_clone.child_session_id.clone(),
482                            timestamp: Utc::now(),
483                        });
484                    }
485                }
486            }
487        });
488    }
489
490    // Create mpsc channel for agent loop → session events sender.
491    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
492        job.child_session_id.clone(),
493        child_tx.clone(),
494        ctx.agent_runners.clone(),
495        ctx.account_feed_inbox.clone(),
496    );
497
498    // Child liveness is owned by the child runner. The parent wait state can
499    // have a longer lease, but it should not poll or terminate children.
500    let timeout_reason = Arc::new(RwLock::new(None::<String>));
501    let watchdog_policy = watchdog_policy_for_session(&session);
502    tokio::spawn(watch_child_liveness(
503        job.parent_session_id.clone(),
504        job.child_session_id.clone(),
505        ctx.agent_runners.clone(),
506        cancel_token.clone(),
507        timeout_reason.clone(),
508        forwarder_done.clone(),
509        watchdog_policy,
510    ));
511
512    // Run child loop via unified spawn_session_execution.
513    let model = job.model.clone();
514    let session_id_clone = job.child_session_id.clone();
515    let agent_runners_for_status = ctx.agent_runners.clone();
516    let sessions_cache = ctx.sessions_cache.clone();
517    let agent = ctx.agent.clone();
518    let tools = ctx.tools.clone();
519    let external_runner = ctx.external_child_runner.clone();
520    let done = forwarder_done.clone();
521    let parent_tx_for_done = parent_tx.clone();
522    let parent_id_for_done = job.parent_session_id.clone();
523    let child_id_for_done = job.child_session_id.clone();
524    let session_event_senders = ctx.session_event_senders.clone();
525    let provider_router = ctx.provider_router.clone();
526    let completion_handler = ctx.completion_handler.clone();
527
528    tokio::spawn(async move {
529        session.model = model.clone();
530
531        let wants_external = session
532            .metadata
533            .get("runtime.kind")
534            .is_some_and(|v| v == "external");
535
536        let result: crate::runtime::runner::Result<()> = if wants_external {
537            if let Some(runner) = external_runner {
538                if runner.should_handle(&session).await {
539                    runner
540                        .execute_external_child(&mut session, &job, mpsc_tx, cancel_token.clone())
541                        .await
542                } else {
543                    Err(bamboo_agent_core::AgentError::LLM(format!(
544                        "No external runner matched child session runtime metadata: agent_id={:?}, protocol={:?}",
545                        session.metadata.get("external.agent_id"),
546                        session.metadata.get("external.protocol"),
547                    )))
548                }
549            } else {
550                Err(bamboo_agent_core::AgentError::LLM(
551                    "Child session requires external runtime, but no external runner is configured"
552                        .to_string(),
553                ))
554            }
555        } else {
556            let (provider_override, provider_name, provider_type) =
557                resolve_child_provider_override(provider_router.as_ref(), &session, &model);
558            let disabled_tools: Option<std::collections::BTreeSet<String>> =
559                job.disabled_tools.map(|v| v.into_iter().collect());
560            agent
561                .execute(
562                    &mut session,
563                    ExecuteRequest {
564                        initial_message: String::new(), // handled by agent loop
565                        event_tx: mpsc_tx,
566                        cancel_token: cancel_token.clone(),
567                        tools: Some(tools),
568                        provider_override,
569                        model: Some(model.clone()),
570                        provider_name,
571                        provider_type,
572                        fast_model: None,
573                        fast_model_provider: None,
574                        background_model: None,
575                        background_model_provider: None,
576                        summarization_model: None,
577                        summarization_model_provider: None,
578                        reasoning_effort: None,
579                        auxiliary_model_resolver: None,
580                        disabled_tools,
581                        disabled_skill_ids: None,
582                        selected_skill_ids: None,
583                        selected_skill_mode: None,
584                        image_fallback: None,
585                        gold_config: None,
586                        app_data_dir: ctx.app_data_dir.clone(),
587                    },
588                )
589                .await
590        };
591
592        let timeout_error = timeout_reason.read().await.clone();
593        let (status, error) = if let Some(reason) = timeout_error {
594            ("timeout".to_string(), Some(reason))
595        } else {
596            match &result {
597                Ok(_) => ("completed".to_string(), None),
598                Err(e) if e.to_string().contains("cancelled") => {
599                    ("cancelled".to_string(), Some(e.to_string()))
600                }
601                Err(e) => ("error".to_string(), Some(e.to_string())),
602            }
603        };
604
605        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
606
607        // Merge any queued injected messages that the pipeline didn't pick up
608        // (e.g. if the loop exited before the next turn boundary).
609        crate::runtime::runner::state_bridge::merge_pending_injected_messages(
610            &mut session,
611            Some(agent.storage()),
612            Some(agent.persistence()),
613        )
614        .await;
615
616        // Persist final session snapshot.
617        session
618            .metadata
619            .insert("last_run_status".to_string(), status.clone());
620        if let Some(err) = &error {
621            session
622                .metadata
623                .insert("last_run_error".to_string(), err.clone());
624        } else {
625            session.metadata.remove("last_run_error");
626        }
627        let _ = agent.persistence().save_runtime_session(&mut session).await;
628        {
629            let mut sessions = sessions_cache.write().await;
630            sessions.insert(session_id_clone.clone(), session);
631        }
632
633        // Stop forwarding/heartbeats and emit terminal child status through the
634        // same durable completion path used by success/error/cancel/timeout.
635        done.cancel();
636        publish_child_completion_parts(
637            &parent_tx_for_done,
638            completion_handler,
639            parent_id_for_done,
640            child_id_for_done,
641            status,
642            error,
643        )
644        .await;
645
646        // Allow dead code: session_event_senders keeps the sender alive during the task.
647        drop(session_event_senders);
648    });
649
650    Ok(())
651}