bamboo_engine/runtime/execution/
agent_spawn.rs1use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::ImageFallbackConfig;
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
24const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
25const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
26const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
27const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
29pub struct SessionExecutionArgs {
35 pub agent: Arc<Agent>,
37 pub session_id: String,
38 pub session: Session,
39
40 pub tools_override: Option<Arc<dyn ToolExecutor>>,
42 pub provider_override: Option<Arc<dyn LLMProvider>>,
43 pub provider_name: Option<String>,
44 pub model: String,
45 pub fast_model: Option<String>,
46 pub background_model_provider: Option<Arc<dyn LLMProvider>>,
48 pub reasoning_effort: Option<ReasoningEffort>,
49 pub reasoning_effort_source: String,
50 pub disabled_tools: Option<BTreeSet<String>>,
51 pub disabled_skill_ids: Option<BTreeSet<String>>,
52 pub selected_skill_ids: Option<Vec<String>>,
53 pub selected_skill_mode: Option<String>,
54 pub cancel_token: CancellationToken,
55 pub mpsc_tx: mpsc::Sender<AgentEvent>,
56 pub image_fallback: Option<ImageFallbackConfig>,
57
58 pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
60 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
61}
62
63pub fn spawn_session_execution(args: SessionExecutionArgs) {
73 let span_session_id = args.session_id.clone();
74 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
75
76 tokio::spawn(
77 async move {
78 let SessionExecutionArgs {
79 agent,
80 session_id,
81 mut session,
82 tools_override,
83 provider_override,
84 provider_name,
85 model,
86 fast_model,
87 background_model_provider,
88 reasoning_effort,
89 reasoning_effort_source,
90 disabled_tools,
91 disabled_skill_ids,
92 selected_skill_ids,
93 selected_skill_mode,
94 cancel_token,
95 mpsc_tx,
96 image_fallback,
97 runners,
98 sessions_cache,
99 } = args;
100 let initial_title = session.title.clone();
101 let initial_pinned = session.pinned;
102
103 let initial_message = initial_user_message_for_session(&session);
104 let selected_skill_ids =
105 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
106 let selected_skill_mode =
107 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
108
109 tracing::info!(
110 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
111 session_id,
112 model,
113 reasoning_effort
114 .map(ReasoningEffort::as_str)
115 .unwrap_or("none"),
116 reasoning_effort_source
117 );
118
119 session.model = model.clone();
120
121 let system_prompt = system_prompt_for_session(&session);
122 if let Some(prompt) = system_prompt.as_ref() {
123 log_base_system_prompt_snapshot(&session_id, prompt);
124 }
125
126 let result = agent
127 .execute(
128 &mut session,
129 ExecuteRequest {
130 initial_message,
131 event_tx: mpsc_tx.clone(),
132 cancel_token,
133 tools: tools_override,
134 provider_override,
135 model: Some(model),
136 provider_name,
137 background_model: fast_model,
138 background_model_provider,
139 reasoning_effort,
140 disabled_tools,
141 disabled_skill_ids,
142 selected_skill_ids,
143 selected_skill_mode,
144 image_fallback,
145 },
146 )
147 .await;
148
149 if let Some(error_event) = terminal_error_event_for_result(&result) {
151 let _ = mpsc_tx.send(error_event).await;
152 }
153
154 finalize_runner(&runners, &session_id, &result).await;
156
157 match agent.storage().load_session(&session_id).await {
159 Ok(Some(latest_persisted)) => preserve_concurrent_session_overrides(
160 &mut session,
161 &latest_persisted,
162 &initial_title,
163 initial_pinned,
164 ),
165 Ok(None) => {}
166 Err(error) => {
167 tracing::warn!(
168 "[{}] Failed to load latest session before final save: {}",
169 session_id,
170 error
171 );
172 }
173 }
174
175 if let Err(error) = agent.storage().save_session(&session).await {
177 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
178 }
179
180 {
182 let mut sessions = sessions_cache.write().await;
183 sessions.insert(session_id.clone(), session);
184 }
185
186 tracing::info!("[{}] Agent execution completed", session_id);
187 }
188 .instrument(session_span),
189 );
190}
191
192pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
194 tracing::info!(
195 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
196 session_id,
197 prompt.len(),
198 prompt.contains(SKILL_CONTEXT_START_MARKER),
199 prompt.contains(TOOL_GUIDE_START_MARKER),
200 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
201 prompt.contains(TASK_LIST_START_MARKER),
202 );
203
204 tracing::debug!(
205 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
206 session_id
207 );
208 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
209 tracing::debug!("[{}] -----------------------------------", session_id);
210 tracing::debug!("[{}] {}", session_id, prompt);
211 tracing::debug!(
212 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
213 session_id
214 );
215}
216
217pub fn preserve_concurrent_session_overrides(
220 session: &mut Session,
221 latest_persisted: &Session,
222 initial_title: &str,
223 initial_pinned: bool,
224) {
225 if session.title == initial_title {
226 session.title = latest_persisted.title.clone();
227 }
228 if session.pinned == initial_pinned {
229 session.pinned = latest_persisted.pinned;
230 }
231}
232
233pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
235where
236 E: std::fmt::Display,
237{
238 match result {
239 Ok(_) => None,
240 Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
241 message: "Agent execution cancelled by user".to_string(),
242 }),
243 Err(error) => Some(AgentEvent::Error {
244 message: error.to_string(),
245 }),
246 }
247}
248
249fn is_cancelled_error<E>(error: &E) -> bool
250where
251 E: std::fmt::Display,
252{
253 error.to_string().contains("cancelled")
254}
255
256fn system_prompt_for_session(session: &Session) -> Option<String> {
259 session
260 .messages
261 .iter()
262 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
263 .map(|message| message.content.clone())
264}
265
266fn initial_user_message_for_session(session: &Session) -> String {
267 session
268 .messages
269 .last()
270 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
271 .map(|message| message.content.clone())
272 .unwrap_or_default()
273}
274
275fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
276 session
277 .metadata
278 .get("selected_skill_ids")
279 .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
280}
281
282fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
283 let value = session
284 .metadata
285 .get("skill_mode")
286 .or_else(|| session.metadata.get("mode"))?;
287 let trimmed = value.trim();
288 if trimmed.is_empty() {
289 None
290 } else {
291 Some(trimmed.to_string())
292 }
293}