bamboo_engine/runtime/execution/
spawn.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio_util::sync::CancellationToken;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentEvent, Role, Session, SessionKind};
17
18use crate::runtime::Agent;
19use crate::runtime::ExecuteRequest;
20
21use super::event_forwarder::create_event_forwarder;
22use super::runner_lifecycle::{finalize_runner, try_reserve_runner};
23use super::runner_state::AgentRunner;
24use super::session_events::get_or_create_event_sender;
25
/// A queued request to execute a child session on behalf of its parent.
///
/// Jobs are submitted through `SpawnScheduler::enqueue` and handled one at a
/// time by `run_spawn_job`.
#[derive(Debug, Clone)]
pub struct SpawnJob {
    /// Session whose event channel receives the child's relayed events,
    /// heartbeats and final completion notice.
    pub parent_session_id: String,
    /// Session to execute; `run_spawn_job` rejects it unless it is
    /// `SessionKind::Child`.
    pub child_session_id: String,
    /// Model name assigned to the child session and requested for the run.
    pub model: String,
}
32
33#[derive(Clone)]
34pub struct SpawnContext {
35 pub agent: Arc<Agent>,
36 pub tools: Arc<dyn ToolExecutor>,
37 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
38 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
39 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
40}
41
42#[derive(Clone)]
43pub struct SpawnScheduler {
44 tx: mpsc::Sender<SpawnJob>,
45}
46
47impl SpawnScheduler {
48 pub fn new(ctx: SpawnContext) -> Self {
49 let (tx, mut rx) = mpsc::channel::<SpawnJob>(128);
50
51 tokio::spawn(async move {
52 while let Some(job) = rx.recv().await {
53 if let Err(err) = run_spawn_job(ctx.clone(), job).await {
54 tracing::warn!("spawn job failed: {}", err);
55 }
56 }
57 });
58
59 Self { tx }
60 }
61
62 pub async fn enqueue(&self, job: SpawnJob) -> Result<(), String> {
63 self.tx
64 .send(job)
65 .await
66 .map_err(|_| "spawn scheduler is not running".to_string())
67 }
68}
69
70async fn run_spawn_job(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
71 let parent_tx =
73 get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
74 let child_tx =
75 get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
76
77 let emit_error_completion = |error: String| {
78 let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
79 parent_session_id: job.parent_session_id.clone(),
80 child_session_id: job.child_session_id.clone(),
81 status: "error".to_string(),
82 error: Some(error.clone()),
83 });
84 error
85 };
86
87 let mut session = match ctx
89 .agent
90 .storage()
91 .load_session(&job.child_session_id)
92 .await
93 {
94 Ok(Some(s)) => s,
95 Ok(None) => return Err(emit_error_completion("child session not found".to_string())),
96 Err(e) => {
97 return Err(emit_error_completion(format!(
98 "failed to load child session: {e}"
99 )))
100 }
101 };
102
103 if session.kind != SessionKind::Child {
104 return Err(emit_error_completion(
105 "spawn job child session is not kind=child".to_string(),
106 ));
107 }
108
109 let last_is_user = session
111 .messages
112 .last()
113 .map(|m| matches!(m.role, Role::User))
114 .unwrap_or(false);
115 if !last_is_user {
116 session
117 .metadata
118 .insert("last_run_status".to_string(), "skipped".to_string());
119 session.metadata.insert(
120 "last_run_error".to_string(),
121 "No pending message to execute".to_string(),
122 );
123 let _ = ctx.agent.storage().save_session(&session).await;
124 let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
125 parent_session_id: job.parent_session_id.clone(),
126 child_session_id: job.child_session_id.clone(),
127 status: "skipped".to_string(),
128 error: Some("No pending message to execute".to_string()),
129 });
130 return Ok(());
131 }
132
133 session
135 .metadata
136 .insert("last_run_status".to_string(), "running".to_string());
137 session.metadata.remove("last_run_error");
138 let _ = ctx.agent.storage().save_session(&session).await;
139
140 let Some(cancel_token) =
142 try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
143 else {
144 return Ok(());
145 };
146
147 let forwarder_done = CancellationToken::new();
149 {
150 let mut rx = child_tx.subscribe();
151 let parent_tx = parent_tx.clone();
152 let job_clone = job.clone();
153 let done = forwarder_done.clone();
154 tokio::spawn(async move {
155 loop {
156 tokio::select! {
157 _ = done.cancelled() => break,
158 evt = rx.recv() => {
159 match evt {
160 Ok(event) => {
161 let _ = parent_tx.send(AgentEvent::SubSessionEvent {
162 parent_session_id: job_clone.parent_session_id.clone(),
163 child_session_id: job_clone.child_session_id.clone(),
164 event: Box::new(event),
165 });
166 }
167 Err(broadcast::error::RecvError::Lagged(_)) => {
168 continue;
169 }
170 Err(_) => break,
171 }
172 }
173 }
174 }
175 });
176 }
177 {
178 let parent_tx = parent_tx.clone();
179 let job_clone = job.clone();
180 let done = forwarder_done.clone();
181 tokio::spawn(async move {
182 let mut ticker = tokio::time::interval(Duration::from_secs(5));
183 loop {
184 tokio::select! {
185 _ = done.cancelled() => break,
186 _ = ticker.tick() => {
187 let _ = parent_tx.send(AgentEvent::SubSessionHeartbeat {
188 parent_session_id: job_clone.parent_session_id.clone(),
189 child_session_id: job_clone.child_session_id.clone(),
190 timestamp: Utc::now(),
191 });
192 }
193 }
194 }
195 });
196 }
197
198 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
200 job.child_session_id.clone(),
201 child_tx.clone(),
202 ctx.agent_runners.clone(),
203 );
204
205 let model = job.model.clone();
207 let session_id_clone = job.child_session_id.clone();
208 let agent_runners_for_status = ctx.agent_runners.clone();
209 let sessions_cache = ctx.sessions_cache.clone();
210 let agent = ctx.agent.clone();
211 let tools = ctx.tools.clone();
212 let done = forwarder_done.clone();
213 let parent_tx_for_done = parent_tx.clone();
214 let parent_id_for_done = job.parent_session_id.clone();
215 let child_id_for_done = job.child_session_id.clone();
216 let session_event_senders = ctx.session_event_senders.clone();
217
218 tokio::spawn(async move {
219 session.model = model.clone();
220
221 let result: crate::runtime::runner::Result<()> = agent
222 .execute(
223 &mut session,
224 ExecuteRequest {
225 initial_message: String::new(), event_tx: mpsc_tx,
227 cancel_token,
228 tools: Some(tools),
229 provider_override: None,
230 model: Some(model.clone()),
231 provider_name: None,
232 background_model: None,
233 background_model_provider: None,
234 reasoning_effort: None,
235 disabled_tools: None,
236 disabled_skill_ids: None,
237 selected_skill_ids: None,
238 selected_skill_mode: None,
239 image_fallback: None,
240 },
241 )
242 .await;
243
244 let (status, error) = match &result {
245 Ok(_) => ("completed".to_string(), None),
246 Err(e) if e.to_string().contains("cancelled") => {
247 ("cancelled".to_string(), Some(e.to_string()))
248 }
249 Err(e) => ("error".to_string(), Some(e.to_string())),
250 };
251
252 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
253
254 session
256 .metadata
257 .insert("last_run_status".to_string(), status.clone());
258 if let Some(err) = &error {
259 session
260 .metadata
261 .insert("last_run_error".to_string(), err.clone());
262 } else {
263 session.metadata.remove("last_run_error");
264 }
265 let _ = agent.storage().save_session(&session).await;
266 {
267 let mut sessions = sessions_cache.write().await;
268 sessions.insert(session_id_clone.clone(), session);
269 }
270
271 done.cancel();
273 let _ = parent_tx_for_done.send(AgentEvent::SubSessionCompleted {
274 parent_session_id: parent_id_for_done,
275 child_session_id: child_id_for_done,
276 status,
277 error,
278 });
279
280 drop(session_event_senders);
282 });
283
284 Ok(())
285}