1use std::collections::HashMap;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use chrono::{Duration as ChronoDuration, Utc};
11use tokio::sync::{broadcast, RwLock};
12use tokio::time::{sleep, Duration, Instant};
13
14use crate::app_state::session_events::get_or_create_event_sender;
15use crate::app_state::{AgentRunner, AgentStatus};
16use crate::session_app::child_session::{
17 ChildRunnerInfo, ChildSessionEntry, ChildSessionError, ChildSessionPort, DeleteChildResult,
18};
19use crate::spawn_scheduler::{SpawnJob, SpawnScheduler};
20use bamboo_agent_core::storage::Storage;
21use bamboo_agent_core::tools::ToolError;
22use bamboo_agent_core::{AgentEvent, Session, SessionKind};
23use bamboo_domain::session::runtime_state::{
24 AgentRuntimeState, ChildWaitPolicy, WaitingForChildrenState,
25};
26use bamboo_infrastructure::{Config, LockedSessionStore, SessionIndexEntry, SessionStoreV2};
27
/// Adapter that implements [`ChildSessionPort`] on top of the application's
/// storage, scheduling and in-memory runtime registries.
///
/// All shared registries are keyed by session id.
pub struct ChildSessionAdapter {
    /// Session index store; queried via `list_index_entries` to enumerate child sessions.
    pub(crate) session_store: Arc<SessionStoreV2>,
    /// Session storage used to load and delete full sessions.
    pub(crate) storage: Arc<dyn Storage>,
    /// Locked store used for `merge_save_runtime` when persisting session changes.
    pub(crate) persistence: Arc<LockedSessionStore>,
    /// Scheduler that receives [`SpawnJob`]s for child runs.
    pub(crate) scheduler: Arc<SpawnScheduler>,
    /// In-memory session cache; refreshed after saves and purged on delete.
    pub(crate) sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
    /// Runner registry; consulted for run status, cancellation and progress info.
    pub(crate) agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Per-session broadcast senders used to publish [`AgentEvent`]s.
    pub(crate) session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Optional resolver mapping a subagent type to a provider/model reference.
    pub(crate) subagent_model_resolver: crate::tools::OptionalSubagentModelResolver,
    /// Shared configuration; read when resolving runtime metadata for a subagent type.
    pub(crate) config: Arc<RwLock<Config>>,
    /// Registry of subagent profiles (system prompt and tool policy per subagent type).
    pub(crate) subagent_profiles: Arc<bamboo_domain::subagent::SubagentProfileRegistry>,
    /// Tool names passed to `disabled_tools_for_profile` when computing a child's
    /// disabled tools. Presumably the full set of registered tools; confirm at the
    /// construction site.
    pub(crate) tool_names: Vec<String>,
}
51
/// Session metadata key under which the JSON-serialized [`AgentRuntimeState`] is
/// mirrored (written by `write_runtime_state`, read as a fallback by
/// `read_runtime_state`).
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
53
54fn read_runtime_state(session: &Session) -> AgentRuntimeState {
55 session
56 .agent_runtime_state
57 .clone()
58 .or_else(|| {
59 session
60 .metadata
61 .get(AGENT_RUNTIME_STATE_METADATA_KEY)
62 .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
63 })
64 .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-wait", session.id)))
65}
66
67fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
68 session.agent_runtime_state = Some(runtime_state.clone());
69 if let Ok(serialized) = serde_json::to_string(runtime_state) {
70 session
71 .metadata
72 .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
73 }
74}
75
76impl ChildSessionAdapter {
77 pub async fn resolve_subagent_model(
79 &self,
80 subagent_type: &str,
81 ) -> Option<bamboo_domain::ProviderModelRef> {
82 match &self.subagent_model_resolver {
83 Some(resolver) => resolver(subagent_type.to_string()).await,
84 None => None,
85 }
86 }
87
88 pub async fn resolve_runtime_metadata(&self, subagent_type: &str) -> HashMap<String, String> {
90 let config = self.config.read().await;
91 crate::external_agents::config::resolve_runtime_metadata(&config, subagent_type)
92 }
93
94 pub fn resolve_subagent_prompt(&self, subagent_type: &str) -> String {
100 self.subagent_profiles
101 .resolve(subagent_type)
102 .system_prompt
103 .clone()
104 }
105
106 pub async fn register_parent_wait_for_child(
112 &self,
113 parent_session_id: &str,
114 child_session_id: &str,
115 tool_call_id: Option<&str>,
116 ) -> Result<(), ChildSessionError> {
117 let Some(mut parent) =
118 self.storage
119 .load_session(parent_session_id)
120 .await
121 .map_err(|error| {
122 ChildSessionError::Execution(format!(
123 "failed to load parent session {parent_session_id}: {error}"
124 ))
125 })?
126 else {
127 return Err(ChildSessionError::NotFound(parent_session_id.to_string()));
128 };
129
130 let mut runtime_state = read_runtime_state(&parent);
131 runtime_state
132 .children
133 .active_ids
134 .retain(|id| id != child_session_id);
135 runtime_state
136 .children
137 .completed_ids
138 .retain(|id| id != child_session_id);
139 runtime_state
140 .children
141 .active_ids
142 .push(child_session_id.to_string());
143 runtime_state.children.active_ids.sort();
144 runtime_state.children.active_ids.dedup();
145 runtime_state.children.active_children = runtime_state.children.active_ids.len() as u32;
146 runtime_state.children.completed_children =
147 runtime_state.children.completed_ids.len() as u32;
148
149 let now = Utc::now();
150 let mut wait = runtime_state
151 .waiting_for_children
152 .take()
153 .unwrap_or_else(|| WaitingForChildrenState {
154 child_session_ids: Vec::new(),
155 wait_for: ChildWaitPolicy::All,
156 registered_at: now,
157 timeout_at: Some(now + ChronoDuration::hours(6)),
158 registered_by_tool_call_id: tool_call_id.map(str::to_string),
159 });
160 if !wait
161 .child_session_ids
162 .iter()
163 .any(|id| id == child_session_id)
164 {
165 wait.child_session_ids.push(child_session_id.to_string());
166 }
167 wait.child_session_ids.sort();
168 wait.child_session_ids.dedup();
169 if wait.registered_by_tool_call_id.is_none() {
170 wait.registered_by_tool_call_id = tool_call_id.map(str::to_string);
171 }
172 runtime_state.waiting_for_children = Some(wait);
173
174 write_runtime_state(&mut parent, &runtime_state);
175 parent.metadata.insert(
176 "runtime.suspend_reason".to_string(),
177 "waiting_for_children".to_string(),
178 );
179 parent.updated_at = Utc::now();
180
181 self.persistence
182 .merge_save_runtime(&mut parent)
183 .await
184 .map_err(|error| {
185 ChildSessionError::Execution(format!("failed to save parent wait state: {error}"))
186 })?;
187 self.sessions_cache
188 .write()
189 .await
190 .insert(parent.id.clone(), parent);
191
192 Ok(())
193 }
194}
195
196fn map_index_entry_to_child_entry(entry: &SessionIndexEntry) -> ChildSessionEntry {
197 ChildSessionEntry {
198 child_session_id: entry.id.clone(),
199 title: entry.title.clone(),
200 pinned: entry.pinned,
201 message_count: entry.message_count,
202 updated_at: entry.updated_at.to_rfc3339(),
203 last_run_status: entry.last_run_status.clone(),
204 last_run_error: entry.last_run_error.clone(),
205 }
206}
207
208#[async_trait]
209impl ChildSessionPort for ChildSessionAdapter {
210 async fn load_root_session(&self, root_session_id: &str) -> Result<Session, ChildSessionError> {
211 let Some(session) = self
212 .storage
213 .load_session(root_session_id)
214 .await
215 .map_err(|error| {
216 ChildSessionError::Execution(format!(
217 "failed to load session {root_session_id}: {error}"
218 ))
219 })?
220 else {
221 return Err(ChildSessionError::NotFound(root_session_id.to_string()));
222 };
223
224 if session.kind != SessionKind::Root {
225 return Err(ChildSessionError::NotRootSession(
226 root_session_id.to_string(),
227 ));
228 }
229
230 Ok(session)
231 }
232
233 async fn load_child_for_parent(
234 &self,
235 parent_session_id: &str,
236 child_session_id: &str,
237 ) -> Result<Session, ChildSessionError> {
238 let Some(child) = self
239 .storage
240 .load_session(child_session_id)
241 .await
242 .map_err(|error| {
243 ChildSessionError::Execution(format!(
244 "failed to load child session {child_session_id}: {error}"
245 ))
246 })?
247 else {
248 return Err(ChildSessionError::NotFound(child_session_id.to_string()));
249 };
250
251 if child.kind != SessionKind::Child {
252 return Err(ChildSessionError::NotChildSession(
253 child_session_id.to_string(),
254 ));
255 }
256
257 if child.parent_session_id.as_deref() != Some(parent_session_id) {
258 return Err(ChildSessionError::NotChildOfParent {
259 child_id: child_session_id.to_string(),
260 parent_id: parent_session_id.to_string(),
261 });
262 }
263
264 Ok(child)
265 }
266
267 async fn save_child_session(&self, child: &mut Session) -> Result<(), ChildSessionError> {
268 self.persistence
269 .merge_save_runtime(child)
270 .await
271 .map_err(|error| {
272 ChildSessionError::Execution(format!("failed to save child session: {error}"))
273 })?;
274
275 let mut sessions = self.sessions_cache.write().await;
276 sessions.insert(child.id.clone(), child.clone());
277
278 Ok(())
279 }
280
281 async fn is_child_running(&self, child_session_id: &str) -> bool {
282 let runners = self.agent_runners.read().await;
283 runners
284 .get(child_session_id)
285 .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
286 }
287
288 async fn list_children(&self, parent_session_id: &str) -> Vec<ChildSessionEntry> {
289 self.session_store
290 .list_index_entries()
291 .await
292 .into_iter()
293 .filter(|entry| {
294 entry.kind == SessionKind::Child
295 && entry.parent_session_id.as_deref() == Some(parent_session_id)
296 })
297 .map(|entry| map_index_entry_to_child_entry(&entry))
298 .collect()
299 }
300
301 async fn enqueue_child_run(
302 &self,
303 parent: &Session,
304 child: &Session,
305 ) -> Result<(), ChildSessionError> {
306 let model = if child.model.trim().is_empty() {
307 parent.model.clone()
308 } else {
309 child.model.clone()
310 };
311 if model.trim().is_empty() {
312 return Err(ChildSessionError::Execution(
313 "child model is empty and parent model is unavailable".to_string(),
314 ));
315 }
316
317 let disabled_tools = child
319 .metadata
320 .get("subagent_type")
321 .map(|s| s.trim())
322 .filter(|s| !s.is_empty())
323 .and_then(|subagent_type| {
324 let profile = self.subagent_profiles.resolve(subagent_type);
325 match &profile.tools {
326 bamboo_domain::subagent::ToolPolicy::Inherit => None,
327 policy => {
328 let names = bamboo_domain::subagent::disabled_tools_for_profile(
329 policy,
330 &self.tool_names,
331 );
332 if names.is_empty() {
333 None
334 } else {
335 Some(names)
336 }
337 }
338 }
339 });
340
341 self.register_parent_wait_for_child(&parent.id, &child.id, None)
342 .await?;
343
344 self.scheduler
345 .enqueue(SpawnJob {
346 parent_session_id: parent.id.clone(),
347 child_session_id: child.id.clone(),
348 model,
349 disabled_tools,
350 })
351 .await
352 .map_err(ChildSessionError::Execution)?;
353
354 let parent_tx = get_or_create_event_sender(&self.session_event_senders, &parent.id).await;
355 let _ = parent_tx.send(AgentEvent::SubAgentStarted {
356 parent_session_id: parent.id.clone(),
357 child_session_id: child.id.clone(),
358 title: Some(child.title.clone()),
359 });
360
361 Ok(())
362 }
363
364 async fn cancel_child_run_and_wait(
365 &self,
366 child_session_id: &str,
367 ) -> Result<(), ChildSessionError> {
368 let cancelled = {
369 let mut runners = self.agent_runners.write().await;
370 if let Some(runner) = runners.get_mut(child_session_id) {
371 if matches!(runner.status, AgentStatus::Running) {
372 runner.cancel_token.cancel();
373 true
374 } else {
375 false
376 }
377 } else {
378 false
379 }
380 };
381
382 if !cancelled {
383 return Ok(());
384 }
385
386 let deadline = Instant::now() + Duration::from_secs(10);
387 loop {
388 let still_running = {
389 let runners = self.agent_runners.read().await;
390 runners
391 .get(child_session_id)
392 .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
393 };
394 if !still_running {
395 return Ok(());
396 }
397 if Instant::now() >= deadline {
398 return Err(ChildSessionError::Execution(format!(
399 "timed out waiting for child session {child_session_id} to stop after cancellation"
400 )));
401 }
402 sleep(Duration::from_millis(50)).await;
403 }
404 }
405
406 async fn delete_child_session(
407 &self,
408 parent_session_id: &str,
409 child_id: &str,
410 ) -> Result<DeleteChildResult, ChildSessionError> {
411 let cancelled_running_child = {
412 let mut runners = self.agent_runners.write().await;
413 if let Some(runner) = runners.remove(child_id) {
414 runner.cancel_token.cancel();
415 true
416 } else {
417 false
418 }
419 };
420
421 let deleted = self
422 .storage
423 .delete_session(child_id)
424 .await
425 .map_err(|error| {
426 ChildSessionError::Execution(format!("failed to delete child session: {error}"))
427 })?;
428
429 {
430 let mut sessions = self.sessions_cache.write().await;
431 sessions.remove(child_id);
432 }
433 {
434 let mut senders = self.session_event_senders.write().await;
435 senders.remove(child_id);
436 if cancelled_running_child {
437 if let Some(parent_tx) = senders.get(parent_session_id) {
438 let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
439 parent_session_id: parent_session_id.to_string(),
440 child_session_id: child_id.to_string(),
441 status: "cancelled".to_string(),
442 error: Some("Child session deleted while running".to_string()),
443 });
444 }
445 }
446 }
447
448 Ok(DeleteChildResult {
449 deleted,
450 cancelled_running_child,
451 })
452 }
453
454 async fn get_child_runner_info(&self, child_id: &str) -> Option<ChildRunnerInfo> {
455 let runners = self.agent_runners.read().await;
456 runners.get(child_id).map(|runner| ChildRunnerInfo {
457 started_at: Some(runner.started_at),
458 completed_at: runner.completed_at,
459 last_tool_name: runner.last_tool_name.clone(),
460 last_tool_phase: runner.last_tool_phase.clone(),
461 last_event_at: runner.last_event_at,
462 round_count: runner.round_count,
463 })
464 }
465}
466
467pub fn tool_error_from_child_session(error: ChildSessionError) -> ToolError {
469 match error {
470 ChildSessionError::NotFound(id) => ToolError::Execution(format!("session not found: {id}")),
471 ChildSessionError::NotRootSession(id) => {
472 ToolError::Execution(format!("session is not a root session: {id}"))
473 }
474 ChildSessionError::InvalidArguments(msg) => ToolError::InvalidArguments(msg),
475 ChildSessionError::Execution(msg) => ToolError::Execution(msg),
476 other => ToolError::Execution(other.to_string()),
477 }
478}