Skip to main content

bamboo_server/tools/
child_session_adapter.rs

1//! Shared adapter implementing `ChildSessionPort` for server-side child session tools.
2//!
3//! The unified `SubSessionTool` delegates to this adapter instead of
4//! duplicating `ChildSessionPort` implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::{broadcast, RwLock};
11use tokio::time::{sleep, Duration, Instant};
12
13use crate::app_state::session_events::get_or_create_event_sender;
14use crate::app_state::{AgentRunner, AgentStatus};
15use crate::session_app::child_session::{
16    ChildRunnerInfo, ChildSessionEntry, ChildSessionError, ChildSessionPort, DeleteChildResult,
17};
18use crate::spawn_scheduler::{SpawnJob, SpawnScheduler};
19use bamboo_agent_core::storage::Storage;
20use bamboo_agent_core::tools::ToolError;
21use bamboo_agent_core::{AgentEvent, Session, SessionKind};
22use bamboo_infrastructure::{Config, SessionIndexEntry, SessionStoreV2};
23
24/// Server-side adapter that bridges domain `ChildSessionPort` to infrastructure.
25///
26/// Holds all shared state needed by `SubSessionTool`.
27/// Implements the full `ChildSessionPort` trait with real methods (no stubs).
28pub struct ChildSessionAdapter {
29    pub(crate) session_store: Arc<SessionStoreV2>,
30    pub(crate) storage: Arc<dyn Storage>,
31    pub(crate) scheduler: Arc<SpawnScheduler>,
32    pub(crate) sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
33    pub(crate) agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
34    pub(crate) session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
35    /// Optional subagent model resolver: maps subagent_type → provider+model ref.
36    pub(crate) subagent_model_resolver: crate::tools::OptionalSubagentModelResolver,
37    /// Application config for resolving subagent routing and external agent profiles.
38    pub(crate) config: Arc<RwLock<Config>>,
39    /// Subagent profile registry. Used to resolve `subagent_type` →
40    /// `system_prompt` (and, in later PRs, the tool surface filter).
41    pub(crate) subagent_profiles: Arc<bamboo_domain::subagent::SubagentProfileRegistry>,
42}
43
44impl ChildSessionAdapter {
45    /// Resolve the provider+model ref for a given subagent_type using the configured resolver.
46    pub async fn resolve_subagent_model(
47        &self,
48        subagent_type: &str,
49    ) -> Option<bamboo_domain::ProviderModelRef> {
50        match &self.subagent_model_resolver {
51            Some(resolver) => resolver(subagent_type.to_string()).await,
52            None => None,
53        }
54    }
55
56    /// Resolve runtime metadata (e.g. external agent routing) for a subagent_type.
57    pub async fn resolve_runtime_metadata(&self, subagent_type: &str) -> HashMap<String, String> {
58        let config = self.config.read().await;
59        crate::external_agents::config::resolve_runtime_metadata(&config, subagent_type)
60    }
61
62    /// Resolve the canonical system prompt for the given `subagent_type`.
63    ///
64    /// Always returns a prompt: unknown / empty `subagent_type` values fall
65    /// back to the `general-purpose` profile (whose prompt is byte-equal to
66    /// the legacy `CHILD_SYSTEM_PROMPT`).
67    pub fn resolve_subagent_prompt(&self, subagent_type: &str) -> String {
68        self.subagent_profiles
69            .resolve(subagent_type)
70            .system_prompt
71            .clone()
72    }
73}
74
75fn map_index_entry_to_child_entry(entry: &SessionIndexEntry) -> ChildSessionEntry {
76    ChildSessionEntry {
77        child_session_id: entry.id.clone(),
78        title: entry.title.clone(),
79        pinned: entry.pinned,
80        message_count: entry.message_count,
81        updated_at: entry.updated_at.to_rfc3339(),
82        last_run_status: entry.last_run_status.clone(),
83        last_run_error: entry.last_run_error.clone(),
84    }
85}
86
87#[async_trait]
88impl ChildSessionPort for ChildSessionAdapter {
89    async fn load_root_session(&self, root_session_id: &str) -> Result<Session, ChildSessionError> {
90        let Some(session) = self
91            .storage
92            .load_session(root_session_id)
93            .await
94            .map_err(|error| {
95                ChildSessionError::Execution(format!(
96                    "failed to load session {root_session_id}: {error}"
97                ))
98            })?
99        else {
100            return Err(ChildSessionError::NotFound(root_session_id.to_string()));
101        };
102
103        if session.kind != SessionKind::Root {
104            return Err(ChildSessionError::NotRootSession(
105                root_session_id.to_string(),
106            ));
107        }
108
109        Ok(session)
110    }
111
112    async fn load_child_for_parent(
113        &self,
114        parent_session_id: &str,
115        child_session_id: &str,
116    ) -> Result<Session, ChildSessionError> {
117        let Some(child) = self
118            .storage
119            .load_session(child_session_id)
120            .await
121            .map_err(|error| {
122                ChildSessionError::Execution(format!(
123                    "failed to load child session {child_session_id}: {error}"
124                ))
125            })?
126        else {
127            return Err(ChildSessionError::NotFound(child_session_id.to_string()));
128        };
129
130        if child.kind != SessionKind::Child {
131            return Err(ChildSessionError::NotChildSession(
132                child_session_id.to_string(),
133            ));
134        }
135
136        if child.parent_session_id.as_deref() != Some(parent_session_id) {
137            return Err(ChildSessionError::NotChildOfParent {
138                child_id: child_session_id.to_string(),
139                parent_id: parent_session_id.to_string(),
140            });
141        }
142
143        Ok(child)
144    }
145
146    async fn save_child_session(&self, child: &Session) -> Result<(), ChildSessionError> {
147        self.storage.save_session(child).await.map_err(|error| {
148            ChildSessionError::Execution(format!("failed to save child session: {error}"))
149        })?;
150
151        let mut sessions = self.sessions_cache.write().await;
152        sessions.insert(child.id.clone(), child.clone());
153
154        Ok(())
155    }
156
157    async fn is_child_running(&self, child_session_id: &str) -> bool {
158        let runners = self.agent_runners.read().await;
159        runners
160            .get(child_session_id)
161            .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
162    }
163
164    async fn list_children(&self, parent_session_id: &str) -> Vec<ChildSessionEntry> {
165        self.session_store
166            .list_index_entries()
167            .await
168            .into_iter()
169            .filter(|entry| {
170                entry.kind == SessionKind::Child
171                    && entry.parent_session_id.as_deref() == Some(parent_session_id)
172            })
173            .map(|entry| map_index_entry_to_child_entry(&entry))
174            .collect()
175    }
176
177    async fn enqueue_child_run(
178        &self,
179        parent: &Session,
180        child: &Session,
181    ) -> Result<(), ChildSessionError> {
182        let model = if child.model.trim().is_empty() {
183            parent.model.clone()
184        } else {
185            child.model.clone()
186        };
187        if model.trim().is_empty() {
188            return Err(ChildSessionError::Execution(
189                "child model is empty and parent model is unavailable".to_string(),
190            ));
191        }
192
193        self.scheduler
194            .enqueue(SpawnJob {
195                parent_session_id: parent.id.clone(),
196                child_session_id: child.id.clone(),
197                model,
198            })
199            .await
200            .map_err(ChildSessionError::Execution)?;
201
202        let parent_tx = get_or_create_event_sender(&self.session_event_senders, &parent.id).await;
203        let _ = parent_tx.send(AgentEvent::SubSessionStarted {
204            parent_session_id: parent.id.clone(),
205            child_session_id: child.id.clone(),
206            title: Some(child.title.clone()),
207        });
208
209        Ok(())
210    }
211
212    async fn cancel_child_run_and_wait(
213        &self,
214        child_session_id: &str,
215    ) -> Result<(), ChildSessionError> {
216        let cancelled = {
217            let mut runners = self.agent_runners.write().await;
218            if let Some(runner) = runners.get_mut(child_session_id) {
219                if matches!(runner.status, AgentStatus::Running) {
220                    runner.cancel_token.cancel();
221                    true
222                } else {
223                    false
224                }
225            } else {
226                false
227            }
228        };
229
230        if !cancelled {
231            return Ok(());
232        }
233
234        let deadline = Instant::now() + Duration::from_secs(10);
235        loop {
236            let still_running = {
237                let runners = self.agent_runners.read().await;
238                runners
239                    .get(child_session_id)
240                    .is_some_and(|runner| matches!(runner.status, AgentStatus::Running))
241            };
242            if !still_running {
243                return Ok(());
244            }
245            if Instant::now() >= deadline {
246                return Err(ChildSessionError::Execution(format!(
247                    "timed out waiting for child session {child_session_id} to stop after cancellation"
248                )));
249            }
250            sleep(Duration::from_millis(50)).await;
251        }
252    }
253
254    async fn delete_child_session(
255        &self,
256        parent_session_id: &str,
257        child_id: &str,
258    ) -> Result<DeleteChildResult, ChildSessionError> {
259        let cancelled_running_child = {
260            let mut runners = self.agent_runners.write().await;
261            if let Some(runner) = runners.remove(child_id) {
262                runner.cancel_token.cancel();
263                true
264            } else {
265                false
266            }
267        };
268
269        let deleted = self
270            .storage
271            .delete_session(child_id)
272            .await
273            .map_err(|error| {
274                ChildSessionError::Execution(format!("failed to delete child session: {error}"))
275            })?;
276
277        {
278            let mut sessions = self.sessions_cache.write().await;
279            sessions.remove(child_id);
280        }
281        {
282            let mut senders = self.session_event_senders.write().await;
283            senders.remove(child_id);
284            if cancelled_running_child {
285                if let Some(parent_tx) = senders.get(parent_session_id) {
286                    let _ = parent_tx.send(AgentEvent::SubSessionCompleted {
287                        parent_session_id: parent_session_id.to_string(),
288                        child_session_id: child_id.to_string(),
289                        status: "cancelled".to_string(),
290                        error: Some("Child session deleted while running".to_string()),
291                    });
292                }
293            }
294        }
295
296        Ok(DeleteChildResult {
297            deleted,
298            cancelled_running_child,
299        })
300    }
301
302    async fn get_child_runner_info(&self, child_id: &str) -> Option<ChildRunnerInfo> {
303        let runners = self.agent_runners.read().await;
304        runners.get(child_id).map(|runner| ChildRunnerInfo {
305            started_at: Some(runner.started_at),
306            completed_at: runner.completed_at,
307            last_tool_name: runner.last_tool_name.clone(),
308            last_tool_phase: runner.last_tool_phase.clone(),
309            last_event_at: runner.last_event_at,
310            round_count: runner.round_count,
311        })
312    }
313}
314
315/// Map a `ChildSessionError` to a server `ToolError`.
316pub fn tool_error_from_child_session(error: ChildSessionError) -> ToolError {
317    match error {
318        ChildSessionError::NotFound(id) => ToolError::Execution(format!("session not found: {id}")),
319        ChildSessionError::NotRootSession(id) => {
320            ToolError::Execution(format!("session is not a root session: {id}"))
321        }
322        ChildSessionError::InvalidArguments(msg) => ToolError::InvalidArguments(msg),
323        ChildSessionError::Execution(msg) => ToolError::Execution(msg),
324        other => ToolError::Execution(other.to_string()),
325    }
326}