use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use bamboo_agent_core::{AgentEvent, Role, SessionKind};
use crate::runtime::execution::event_forwarder::create_event_forwarder;
use crate::runtime::execution::runner_lifecycle::{
finalize_runner, try_reserve_runner, RunnerReservation,
};
use crate::runtime::execution::session_events::get_or_create_event_sender;
use crate::runtime::execution::spawn::{
publish_child_completion_parts, watch_child_liveness, watchdog_policy_for_session,
SpawnContext, SpawnJob,
};
/// Starts execution of a spawned child session and wires its events to the parent.
///
/// The function performs the pre-flight checks inline and then hands the actual
/// run to a detached background task, so it returns before the child finishes:
///
/// 1. Loads the child session from storage and checks that it is a
///    [`SessionKind::Child`] session.
/// 2. If the last message is not a user message there is nothing to run: the
///    session is marked `skipped`, cached, and a `skipped` completion is published.
/// 3. Otherwise the session is marked `running`, a runner is reserved, and three
///    auxiliary tasks are started (child-to-parent event forwarder, 5-second
///    heartbeat, liveness watchdog) next to the task that executes the child.
/// 4. When the execution task finishes it finalizes the runner, persists the
///    final status (`timeout`, `suspended`, `completed`, `cancelled` or `error`),
///    stops the auxiliary tasks and publishes the completion to the parent.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when the child session cannot be
/// loaded, does not exist, or is not of kind `Child`. In each of these cases an
/// `"error"` completion carrying the same message is published first.
///
/// `Ok(())` is returned in every other case, including the skipped path, a
/// failed runner reservation, and a successfully started background run. The
/// outcome of the run itself is reported only through the published completion.
pub async fn run_child_spawn(ctx: SpawnContext, job: SpawnJob) -> Result<(), String> {
    // Used both as the persisted run error and as the published completion detail.
    const NO_PENDING_MESSAGE: &str = "No pending message to execute";

    let parent_tx =
        get_or_create_event_sender(&ctx.session_event_senders, &job.parent_session_id).await;
    let child_tx =
        get_or_create_event_sender(&ctx.session_event_senders, &job.child_session_id).await;

    // Load and validate the child session. Every failure is turned into a message
    // so that all of them share the single publish-and-return path below.
    let checked: Result<_, String> = match ctx
        .agent
        .storage()
        .load_session(&job.child_session_id)
        .await
    {
        Ok(Some(session)) => {
            // The workspace is registered before the kind check, so it is set even
            // for a session that is subsequently rejected.
            if let Some(ref ws) = session.workspace {
                bamboo_agent_core::workspace_state::set_workspace(
                    &session.id,
                    std::path::PathBuf::from(ws),
                );
            }
            if session.kind != SessionKind::Child {
                Err("spawn job child session is not kind=child".to_string())
            } else {
                Ok(session)
            }
        }
        Ok(None) => Err("child session not found".to_string()),
        Err(e) => Err(format!("failed to load child session: {e}")),
    };
    let mut session = match checked {
        Ok(session) => session,
        Err(error) => {
            publish_child_completion_parts(
                &parent_tx,
                ctx.completion_handler.clone(),
                job.parent_session_id.clone(),
                job.child_session_id.clone(),
                "error".to_string(),
                Some(error.clone()),
            )
            .await;
            return Err(error);
        }
    };

    // A run is only meaningful when the conversation ends with a user message.
    let last_is_user = session
        .messages
        .last()
        .map(|m| matches!(m.role, Role::User))
        .unwrap_or(false);
    if !last_is_user {
        session.set_last_run_status("skipped");
        session.set_last_run_error(NO_PENDING_MESSAGE);
        // Best effort: a failed save does not prevent the skip from being reported.
        let _ = ctx
            .agent
            .persistence()
            .save_runtime_session(&mut session)
            .await;
        ctx.sessions_cache.insert(
            job.child_session_id.clone(),
            Arc::new(parking_lot::RwLock::new(session)),
        );
        publish_child_completion_parts(
            &parent_tx,
            ctx.completion_handler.clone(),
            job.parent_session_id.clone(),
            job.child_session_id.clone(),
            "skipped".to_string(),
            Some(NO_PENDING_MESSAGE.to_string()),
        )
        .await;
        return Ok(());
    }

    // NOTE(review): the `running` status is persisted before the runner is
    // reserved; if the reservation below is refused this write has already
    // happened. Confirm that this ordering is intended.
    session.set_last_run_status("running");
    session.clear_last_run_error();
    let _ = ctx
        .agent
        .persistence()
        .save_runtime_session(&mut session)
        .await;

    // No reservation means this call must not start a run (presumably a runner is
    // already active for the child - verify against `try_reserve_runner`). No
    // completion is published on this path.
    let Some(RunnerReservation { cancel_token, .. }) =
        try_reserve_runner(&ctx.agent_runners, &job.child_session_id, &child_tx).await
    else {
        return Ok(());
    };

    // Cancelled once the run is over; stops the forwarder and the heartbeat, and
    // is also handed to the liveness watchdog.
    let forwarder_done = CancellationToken::new();

    // Forwarder: re-publishes every child event on the parent channel, wrapped in
    // `SubAgentEvent`.
    {
        let mut rx = child_tx.subscribe();
        let parent_tx = parent_tx.clone();
        let job_clone = job.clone();
        let done = forwarder_done.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = done.cancelled() => break,
                    evt = rx.recv() => {
                        match evt {
                            Ok(event) => {
                                // A send error only means nobody is listening on
                                // the parent channel right now.
                                let _ = parent_tx.send(AgentEvent::SubAgentEvent {
                                    parent_session_id: job_clone.parent_session_id.clone(),
                                    child_session_id: job_clone.child_session_id.clone(),
                                    event: Box::new(event),
                                });
                            }
                            // Some events were missed because this receiver fell
                            // behind; keep forwarding the ones still available.
                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                continue;
                            }
                            // Any other receive error ends the forwarder.
                            Err(_) => break,
                        }
                    }
                }
            }
        });
    }

    // Heartbeat: emits `SubAgentHeartbeat` on the parent channel on a 5-second
    // interval (tokio's `interval` yields its first tick immediately).
    {
        let parent_tx = parent_tx.clone();
        let job_clone = job.clone();
        let done = forwarder_done.clone();
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(5));
            loop {
                tokio::select! {
                    _ = done.cancelled() => break,
                    _ = ticker.tick() => {
                        let _ = parent_tx.send(AgentEvent::SubAgentHeartbeat {
                            parent_session_id: job_clone.parent_session_id.clone(),
                            child_session_id: job_clone.child_session_id.clone(),
                            timestamp: Utc::now(),
                        });
                    }
                }
            }
        });
    }

    // Channel the runner writes its events to. The forwarder's join handle is not
    // awaited, so that task runs detached.
    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
        job.child_session_id.clone(),
        child_tx.clone(),
        ctx.agent_runners.clone(),
        ctx.account_feed_inbox.clone(),
    );

    // Shared slot read after the run: when it holds a reason, the run is reported
    // as `timeout` regardless of the runner's own result.
    let timeout_reason = Arc::new(RwLock::new(None::<String>));
    let watchdog_policy = watchdog_policy_for_session(&session);
    tokio::spawn(watch_child_liveness(
        job.parent_session_id.clone(),
        job.child_session_id.clone(),
        ctx.agent_runners.clone(),
        cancel_token.clone(),
        timeout_reason.clone(),
        forwarder_done.clone(),
        watchdog_policy,
    ));

    // Everything the detached execution task needs is cloned out of `ctx` and
    // `job` up front because the task outlives this function call.
    let model = job.model.clone();
    let session_id_clone = job.child_session_id.clone();
    let agent_runners_for_status = ctx.agent_runners.clone();
    let sessions_cache = ctx.sessions_cache.clone();
    let agent = ctx.agent.clone();
    let external_runner = ctx.external_child_runner.clone();
    let done = forwarder_done.clone();
    let parent_tx_for_done = parent_tx.clone();
    let parent_id_for_done = job.parent_session_id.clone();
    let child_id_for_done = job.child_session_id.clone();
    let session_event_senders = ctx.session_event_senders.clone();
    let completion_handler = ctx.completion_handler.clone();

    tokio::spawn(async move {
        // Safety net: if this task unwinds or is dropped before reaching the
        // explicit `done.cancel()` below, the guard still cancels the token so the
        // forwarder and heartbeat tasks do not keep running for a dead child.
        let _stop_aux_tasks_on_exit = done.clone().drop_guard();

        crate::session_app::execution_prep::prepare_session_for_execution(
            &mut session,
            None,
            Some(&model),
        );

        // Only the external child runner can execute a child session here; a
        // session it does not claim is reported as an error.
        let result: crate::runtime::runner::Result<()> =
            if external_runner.should_handle(&session).await {
                external_runner
                    .execute_external_child(&mut session, &job, mpsc_tx, cancel_token.clone())
                    .await
            } else {
                Err(bamboo_agent_core::AgentError::LLM(format!(
                    "No child runner matched session runtime metadata: agent_id={:?}, protocol={:?}",
                    session.metadata.get("external.agent_id"),
                    session.metadata.get("external.protocol"),
                )))
            };

        let timeout_error = timeout_reason.read().await.clone();
        let suspended_for_parent_approval = session
            .metadata
            .get("runtime.suspend_reason")
            .map(|reason| reason == "awaiting_parent_approval")
            .unwrap_or(false);

        // Precedence: a recorded timeout wins, then a successful run that is
        // suspended for parent approval, then the runner's own result.
        let (status, error) = if let Some(reason) = timeout_error {
            ("timeout".to_string(), Some(reason))
        } else if suspended_for_parent_approval && result.is_ok() {
            ("suspended".to_string(), None)
        } else {
            match &result {
                Ok(_) => ("completed".to_string(), None),
                Err(e @ bamboo_agent_core::AgentError::Cancelled) => {
                    ("cancelled".to_string(), Some(e.to_string()))
                }
                Err(e) => ("error".to_string(), Some(e.to_string())),
            }
        };

        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;

        // Pending injected messages are merged before the final save below.
        crate::runtime::runner::state_bridge::merge_pending_injected_messages(
            &mut session,
            Some(agent.storage()),
            Some(agent.persistence()),
        )
        .await;

        session.set_last_run_status(status.clone());
        if let Some(err) = &error {
            session.set_last_run_error(err.clone());
        } else {
            session.clear_last_run_error();
        }
        // Best effort: the completion is published even if this save fails.
        let _ = agent.persistence().save_runtime_session(&mut session).await;
        sessions_cache.insert(
            session_id_clone.clone(),
            Arc::new(parking_lot::RwLock::new(session)),
        );

        // Stop the forwarder, heartbeat and watchdog before announcing completion.
        done.cancel();
        publish_child_completion_parts(
            &parent_tx_for_done,
            completion_handler,
            parent_id_for_done,
            child_id_for_done,
            status,
            error,
        )
        .await;

        // This handle to the sender map is kept by the task until the completion
        // has been published.
        drop(session_event_senders);
    });

    Ok(())
}