// bamboo_engine/sdk/spawn.rs

//! Canonical child-session spawn core (anti-fork single implementation).
//!
//! [`run_child_spawn`] is the **one** place that loads a child session, reserves
//! its runner, wires the event forwarder + heartbeat + watchdog, dispatches the
//! child run to the external child runner, and publishes the child's completion
//! status. Both the background scheduler (`run_spawn_job`) and the ergonomic
//! [`crate::sdk::runner::ChildRunner`] delegate here so behavior — event
//! ordering, status strings, and field set — stays identical across entry points.

use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use bamboo_agent_core::{AgentEvent, Role, SessionKind};

use crate::runtime::execution::event_forwarder::create_event_forwarder;
use crate::runtime::execution::runner_lifecycle::{
    finalize_runner, try_reserve_runner, RunnerReservation,
};
use crate::runtime::execution::session_events::get_or_create_event_sender;
use crate::runtime::execution::spawn::{
    publish_child_completion_parts, watch_child_liveness, watchdog_policy_for_session,
    SpawnContext, SpawnJob,
};

30/// Launch a single child spawn job.
31///
32/// This sets up the child run and **spawns the actual execution onto a
33/// background task**, returning `Ok(())` once the run has been *started* — not
34/// when it completes. Completion (and persistence finalize) is observed via the
35/// `SubAgentCompleted` event on the parent stream, not via this return value.
36/// `Err` is only returned for synchronous setup failures (e.g. child session
37/// not found) before the background task is spawned.
38///
39/// Preserves EXACTLY the canonical behavior:
40/// - `SubAgentStarted` is emitted by the *adapter* before enqueue (not here).
41/// - Event forwarder + 5s heartbeat tasks, watchdog, runner reservation.
42/// - Full real [`ExecuteRequest`] field set incl. split provider fields.
43/// - Terminal status strings `completed | cancelled | error | skipped | timeout`.
44pub async fn run_child_spawn(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
45    // Ensure both session event streams exist.
46    let parent_tx =
47        get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
48    let child_tx =
49        get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
50
51    // Load child session.
52    let mut session = match ctx
53        .agent
54        .storage()
55        .load_session(&job.child_session_id)
56        .await
57    {
58        Ok(Some(s)) => s,
59        Ok(None) => {
60            let error = "child session not found".to_string();
61            publish_child_completion_parts(
62                &parent_tx,
63                ctx.completion_handler.clone(),
64                job.parent_session_id.clone(),
65                job.child_session_id.clone(),
66                "error".to_string(),
67                Some(error.clone()),
68            )
69            .await;
70            return Err(error);
71        }
72        Err(e) => {
73            let error = format!("failed to load child session: {e}");
74            publish_child_completion_parts(
75                &parent_tx,
76                ctx.completion_handler.clone(),
77                job.parent_session_id.clone(),
78                job.child_session_id.clone(),
79                "error".to_string(),
80                Some(error.clone()),
81            )
82            .await;
83            return Err(error);
84        }
85    };
86
87    // Register the child's workspace in the global state so tools
88    // (Read, Glob, Grep, Bash, etc.) can resolve relative paths.
89    if let Some(ref ws) = session.workspace {
90        bamboo_agent_core::workspace_state::set_workspace(
91            &session.id,
92            std::path::PathBuf::from(ws),
93        );
94    }
95
96    if session.kind != SessionKind::Child {
97        let error = "spawn job child session is not kind=child".to_string();
98        publish_child_completion_parts(
99            &parent_tx,
100            ctx.completion_handler.clone(),
101            job.parent_session_id.clone(),
102            job.child_session_id.clone(),
103            "error".to_string(),
104            Some(error.clone()),
105        )
106        .await;
107        return Err(error);
108    }
109
110    // Ensure last message is user (otherwise nothing to do).
111    let last_is_user = session
112        .messages
113        .last()
114        .map(|m| matches!(m.role, Role::User))
115        .unwrap_or(false);
116    if !last_is_user {
117        session.set_last_run_status("skipped");
118        session.set_last_run_error("No pending message to execute");
119        let _ = ctx
120            .agent
121            .persistence()
122            .save_runtime_session(&mut session)
123            .await;
124        ctx.sessions_cache.insert(
125            job.child_session_id.clone(),
126            Arc::new(parking_lot::RwLock::new(session)),
127        );
128        publish_child_completion_parts(
129            &parent_tx,
130            ctx.completion_handler.clone(),
131            job.parent_session_id.clone(),
132            job.child_session_id.clone(),
133            "skipped".to_string(),
134            Some("No pending message to execute".to_string()),
135        )
136        .await;
137        return Ok(());
138    }
139
140    // Persist a running marker early so list_sessions can reconstruct status.
141    session.set_last_run_status("running");
142    session.clear_last_run_error();
143    let _ = ctx
144        .agent
145        .persistence()
146        .save_runtime_session(&mut session)
147        .await;
148
149    // Insert runner status.
150    let Some(RunnerReservation { cancel_token, .. }) =
151        try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
152    else {
153        return Ok(());
154    };
155
156    // Forward ALL child events to parent.
157    let forwarder_done = CancellationToken::new();
158    {
159        let mut rx = child_tx.subscribe();
160        let parent_tx = parent_tx.clone();
161        let job_clone = job.clone();
162        let done = forwarder_done.clone();
163        tokio::spawn(async move {
164            loop {
165                tokio::select! {
166                    _ = done.cancelled() => break,
167                    evt = rx.recv() => {
168                        match evt {
169                            Ok(event) => {
170                                let _ = parent_tx.send(AgentEvent::SubAgentEvent {
171                                    parent_session_id: job_clone.parent_session_id.clone(),
172                                    child_session_id: job_clone.child_session_id.clone(),
173                                    event: Box::new(event),
174                                });
175                            }
176                            Err(broadcast::error::RecvError::Lagged(_)) => {
177                                continue;
178                            }
179                            Err(_) => break,
180                        }
181                    }
182                }
183            }
184        });
185    }
186    {
187        let parent_tx = parent_tx.clone();
188        let job_clone = job.clone();
189        let done = forwarder_done.clone();
190        tokio::spawn(async move {
191            let mut ticker = tokio::time::interval(Duration::from_secs(5));
192            loop {
193                tokio::select! {
194                    _ = done.cancelled() => break,
195                    _ = ticker.tick() => {
196                        let _ = parent_tx.send(AgentEvent::SubAgentHeartbeat {
197                            parent_session_id: job_clone.parent_session_id.clone(),
198                            child_session_id: job_clone.child_session_id.clone(),
199                            timestamp: Utc::now(),
200                        });
201                    }
202                }
203            }
204        });
205    }
206
207    // Create mpsc channel for agent loop → session events sender.
208    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
209        job.child_session_id.clone(),
210        child_tx.clone(),
211        ctx.agent_runners.clone(),
212        ctx.account_feed_inbox.clone(),
213    );
214
215    // Child liveness is owned by the child runner. The parent wait state can
216    // have a longer lease, but it should not poll or terminate children.
217    let timeout_reason = Arc::new(RwLock::new(None::<String>));
218    let watchdog_policy = watchdog_policy_for_session(&session);
219    tokio::spawn(watch_child_liveness(
220        job.parent_session_id.clone(),
221        job.child_session_id.clone(),
222        ctx.agent_runners.clone(),
223        cancel_token.clone(),
224        timeout_reason.clone(),
225        forwarder_done.clone(),
226        watchdog_policy,
227    ));
228
229    // Run child loop via unified spawn_session_execution.
230    let model = job.model.clone();
231    let session_id_clone = job.child_session_id.clone();
232    let agent_runners_for_status = ctx.agent_runners.clone();
233    let sessions_cache = ctx.sessions_cache.clone();
234    let agent = ctx.agent.clone();
235    let external_runner = ctx.external_child_runner.clone();
236    let done = forwarder_done.clone();
237    let parent_tx_for_done = parent_tx.clone();
238    let parent_id_for_done = job.parent_session_id.clone();
239    let child_id_for_done = job.child_session_id.clone();
240    let session_event_senders = ctx.session_event_senders.clone();
241    let completion_handler = ctx.completion_handler.clone();
242
243    tokio::spawn(async move {
244        // Set the child model via the single authoritative pre-execution
245        // mutation point. The child session's system prompt is already in place
246        // (loaded from storage), so pass `None` for `system_prompt`.
247        crate::session_app::execution_prep::prepare_session_for_execution(
248            &mut session,
249            None,
250            Some(&model),
251        );
252
253        // Sub-agents always run as actors (the in-process runtime was removed):
254        // dispatch to the composite child runner. The built-in local actor
255        // handles the default case; expert `externalAgents` profiles handle
256        // roles pinned to other agents. `should_handle` selects the right one.
257        //
258        // The child's `AgentLoopConfig` is assembled by the external runner, not
259        // here, and does not currently wire `bash_resume_hook`. The end-of-turn
260        // bash suspend gate is therefore inert for children — graceful
261        // degradation: a child that leaves a `run_in_background` shell running
262        // simply completes; the shell keeps running detached and stays readable
263        // via BashOutput. No strand can occur because the gate refuses to
264        // suspend without the hook. (The parent-resume path re-wires the hook,
265        // so a RESUMED run is covered; only the initial child run is not.)
266        let result: crate::runtime::runner::Result<()> =
267            if external_runner.should_handle(&session).await {
268                external_runner
269                    .execute_external_child(&mut session, &job, mpsc_tx, cancel_token.clone())
270                    .await
271            } else {
272                Err(bamboo_agent_core::AgentError::LLM(format!(
273                "No child runner matched session runtime metadata: agent_id={:?}, protocol={:?}",
274                session.metadata.get("external.agent_id"),
275                session.metadata.get("external.protocol"),
276            )))
277            };
278
279        let timeout_error = timeout_reason.read().await.clone();
280        // Phase 2: a child that suspended awaiting the PARENT's approval of a
281        // gated tool is NOT done — publish a NON-terminal "suspended" status so
282        // the parent's completion coordinator does not resume the parent
283        // prematurely. The child is resumed once the human decides, then runs to
284        // a real terminal completion that re-enters this path.
285        //
286        // Issue #84 Phase 2b: a child that suspended mid-background-Bash is
287        // likewise NOT done — it must not publish a premature terminal
288        // "completed" to its parent while a `run_in_background` shell is still
289        // running for it.
290        let suspended_non_terminal = session
291            .metadata
292            .get("runtime.suspend_reason")
293            .map(|reason| {
294                matches!(
295                    reason.as_str(),
296                    "awaiting_parent_approval" | "waiting_for_bash"
297                )
298            })
299            .unwrap_or(false);
300        let (status, error) = if let Some(reason) = timeout_error {
301            ("timeout".to_string(), Some(reason))
302        } else if suspended_non_terminal && result.is_ok() {
303            ("suspended".to_string(), None)
304        } else {
305            match &result {
306                Ok(_) => ("completed".to_string(), None),
307                Err(e @ bamboo_agent_core::AgentError::Cancelled) => {
308                    ("cancelled".to_string(), Some(e.to_string()))
309                }
310                Err(e) => ("error".to_string(), Some(e.to_string())),
311            }
312        };
313
314        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
315
316        // Merge any queued injected messages that the pipeline didn't pick up
317        // (e.g. if the loop exited before the next turn boundary).
318        crate::runtime::runner::state_bridge::merge_pending_injected_messages(
319            &mut session,
320            Some(agent.storage()),
321            Some(agent.persistence()),
322        )
323        .await;
324
325        // Persist final session snapshot.
326        session.set_last_run_status(status.clone());
327        if let Some(err) = &error {
328            session.set_last_run_error(err.clone());
329        } else {
330            session.clear_last_run_error();
331        }
332        let _ = agent.persistence().save_runtime_session(&mut session).await;
333        sessions_cache.insert(
334            session_id_clone.clone(),
335            Arc::new(parking_lot::RwLock::new(session)),
336        );
337
338        // Stop forwarding/heartbeats and emit terminal child status through the
339        // same durable completion path used by success/error/cancel/timeout.
340        done.cancel();
341        publish_child_completion_parts(
342            &parent_tx_for_done,
343            completion_handler,
344            parent_id_for_done,
345            child_id_for_done,
346            status,
347            error,
348        )
349        .await;
350
351        // Allow dead code: session_event_senders keeps the sender alive during the task.
352        drop(session_event_senders);
353    });
354
355    Ok(())
356}