Skip to main content

bamboo_server/tools/
child_session_adapter.rs

1//! Shared adapter implementing `ChildSessionPort` for server-side child session tools.
2//!
3//! The unified `SubAgentTool` delegates to this adapter instead of
4//! duplicating `ChildSessionPort` implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use chrono::{Duration as ChronoDuration, Utc};
11use tokio::sync::{broadcast, RwLock};
12use tokio::time::{sleep, Duration, Instant};
13
14use crate::app_state::session_events::get_or_create_event_sender;
15use crate::app_state::{AgentRunner, AgentStatus};
16use crate::session_app::child_session::{
17    ChildRunnerInfo, ChildSessionEntry, ChildSessionError, ChildSessionPort, DeleteChildResult,
18};
19use crate::spawn_scheduler::{SpawnJob, SpawnScheduler};
20use bamboo_agent_core::storage::Storage;
21use bamboo_agent_core::tools::ToolError;
22use bamboo_agent_core::{AgentEvent, Session, SessionKind};
23use bamboo_domain::session::runtime_state::{
24    AgentRuntimeState, ChildWaitPolicy, WaitingForChildrenState,
25};
26use bamboo_infrastructure::{Config, LockedSessionStore, SessionIndexEntry, SessionStoreV2};
27
/// Server-side adapter that bridges domain `ChildSessionPort` to infrastructure.
///
/// Holds all shared state needed by `SubAgentTool`.
/// Implements the full `ChildSessionPort` trait with real methods (no stubs).
pub struct ChildSessionAdapter {
    /// Session index store; queried to list the child sessions of a parent.
    pub(crate) session_store: Arc<SessionStoreV2>,
    /// Session storage backend used to load and delete sessions by id.
    pub(crate) storage: Arc<dyn Storage>,
    /// Store used to persist session runtime changes via `merge_save_runtime`.
    pub(crate) persistence: Arc<LockedSessionStore>,
    /// Scheduler that receives a `SpawnJob` for every enqueued child run.
    pub(crate) scheduler: Arc<SpawnScheduler>,
    /// In-memory sessions keyed by session id; refreshed after saves and
    /// pruned when a child session is deleted.
    pub(crate) sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
    /// Agent runners keyed by session id; consulted for run status, runner
    /// progress info, and cancellation.
    pub(crate) agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Per-session broadcast senders keyed by session id, used to publish
    /// `AgentEvent`s such as sub-agent start/completion notifications.
    pub(crate) session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Optional subagent model resolver: maps subagent_type → provider+model ref.
    pub(crate) subagent_model_resolver: crate::tools::OptionalSubagentModelResolver,
    /// Application config for resolving subagent routing and external agent profiles.
    pub(crate) config: Arc<RwLock<Config>>,
    /// Subagent profile registry. Used to resolve `subagent_type` →
    /// `system_prompt` and tool surface filter.
    pub(crate) subagent_profiles: Arc<bamboo_domain::subagent::SubagentProfileRegistry>,
    /// Cached list of all available tool names from the base executor.
    /// Used to compute the complement set for Allowlist policies.
    pub(crate) tool_names: Vec<String>,
}
51
/// Session metadata key under which the runtime state is mirrored as a JSON string.
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
53
54fn read_runtime_state(session: &Session) -> AgentRuntimeState {
55    session
56        .agent_runtime_state
57        .clone()
58        .or_else(|| {
59            session
60                .metadata
61                .get(AGENT_RUNTIME_STATE_METADATA_KEY)
62                .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
63        })
64        .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-wait", session.id)))
65}
66
67fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
68    session.agent_runtime_state = Some(runtime_state.clone());
69    if let Ok(serialized) = serde_json::to_string(runtime_state) {
70        session
71            .metadata
72            .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
73    }
74}
75
76impl ChildSessionAdapter {
77    /// Resolve the provider+model ref for a given subagent_type using the configured resolver.
78    pub async fn resolve_subagent_model(
79        &self,
80        subagent_type: &str,
81    ) -> Option<bamboo_domain::ProviderModelRef> {
82        match &self.subagent_model_resolver {
83            Some(resolver) => resolver(subagent_type.to_string()).await,
84            None => None,
85        }
86    }
87
88    /// Resolve runtime metadata (e.g. external agent routing) for a subagent_type.
89    pub async fn resolve_runtime_metadata(&self, subagent_type: &str) -> HashMap<String, String> {
90        let config = self.config.read().await;
91        crate::external_agents::config::resolve_runtime_metadata(&config, subagent_type)
92    }
93
94    /// Resolve the canonical system prompt for the given `subagent_type`.
95    ///
96    /// Always returns a prompt: unknown / empty `subagent_type` values fall
97    /// back to the `general-purpose` profile (whose prompt is byte-equal to
98    /// the legacy `CHILD_SYSTEM_PROMPT`).
99    pub fn resolve_subagent_prompt(&self, subagent_type: &str) -> String {
100        self.subagent_profiles
101            .resolve(subagent_type)
102            .system_prompt
103            .clone()
104    }
105
106    /// Register a durable parent wait for an enqueued child session.
107    ///
108    /// This is intentionally idempotent: repeated registrations for the same
109    /// child merge into the existing wait set. The child runner owns timeout
110    /// and liveness; the parent wait timeout is a long lease for observability.
111    pub async fn register_parent_wait_for_child(
112        &self,
113        parent_session_id: &str,
114        child_session_id: &str,
115        tool_call_id: Option<&str>,
116    ) -> Result<(), ChildSessionError> {
117        let Some(mut parent) =
118            self.storage
119                .load_session(parent_session_id)
120                .await
121                .map_err(|error| {
122                    ChildSessionError::Execution(format!(
123                        "failed to load parent session {parent_session_id}: {error}"
124                    ))
125                })?
126        else {
127            return Err(ChildSessionError::NotFound(parent_session_id.to_string()));
128        };
129
130        let mut runtime_state = read_runtime_state(&parent);
131        runtime_state
132            .children
133            .active_ids
134            .retain(|id| id != child_session_id);
135        runtime_state
136            .children
137            .completed_ids
138            .retain(|id| id != child_session_id);
139        runtime_state
140            .children
141            .active_ids
142            .push(child_session_id.to_string());
143        runtime_state.children.active_ids.sort();
144        runtime_state.children.active_ids.dedup();
145        runtime_state.children.active_children = runtime_state.children.active_ids.len() as u32;
146        runtime_state.children.completed_children =
147            runtime_state.children.completed_ids.len() as u32;
148
149        let now = Utc::now();
150        let mut wait = runtime_state
151            .waiting_for_children
152            .take()
153            .unwrap_or_else(|| WaitingForChildrenState {
154                child_session_ids: Vec::new(),
155                wait_for: ChildWaitPolicy::All,
156                registered_at: now,
157                timeout_at: Some(now + ChronoDuration::hours(6)),
158                registered_by_tool_call_id: tool_call_id.map(str::to_string),
159            });
160        if !wait
161            .child_session_ids
162            .iter()
163            .any(|id| id == child_session_id)
164        {
165            wait.child_session_ids.push(child_session_id.to_string());
166        }
167        wait.child_session_ids.sort();
168        wait.child_session_ids.dedup();
169        if wait.registered_by_tool_call_id.is_none() {
170            wait.registered_by_tool_call_id = tool_call_id.map(str::to_string);
171        }
172        runtime_state.waiting_for_children = Some(wait);
173
174        write_runtime_state(&mut parent, &runtime_state);
175        parent.metadata.insert(
176            "runtime.suspend_reason".to_string(),
177            "waiting_for_children".to_string(),
178        );
179        parent.updated_at = Utc::now();
180
181        self.persistence
182            .merge_save_runtime(&mut parent)
183            .await
184            .map_err(|error| {
185                ChildSessionError::Execution(format!("failed to save parent wait state: {error}"))
186            })?;
187        self.sessions_cache
188            .write()
189            .await
190            .insert(parent.id.clone(), parent);
191
192        Ok(())
193    }
194}
195
196fn map_index_entry_to_child_entry(entry: &SessionIndexEntry) -> ChildSessionEntry {
197    ChildSessionEntry {
198        child_session_id: entry.id.clone(),
199        title: entry.title.clone(),
200        pinned: entry.pinned,
201        message_count: entry.message_count,
202        updated_at: entry.updated_at.to_rfc3339(),
203        last_run_status: entry.last_run_status.clone(),
204        last_run_error: entry.last_run_error.clone(),
205    }
206}
207
208#[async_trait]
209impl ChildSessionPort for ChildSessionAdapter {
210    async fn load_root_session(&self, root_session_id: &str) -> Result<Session, ChildSessionError> {
211        let Some(session) = self
212            .storage
213            .load_session(root_session_id)
214            .await
215            .map_err(|error| {
216                ChildSessionError::Execution(format!(
217                    "failed to load session {root_session_id}: {error}"
218                ))
219            })?
220        else {
221            return Err(ChildSessionError::NotFound(root_session_id.to_string()));
222        };
223
224        if session.kind != SessionKind::Root {
225            return Err(ChildSessionError::NotRootSession(
226                root_session_id.to_string(),
227            ));
228        }
229
230        Ok(session)
231    }
232
233    async fn load_child_for_parent(
234        &self,
235        parent_session_id: &str,
236        child_session_id: &str,
237    ) -> Result<Session, ChildSessionError> {
238        let Some(child) = self
239            .storage
240            .load_session(child_session_id)
241            .await
242            .map_err(|error| {
243                ChildSessionError::Execution(format!(
244                    "failed to load child session {child_session_id}: {error}"
245                ))
246            })?
247        else {
248            return Err(ChildSessionError::NotFound(child_session_id.to_string()));
249        };
250
251        if child.kind != SessionKind::Child {
252            return Err(ChildSessionError::NotChildSession(
253                child_session_id.to_string(),
254            ));
255        }
256
257        if child.parent_session_id.as_deref() != Some(parent_session_id) {
258            return Err(ChildSessionError::NotChildOfParent {
259                child_id: child_session_id.to_string(),
260                parent_id: parent_session_id.to_string(),
261            });
262        }
263
264        Ok(child)
265    }
266
267    async fn save_child_session(&self, child: &mut Session) -> Result<(), ChildSessionError> {
268        self.persistence
269            .merge_save_runtime(child)
270            .await
271            .map_err(|error| {
272                ChildSessionError::Execution(format!("failed to save child session: {error}"))
273            })?;
274
275        let mut sessions = self.sessions_cache.write().await;
276        sessions.insert(child.id.clone(), child.clone());
277
278        Ok(())
279    }
280
281    async fn is_child_running(&self, child_session_id: &str) -> bool {
282        let runners = self.agent_runners.read().await;
283        runners
284            .get(child_session_id)
285            .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
286    }
287
288    async fn list_children(&self, parent_session_id: &str) -> Vec<ChildSessionEntry> {
289        self.session_store
290            .list_index_entries()
291            .await
292            .into_iter()
293            .filter(|entry| {
294                entry.kind == SessionKind::Child
295                    && entry.parent_session_id.as_deref() == Some(parent_session_id)
296            })
297            .map(|entry| map_index_entry_to_child_entry(&entry))
298            .collect()
299    }
300
301    async fn enqueue_child_run(
302        &self,
303        parent: &Session,
304        child: &Session,
305    ) -> Result<(), ChildSessionError> {
306        let model = if child.model.trim().is_empty() {
307            parent.model.clone()
308        } else {
309            child.model.clone()
310        };
311        if model.trim().is_empty() {
312            return Err(ChildSessionError::Execution(
313                "child model is empty and parent model is unavailable".to_string(),
314            ));
315        }
316
317        // Resolve profile policy into schema-level disabled_tools.
318        let disabled_tools = child
319            .metadata
320            .get("subagent_type")
321            .map(|s| s.trim())
322            .filter(|s| !s.is_empty())
323            .and_then(|subagent_type| {
324                let profile = self.subagent_profiles.resolve(subagent_type);
325                match &profile.tools {
326                    bamboo_domain::subagent::ToolPolicy::Inherit => None,
327                    policy => {
328                        let names = bamboo_domain::subagent::disabled_tools_for_profile(
329                            policy,
330                            &self.tool_names,
331                        );
332                        if names.is_empty() {
333                            None
334                        } else {
335                            Some(names)
336                        }
337                    }
338                }
339            });
340
341        self.register_parent_wait_for_child(&parent.id, &child.id, None)
342            .await?;
343
344        self.scheduler
345            .enqueue(SpawnJob {
346                parent_session_id: parent.id.clone(),
347                child_session_id: child.id.clone(),
348                model,
349                disabled_tools,
350            })
351            .await
352            .map_err(ChildSessionError::Execution)?;
353
354        let parent_tx = get_or_create_event_sender(&self.session_event_senders, &parent.id).await;
355        let _ = parent_tx.send(AgentEvent::SubAgentStarted {
356            parent_session_id: parent.id.clone(),
357            child_session_id: child.id.clone(),
358            title: Some(child.title.clone()),
359        });
360
361        Ok(())
362    }
363
364    async fn cancel_child_run_and_wait(
365        &self,
366        child_session_id: &str,
367    ) -> Result<(), ChildSessionError> {
368        let cancelled = {
369            let mut runners = self.agent_runners.write().await;
370            if let Some(runner) = runners.get_mut(child_session_id) {
371                if matches!(runner.status, AgentStatus::Running) {
372                    runner.cancel_token.cancel();
373                    true
374                } else {
375                    false
376                }
377            } else {
378                false
379            }
380        };
381
382        if !cancelled {
383            return Ok(());
384        }
385
386        let deadline = Instant::now() + Duration::from_secs(10);
387        loop {
388            let still_running = {
389                let runners = self.agent_runners.read().await;
390                runners
391                    .get(child_session_id)
392                    .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
393            };
394            if !still_running {
395                return Ok(());
396            }
397            if Instant::now() >= deadline {
398                return Err(ChildSessionError::Execution(format!(
399                    "timed out waiting for child session {child_session_id} to stop after cancellation"
400                )));
401            }
402            sleep(Duration::from_millis(50)).await;
403        }
404    }
405
406    async fn delete_child_session(
407        &self,
408        parent_session_id: &str,
409        child_id: &str,
410    ) -> Result<DeleteChildResult, ChildSessionError> {
411        let cancelled_running_child = {
412            let mut runners = self.agent_runners.write().await;
413            if let Some(runner) = runners.remove(child_id) {
414                runner.cancel_token.cancel();
415                true
416            } else {
417                false
418            }
419        };
420
421        let deleted = self
422            .storage
423            .delete_session(child_id)
424            .await
425            .map_err(|error| {
426                ChildSessionError::Execution(format!("failed to delete child session: {error}"))
427            })?;
428
429        {
430            let mut sessions = self.sessions_cache.write().await;
431            sessions.remove(child_id);
432        }
433        {
434            let mut senders = self.session_event_senders.write().await;
435            senders.remove(child_id);
436            if cancelled_running_child {
437                if let Some(parent_tx) = senders.get(parent_session_id) {
438                    let _ = parent_tx.send(AgentEvent::SubAgentCompleted {
439                        parent_session_id: parent_session_id.to_string(),
440                        child_session_id: child_id.to_string(),
441                        status: "cancelled".to_string(),
442                        error: Some("Child session deleted while running".to_string()),
443                    });
444                }
445            }
446        }
447
448        Ok(DeleteChildResult {
449            deleted,
450            cancelled_running_child,
451        })
452    }
453
454    async fn get_child_runner_info(&self, child_id: &str) -> Option<ChildRunnerInfo> {
455        let runners = self.agent_runners.read().await;
456        runners.get(child_id).map(|runner| ChildRunnerInfo {
457            started_at: Some(runner.started_at),
458            completed_at: runner.completed_at,
459            last_tool_name: runner.last_tool_name.clone(),
460            last_tool_phase: runner.last_tool_phase.clone(),
461            last_event_at: runner.last_event_at,
462            round_count: runner.round_count,
463        })
464    }
465}
466
467/// Map a `ChildSessionError` to a server `ToolError`.
468pub fn tool_error_from_child_session(error: ChildSessionError) -> ToolError {
469    match error {
470        ChildSessionError::NotFound(id) => ToolError::Execution(format!("session not found: {id}")),
471        ChildSessionError::NotRootSession(id) => {
472            ToolError::Execution(format!("session is not a root session: {id}"))
473        }
474        ChildSessionError::InvalidArguments(msg) => ToolError::InvalidArguments(msg),
475        ChildSessionError::Execution(msg) => ToolError::Execution(msg),
476        other => ToolError::Execution(other.to_string()),
477    }
478}