Skip to main content

bamboo_engine/runtime/execution/
spawn.rs

//! Sub-session spawn scheduler.
//!
//! Provides a background queue for spawning child sessions. Spawning is async
//! (the tool returns immediately), but the UI can observe child progress via
//! events forwarded to the parent session stream.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Role, Session, SessionKind};
17
18use crate::runtime::Agent;
19use crate::runtime::ExecuteRequest;
20
21use super::event_forwarder::create_event_forwarder;
22use super::runner_lifecycle::{finalize_runner, try_reserve_runner};
23use super::runner_state::AgentRunner;
24use super::session_events::get_or_create_event_sender;
25
/// A request to run a pending child session in the background.
#[derive(Clone, Debug)]
pub struct SpawnJob {
    /// Session that receives the child's forwarded events and completion status.
    pub parent_session_id: String,
    /// Session to execute; it must be `SessionKind::Child` and end with a user message.
    pub child_session_id: String,
    /// Model used for the child run (also written into the child session).
    pub model: String,
}
32
33#[derive(Clone)]
34pub struct SpawnContext {
35    pub agent: Arc<Agent>,
36    pub tools: Arc<dyn ToolExecutor>,
37    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
38    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
39    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
40}
41
42#[derive(Clone)]
43pub struct SpawnScheduler {
44    tx: mpsc::Sender<SpawnJob>,
45}
46
47impl SpawnScheduler {
48    pub fn new(ctx: SpawnContext) -> Self {
49        let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
50
51        tokio::spawn(async move {
52            while let Some(job) = rx.recv().await {
53                if let Err(err) = run_spawn_job(ctx.clone(), job).await {
54                    tracing::warn!("spawn job failed: {}", err);
55                }
56            }
57        });
58
59        Self { tx }
60    }
61
62    pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
63        self.tx
64            .send(job)
65            .await
66            .map_err(|_| "spawn scheduler is not running".to_string())
67    }
68}
69
70async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
71    // Ensure both session event streams exist.
72    let parent_tx =
73        get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
74    let child_tx =
75        get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
76
77    let emit_error_completion = |error: String| {
78        let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
79            parent_session_id: job.parent_session_id.clone(),
80            child_session_id: job.child_session_id.clone(),
81            status: "error".to_string(),
82            error: Some(error.clone()),
83        });
84        error
85    };
86
87    // Load child session.
88    let mut session = match ctx
89        .agent
90        .storage()
91        .load_session(&job.child_session_id)
92        .await
93    {
94        Ok(Some(s)) => s,
95        Ok(None) => return Err(emit_error_completion("child session not found".to_string())),
96        Err(e) => {
97            return Err(emit_error_completion(format!(
98                "failed to load child session: {e}"
99            )))
100        }
101    };
102
103    if session.kind != SessionKind::Child {
104        return Err(emit_error_completion(
105            "spawn job child session is not kind=child".to_string(),
106        ));
107    }
108
109    // Ensure last message is user (otherwise nothing to do).
110    let last_is_user = session
111        .messages
112        .last()
113        .map(|m| matches!(m.role, Role::User))
114        .unwrap_or(false);
115    if !last_is_user {
116        session
117            .metadata
118            .insert("last_run_status".to_string(), "skipped".to_string());
119        session.metadata.insert(
120            "last_run_error".to_string(),
121            "No pending message to execute".to_string(),
122        );
123        let _ = ctx.agent.storage().save_session(&session).await;
124        let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
125            parent_session_id: job.parent_session_id.clone(),
126            child_session_id: job.child_session_id.clone(),
127            status: "skipped".to_string(),
128            error: Some("No pending message to execute".to_string()),
129        });
130        return Ok(());
131    }
132
133    // Persist a running marker early so list_sessions can reconstruct status.
134    session
135        .metadata
136        .insert("last_run_status".to_string(), "running".to_string());
137    session.metadata.remove("last_run_error");
138    let _ = ctx.agent.storage().save_session(&session).await;
139
140    // Insert runner status.
141    let Some(cancel_token) =
142        try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
143    else {
144        return Ok(());
145    };
146
147    // Forward ALL child events to parent.
148    let forwarder_done = CancellationToken::new();
149    {
150        let mut rx = child_tx.subscribe();
151        let parent_tx = parent_tx.clone();
152        let job_clone = job.clone();
153        let done = forwarder_done.clone();
154        tokio::spawn(async move {
155            loop {
156                tokio::select! {
157                    _ = done.cancelled() => break,
158                    evt = rx.recv() => {
159                        match evt {
160                            Ok(event) => {
161                                let _ = parent_tx.send(AgentEvent::SubSessionEvent {
162                                    parent_session_id: job_clone.parent_session_id.clone(),
163                                    child_session_id: job_clone.child_session_id.clone(),
164                                    event: Box::new(event),
165                                });
166                            }
167                            Err(broadcast::error::RecvError::Lagged(_)) => {
168                                continue;
169                            }
170                            Err(_) => break,
171                        }
172                    }
173                }
174            }
175        });
176    }
177    {
178        let parent_tx = parent_tx.clone();
179        let job_clone = job.clone();
180        let done = forwarder_done.clone();
181        tokio::spawn(async move {
182            let mut ticker = tokio::time::interval(Duration::from_secs(5));
183            loop {
184                tokio::select! {
185                    _ = done.cancelled() => break,
186                    _ = ticker.tick() => {
187                        let _ = parent_tx.send(AgentEvent::SubSessionHeartbeat {
188                            parent_session_id: job_clone.parent_session_id.clone(),
189                            child_session_id: job_clone.child_session_id.clone(),
190                            timestamp: Utc::now(),
191                        });
192                    }
193                }
194            }
195        });
196    }
197
198    // Create mpsc channel for agent loop → session events sender.
199    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
200        job.child_session_id.clone(),
201        child_tx.clone(),
202        ctx.agent_runners.clone(),
203    );
204
205    // Run child loop via unified spawn_session_execution.
206    let model = job.model.clone();
207    let session_id_clone = job.child_session_id.clone();
208    let agent_runners_for_status = ctx.agent_runners.clone();
209    let sessions_cache = ctx.sessions_cache.clone();
210    let agent = ctx.agent.clone();
211    let tools = ctx.tools.clone();
212    let done = forwarder_done.clone();
213    let parent_tx_for_done = parent_tx.clone();
214    let parent_id_for_done = job.parent_session_id.clone();
215    let child_id_for_done = job.child_session_id.clone();
216    let session_event_senders = ctx.session_event_senders.clone();
217
218    tokio::spawn(async move {
219        session.model = model.clone();
220
221        let result: crate::runtime::runner::Result<()> = agent
222            .execute(
223                &mut session,
224                ExecuteRequest {
225                    initial_message: String::new(), // handled by agent loop
226                    event_tx: mpsc_tx,
227                    cancel_token,
228                    tools: Some(tools),
229                    provider_override: None,
230                    model: Some(model.clone()),
231                    provider_name: None,
232                    background_model: None,
233                    background_model_provider: None,
234                    reasoning_effort: None,
235                    disabled_tools: None,
236                    disabled_skill_ids: None,
237                    selected_skill_ids: None,
238                    selected_skill_mode: None,
239                    image_fallback: None,
240                },
241            )
242            .await;
243
244        let (status, error) = match &result {
245            Ok(_) => ("completed".to_string(), None),
246            Err(e) if e.to_string().contains("cancelled") => {
247                ("cancelled".to_string(), Some(e.to_string()))
248            }
249            Err(e) => ("error".to_string(), Some(e.to_string())),
250        };
251
252        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
253
254        // Persist final session snapshot.
255        session
256            .metadata
257            .insert("last_run_status".to_string(), status.clone());
258        if let Some(err) = &error {
259            session
260                .metadata
261                .insert("last_run_error".to_string(), err.clone());
262        } else {
263            session.metadata.remove("last_run_error");
264        }
265        let _ = agent.storage().save_session(&session).await;
266        {
267            let mut sessions = sessions_cache.write().await;
268            sessions.insert(session_id_clone.clone(), session);
269        }
270
271        // Stop forwarding/heartbeats and emit terminal child status.
272        done.cancel();
273        let _ = parent_tx_for_done.send(AgentEvent::SubSessionCompleted {
274            parent_session_id: parent_id_for_done,
275            child_session_id: child_id_for_done,
276            status,
277            error,
278        });
279
280        // Allow dead code: session_event_senders keeps the sender alive during the task.
281        drop(session_event_senders);
282    });
283
284    Ok(())
285}