1use std::collections::HashMap;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::{broadcast, RwLock};
11use tokio::time::{sleep, Duration, Instant};
12
13use crate::app_state::session_events::get_or_create_event_sender;
14use crate::app_state::{AgentRunner, AgentStatus};
15use crate::session_app::child_session::{
16 ChildRunnerInfo, ChildSessionEntry, ChildSessionError, ChildSessionPort, DeleteChildResult,
17};
18use crate::spawn_scheduler::{SpawnJob, SpawnScheduler};
19use bamboo_agent_core::storage::Storage;
20use bamboo_agent_core::tools::ToolError;
21use bamboo_agent_core::{AgentEvent, Session, SessionKind};
22use bamboo_infrastructure::{Config, SessionIndexEntry, SessionStoreV2};
23
/// Adapter that implements [`ChildSessionPort`] on top of the application's
/// storage, scheduler, and shared in-memory runtime maps.
pub struct ChildSessionAdapter {
    /// Session store whose index entries are scanned by `list_children`.
    pub(crate) session_store: Arc<SessionStoreV2>,
    /// Storage backend used to load, save, and delete sessions.
    pub(crate) storage: Arc<dyn Storage>,
    /// Scheduler that child runs are enqueued on.
    pub(crate) scheduler: Arc<SpawnScheduler>,
    /// In-memory session cache, keyed by session id.
    pub(crate) sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
    /// Agent runners, keyed by session id.
    pub(crate) agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Broadcast senders for agent events, keyed by session id.
    pub(crate) session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Optional resolver consulted by `resolve_subagent_model`.
    pub(crate) subagent_model_resolver: crate::tools::OptionalSubagentModelResolver,
    /// Shared configuration read by `resolve_runtime_metadata`.
    pub(crate) config: Arc<RwLock<Config>>,
    /// Registry of subagent profiles read by `resolve_subagent_prompt`.
    pub(crate) subagent_profiles: Arc<bamboo_domain::subagent::SubagentProfileRegistry>,
}
43
44impl ChildSessionAdapter {
45 pub async fn resolve_subagent_model(
47 &self,
48 subagent_type: &str,
49 ) -> Option<bamboo_domain::ProviderModelRef> {
50 match &self.subagent_model_resolver {
51 Some(resolver) => resolver(subagent_type.to_string()).await,
52 None => None,
53 }
54 }
55
56 pub async fn resolve_runtime_metadata(&self, subagent_type: &str) -> HashMap<String, String> {
58 let config = self.config.read().await;
59 crate::external_agents::config::resolve_runtime_metadata(&config, subagent_type)
60 }
61
62 pub fn resolve_subagent_prompt(&self, subagent_type: &str) -> String {
68 self.subagent_profiles
69 .resolve(subagent_type)
70 .system_prompt
71 .clone()
72 }
73}
74
75fn map_index_entry_to_child_entry(entry: &SessionIndexEntry) -> ChildSessionEntry {
76 ChildSessionEntry {
77 child_session_id: entry.id.clone(),
78 title: entry.title.clone(),
79 pinned: entry.pinned,
80 message_count: entry.message_count,
81 updated_at: entry.updated_at.to_rfc3339(),
82 last_run_status: entry.last_run_status.clone(),
83 last_run_error: entry.last_run_error.clone(),
84 }
85}
86
87#[async_trait]
88impl ChildSessionPort for ChildSessionAdapter {
89 async fn load_root_session(&self, root_session_id: &str) -> Result<Session, ChildSessionError> {
90 let Some(session) = self
91 .storage
92 .load_session(root_session_id)
93 .await
94 .map_err(|error| {
95 ChildSessionError::Execution(format!(
96 "failed to load session {root_session_id}: {error}"
97 ))
98 })?
99 else {
100 return Err(ChildSessionError::NotFound(root_session_id.to_string()));
101 };
102
103 if session.kind != SessionKind::Root {
104 return Err(ChildSessionError::NotRootSession(
105 root_session_id.to_string(),
106 ));
107 }
108
109 Ok(session)
110 }
111
112 async fn load_child_for_parent(
113 &self,
114 parent_session_id: &str,
115 child_session_id: &str,
116 ) -> Result<Session, ChildSessionError> {
117 let Some(child) = self
118 .storage
119 .load_session(child_session_id)
120 .await
121 .map_err(|error| {
122 ChildSessionError::Execution(format!(
123 "failed to load child session {child_session_id}: {error}"
124 ))
125 })?
126 else {
127 return Err(ChildSessionError::NotFound(child_session_id.to_string()));
128 };
129
130 if child.kind != SessionKind::Child {
131 return Err(ChildSessionError::NotChildSession(
132 child_session_id.to_string(),
133 ));
134 }
135
136 if child.parent_session_id.as_deref() != Some(parent_session_id) {
137 return Err(ChildSessionError::NotChildOfParent {
138 child_id: child_session_id.to_string(),
139 parent_id: parent_session_id.to_string(),
140 });
141 }
142
143 Ok(child)
144 }
145
146 async fn save_child_session(&self, child: &Session) -> Result<(), ChildSessionError> {
147 self.storage.save_session(child).await.map_err(|error| {
148 ChildSessionError::Execution(format!("failed to save child session: {error}"))
149 })?;
150
151 let mut sessions = self.sessions_cache.write().await;
152 sessions.insert(child.id.clone(), child.clone());
153
154 Ok(())
155 }
156
157 async fn is_child_running(&self, child_session_id: &str) -> bool {
158 let runners = self.agent_runners.read().await;
159 runners
160 .get(child_session_id)
161 .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
162 }
163
164 async fn list_children(&self, parent_session_id: &str) -> Vec<ChildSessionEntry> {
165 self.session_store
166 .list_index_entries()
167 .await
168 .into_iter()
169 .filter(|entry| {
170 entry.kind == SessionKind::Child
171 && entry.parent_session_id.as_deref() == Some(parent_session_id)
172 })
173 .map(|entry| map_index_entry_to_child_entry(&entry))
174 .collect()
175 }
176
177 async fn enqueue_child_run(
178 &self,
179 parent: &Session,
180 child: &Session,
181 ) -> Result<(), ChildSessionError> {
182 let model = if child.model.trim().is_empty() {
183 parent.model.clone()
184 } else {
185 child.model.clone()
186 };
187 if model.trim().is_empty() {
188 return Err(ChildSessionError::Execution(
189 "child model is empty and parent model is unavailable".to_string(),
190 ));
191 }
192
193 self.scheduler
194 .enqueue(SpawnJob {
195 parent_session_id: parent.id.clone(),
196 child_session_id: child.id.clone(),
197 model,
198 })
199 .await
200 .map_err(ChildSessionError::Execution)?;
201
202 let parent_tx = get_or_create_event_sender(&self.session_event_senders, &parent.id).await;
203 let _ = parent_tx.send(AgentEvent::SubSessionStarted {
204 parent_session_id: parent.id.clone(),
205 child_session_id: child.id.clone(),
206 title: Some(child.title.clone()),
207 });
208
209 Ok(())
210 }
211
212 async fn cancel_child_run_and_wait(
213 &self,
214 child_session_id: &str,
215 ) -> Result<(), ChildSessionError> {
216 let cancelled = {
217 let mut runners = self.agent_runners.write().await;
218 if let Some(runner) = runners.get_mut(child_session_id) {
219 if matches!(runner.status, AgentStatus::Running) {
220 runner.cancel_token.cancel();
221 true
222 } else {
223 false
224 }
225 } else {
226 false
227 }
228 };
229
230 if !cancelled {
231 return Ok(());
232 }
233
234 let deadline = Instant::now() + Duration::from_secs(10);
235 loop {
236 let still_running = {
237 let runners = self.agent_runners.read().await;
238 runners
239 .get(child_session_id)
240 .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
241 };
242 if !still_running {
243 return Ok(());
244 }
245 if Instant::now() >= deadline {
246 return Err(ChildSessionError::Execution(format!(
247 "timed out waiting for child session {child_session_id} to stop after cancellation"
248 )));
249 }
250 sleep(Duration::from_millis(50)).await;
251 }
252 }
253
254 async fn delete_child_session(
255 &self,
256 parent_session_id: &str,
257 child_id: &str,
258 ) -> Result<DeleteChildResult, ChildSessionError> {
259 let cancelled_running_child = {
260 let mut runners = self.agent_runners.write().await;
261 if let Some(runner) = runners.remove(child_id) {
262 runner.cancel_token.cancel();
263 true
264 } else {
265 false
266 }
267 };
268
269 let deleted = self
270 .storage
271 .delete_session(child_id)
272 .await
273 .map_err(|error| {
274 ChildSessionError::Execution(format!("failed to delete child session: {error}"))
275 })?;
276
277 {
278 let mut sessions = self.sessions_cache.write().await;
279 sessions.remove(child_id);
280 }
281 {
282 let mut senders = self.session_event_senders.write().await;
283 senders.remove(child_id);
284 if cancelled_running_child {
285 if let Some(parent_tx) = senders.get(parent_session_id) {
286 let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
287 parent_session_id: parent_session_id.to_string(),
288 child_session_id: child_id.to_string(),
289 status: "cancelled".to_string(),
290 error: Some("Child session deleted while running".to_string()),
291 });
292 }
293 }
294 }
295
296 Ok(DeleteChildResult {
297 deleted,
298 cancelled_running_child,
299 })
300 }
301
302 async fn get_child_runner_info(&self, child_id: &str) -> Option<ChildRunnerInfo> {
303 let runners = self.agent_runners.read().await;
304 runners.get(child_id).map(|runner| ChildRunnerInfo {
305 started_at: Some(runner.started_at),
306 completed_at: runner.completed_at,
307 last_tool_name: runner.last_tool_name.clone(),
308 last_tool_phase: runner.last_tool_phase.clone(),
309 last_event_at: runner.last_event_at,
310 round_count: runner.round_count,
311 })
312 }
313}
314
315pub fn tool_error_from_child_session(error: ChildSessionError) -> ToolError {
317 match error {
318 ChildSessionError::NotFound(id) => ToolError::Execution(format!("session not found: {id}")),
319 ChildSessionError::NotRootSession(id) => {
320 ToolError::Execution(format!("session is not a root session: {id}"))
321 }
322 ChildSessionError::InvalidArguments(msg) => ToolError::InvalidArguments(msg),
323 ChildSessionError::Execution(msg) => ToolError::Execution(msg),
324 other => ToolError::Execution(other.to_string()),
325 }
326}