1use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::Utc;
14use tokio::sync::broadcast;
15use tokio::sync::RwLock;
16use tokio_util::sync::CancellationToken;
17
18use bamboo_agent_core::{AgentEvent, Role, SessionKind};
19
20use crate::runtime::execution::event_forwarder::create_event_forwarder;
21use crate::runtime::execution::runner_lifecycle::{
22 finalize_runner, try_reserve_runner, RunnerReservation,
23};
24use crate::runtime::execution::session_events::get_or_create_event_sender;
25use crate::runtime::execution::spawn::{
26 publish_child_completion_parts, watch_child_liveness, watchdog_policy_for_session,
27 SpawnContext, SpawnJob,
28};
29
30pub async fn run_child_spawn(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
45 let parent_tx =
47 get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
48 let child_tx =
49 get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;
50
51 let mut session = match ctx
53 .agent
54 .storage()
55 .load_session(&job.child_session_id)
56 .await
57 {
58 Ok(Some(s)) => s,
59 Ok(None) => {
60 let error = "child session not found".to_string();
61 publish_child_completion_parts(
62 &parent_tx,
63 ctx.completion_handler.clone(),
64 job.parent_session_id.clone(),
65 job.child_session_id.clone(),
66 "error".to_string(),
67 Some(error.clone()),
68 )
69 .await;
70 return Err(error);
71 }
72 Err(e) => {
73 let error = format!("failed to load child session: {e}");
74 publish_child_completion_parts(
75 &parent_tx,
76 ctx.completion_handler.clone(),
77 job.parent_session_id.clone(),
78 job.child_session_id.clone(),
79 "error".to_string(),
80 Some(error.clone()),
81 )
82 .await;
83 return Err(error);
84 }
85 };
86
87 if let Some(ref ws) = session.workspace {
90 bamboo_agent_core::workspace_state::set_workspace(
91 &session.id,
92 std::path::PathBuf::from(ws),
93 );
94 }
95
96 if session.kind != SessionKind::Child {
97 let error = "spawn job child session is not kind=child".to_string();
98 publish_child_completion_parts(
99 &parent_tx,
100 ctx.completion_handler.clone(),
101 job.parent_session_id.clone(),
102 job.child_session_id.clone(),
103 "error".to_string(),
104 Some(error.clone()),
105 )
106 .await;
107 return Err(error);
108 }
109
110 let last_is_user = session
112 .messages
113 .last()
114 .map(|m| matches!(m.role, Role::User))
115 .unwrap_or(false);
116 if !last_is_user {
117 session.set_last_run_status("skipped");
118 session.set_last_run_error("No pending message to execute");
119 let _ = ctx
120 .agent
121 .persistence()
122 .save_runtime_session(&mut session)
123 .await;
124 ctx.sessions_cache.insert(
125 job.child_session_id.clone(),
126 Arc::new(parking_lot::RwLock::new(session)),
127 );
128 publish_child_completion_parts(
129 &parent_tx,
130 ctx.completion_handler.clone(),
131 job.parent_session_id.clone(),
132 job.child_session_id.clone(),
133 "skipped".to_string(),
134 Some("No pending message to execute".to_string()),
135 )
136 .await;
137 return Ok(());
138 }
139
140 session.set_last_run_status("running");
142 session.clear_last_run_error();
143 let _ = ctx
144 .agent
145 .persistence()
146 .save_runtime_session(&mut session)
147 .await;
148
149 let Some(RunnerReservation { cancel_token, .. }) =
151 try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
152 else {
153 return Ok(());
154 };
155
156 let forwarder_done = CancellationToken::new();
158 {
159 let mut rx = child_tx.subscribe();
160 let parent_tx = parent_tx.clone();
161 let job_clone = job.clone();
162 let done = forwarder_done.clone();
163 tokio::spawn(async move {
164 loop {
165 tokio::select! {
166 _ = done.cancelled() => break,
167 evt = rx.recv() => {
168 match evt {
169 Ok(event) => {
170 let _ = parent_tx.send(AgentEvent::SubAgentEvent {
171 parent_session_id: job_clone.parent_session_id.clone(),
172 child_session_id: job_clone.child_session_id.clone(),
173 event: Box::new(event),
174 });
175 }
176 Err(broadcast::error::RecvError::Lagged(_)) => {
177 continue;
178 }
179 Err(_) => break,
180 }
181 }
182 }
183 }
184 });
185 }
186 {
187 let parent_tx = parent_tx.clone();
188 let job_clone = job.clone();
189 let done = forwarder_done.clone();
190 tokio::spawn(async move {
191 let mut ticker = tokio::time::interval(Duration::from_secs(5));
192 loop {
193 tokio::select! {
194 _ = done.cancelled() => break,
195 _ = ticker.tick() => {
196 let _ = parent_tx.send(AgentEvent::SubAgentHeartbeat {
197 parent_session_id: job_clone.parent_session_id.clone(),
198 child_session_id: job_clone.child_session_id.clone(),
199 timestamp: Utc::now(),
200 });
201 }
202 }
203 }
204 });
205 }
206
207 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
209 job.child_session_id.clone(),
210 child_tx.clone(),
211 ctx.agent_runners.clone(),
212 ctx.account_feed_inbox.clone(),
213 );
214
215 let timeout_reason = Arc::new(RwLock::new(None::<String>));
218 let watchdog_policy = watchdog_policy_for_session(&session);
219 tokio::spawn(watch_child_liveness(
220 job.parent_session_id.clone(),
221 job.child_session_id.clone(),
222 ctx.agent_runners.clone(),
223 cancel_token.clone(),
224 timeout_reason.clone(),
225 forwarder_done.clone(),
226 watchdog_policy,
227 ));
228
229 let model = job.model.clone();
231 let session_id_clone = job.child_session_id.clone();
232 let agent_runners_for_status = ctx.agent_runners.clone();
233 let sessions_cache = ctx.sessions_cache.clone();
234 let agent = ctx.agent.clone();
235 let external_runner = ctx.external_child_runner.clone();
236 let done = forwarder_done.clone();
237 let parent_tx_for_done = parent_tx.clone();
238 let parent_id_for_done = job.parent_session_id.clone();
239 let child_id_for_done = job.child_session_id.clone();
240 let session_event_senders = ctx.session_event_senders.clone();
241 let completion_handler = ctx.completion_handler.clone();
242
243 tokio::spawn(async move {
244 crate::session_app::execution_prep::prepare_session_for_execution(
248 &mut session,
249 None,
250 Some(&model),
251 );
252
253 let result: crate::runtime::runner::Result<()> =
267 if external_runner.should_handle(&session).await {
268 external_runner
269 .execute_external_child(&mut session, &job, mpsc_tx, cancel_token.clone())
270 .await
271 } else {
272 Err(bamboo_agent_core::AgentError::LLM(format!(
273 "No child runner matched session runtime metadata: agent_id={:?}, protocol={:?}",
274 session.metadata.get("external.agent_id"),
275 session.metadata.get("external.protocol"),
276 )))
277 };
278
279 let timeout_error = timeout_reason.read().await.clone();
280 let suspended_non_terminal = session
291 .metadata
292 .get("runtime.suspend_reason")
293 .map(|reason| {
294 matches!(
295 reason.as_str(),
296 "awaiting_parent_approval" | "waiting_for_bash"
297 )
298 })
299 .unwrap_or(false);
300 let (status, error) = if let Some(reason) = timeout_error {
301 ("timeout".to_string(), Some(reason))
302 } else if suspended_non_terminal && result.is_ok() {
303 ("suspended".to_string(), None)
304 } else {
305 match &result {
306 Ok(_) => ("completed".to_string(), None),
307 Err(e @ bamboo_agent_core::AgentError::Cancelled) => {
308 ("cancelled".to_string(), Some(e.to_string()))
309 }
310 Err(e) => ("error".to_string(), Some(e.to_string())),
311 }
312 };
313
314 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
315
316 crate::runtime::runner::state_bridge::merge_pending_injected_messages(
319 &mut session,
320 Some(agent.storage()),
321 Some(agent.persistence()),
322 )
323 .await;
324
325 session.set_last_run_status(status.clone());
327 if let Some(err) = &error {
328 session.set_last_run_error(err.clone());
329 } else {
330 session.clear_last_run_error();
331 }
332 let _ = agent.persistence().save_runtime_session(&mut session).await;
333 sessions_cache.insert(
334 session_id_clone.clone(),
335 Arc::new(parking_lot::RwLock::new(session)),
336 );
337
338 done.cancel();
341 publish_child_completion_parts(
342 &parent_tx_for_done,
343 completion_handler,
344 parent_id_for_done,
345 child_id_for_done,
346 status,
347 error,
348 )
349 .await;
350
351 drop(session_event_senders);
353 });
354
355 Ok(())
356}