bamboo_engine/runtime/execution/
agent_spawn.rs1use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::ImageFallbackConfig;
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
24const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
25const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
26const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
27const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
29pub struct SessionExecutionArgs {
35 pub agent: Arc<Agent>,
37 pub session_id: String,
38 pub session: Session,
39
40 pub tools_override: Option<Arc<dyn ToolExecutor>>,
42 pub provider_override: Option<Arc<dyn LLMProvider>>,
43 pub provider_name: Option<String>,
44 pub provider_type: Option<String>,
45 pub model: String,
46 pub fast_model: Option<String>,
47 pub fast_model_provider: Option<Arc<dyn LLMProvider>>,
49 pub background_model: Option<String>,
50 pub background_model_provider: Option<Arc<dyn LLMProvider>>,
52 pub summarization_model: Option<String>,
53 pub summarization_model_provider: Option<Arc<dyn LLMProvider>>,
55 pub reasoning_effort: Option<ReasoningEffort>,
56 pub reasoning_effort_source: String,
57 pub disabled_tools: Option<BTreeSet<String>>,
58 pub disabled_skill_ids: Option<BTreeSet<String>>,
59 pub selected_skill_ids: Option<Vec<String>>,
60 pub selected_skill_mode: Option<String>,
61 pub cancel_token: CancellationToken,
62 pub mpsc_tx: mpsc::Sender<AgentEvent>,
63 pub image_fallback: Option<ImageFallbackConfig>,
64 pub app_data_dir: Option<std::path::PathBuf>,
65
66 pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
68 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
69}
70
71pub fn spawn_session_execution(args: SessionExecutionArgs) {
80 let span_session_id = args.session_id.clone();
81 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
82
83 tokio::spawn(
84 async move {
85 let SessionExecutionArgs {
86 agent,
87 session_id,
88 mut session,
89 tools_override,
90 provider_override,
91 provider_name,
92 provider_type,
93 model,
94 fast_model,
95 fast_model_provider,
96 background_model,
97 background_model_provider,
98 summarization_model,
99 summarization_model_provider,
100 reasoning_effort,
101 reasoning_effort_source,
102 disabled_tools,
103 disabled_skill_ids,
104 selected_skill_ids,
105 selected_skill_mode,
106 cancel_token,
107 mpsc_tx,
108 image_fallback,
109 app_data_dir,
110 runners,
111 sessions_cache,
112 } = args;
113
114 let initial_message = initial_user_message_for_session(&session);
115 let selected_skill_ids =
116 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
117 let selected_skill_mode =
118 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
119
120 tracing::info!(
121 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
122 session_id,
123 model,
124 reasoning_effort
125 .map(ReasoningEffort::as_str)
126 .unwrap_or("none"),
127 reasoning_effort_source
128 );
129
130 session.model = model.clone();
131
132 let system_prompt = system_prompt_for_session(&session);
133 if let Some(prompt) = system_prompt.as_ref() {
134 log_base_system_prompt_snapshot(&session_id, prompt);
135 }
136
137 let result = agent
138 .execute(
139 &mut session,
140 ExecuteRequest {
141 initial_message,
142 event_tx: mpsc_tx.clone(),
143 cancel_token,
144 tools: tools_override,
145 provider_override,
146 model: Some(model),
147 provider_name,
148 provider_type,
149 fast_model,
150 fast_model_provider,
151 background_model,
152 background_model_provider,
153 summarization_model,
154 summarization_model_provider,
155 reasoning_effort,
156 disabled_tools,
157 disabled_skill_ids,
158 selected_skill_ids,
159 selected_skill_mode,
160 image_fallback,
161 app_data_dir,
162 },
163 )
164 .await;
165
166 if let Some(error_event) = terminal_error_event_for_result(&result) {
168 let _ = mpsc_tx.send(error_event).await;
169 }
170
171 finalize_runner(&runners, &session_id, &result).await;
173
174 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
178 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
179 }
180
181 {
183 let mut sessions = sessions_cache.write().await;
184 sessions.insert(session_id.clone(), session);
185 }
186
187 tracing::info!("[{}] Agent execution completed", session_id);
188 }
189 .instrument(session_span),
190 );
191}
192
193pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
195 tracing::info!(
196 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
197 session_id,
198 prompt.len(),
199 prompt.contains(SKILL_CONTEXT_START_MARKER),
200 prompt.contains(TOOL_GUIDE_START_MARKER),
201 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
202 prompt.contains(TASK_LIST_START_MARKER),
203 );
204
205 tracing::debug!(
206 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
207 session_id
208 );
209 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
210 tracing::debug!("[{}] -----------------------------------", session_id);
211 tracing::debug!("[{}] {}", session_id, prompt);
212 tracing::debug!(
213 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
214 session_id
215 );
216}
217
218pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
220where
221 E: std::fmt::Display,
222{
223 match result {
224 Ok(_) => None,
225 Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
226 message: "Agent execution cancelled by user".to_string(),
227 }),
228 Err(error) => Some(AgentEvent::Error {
229 message: error.to_string(),
230 }),
231 }
232}
233
/// Reports whether an error's `Display` text contains the substring
/// `"cancelled"` (case-sensitive, British spelling only).
fn is_cancelled_error<E>(error: &E) -> bool
where
    E: std::fmt::Display,
{
    const CANCELLED_MARKER: &str = "cancelled";

    let rendered = error.to_string();
    rendered.contains(CANCELLED_MARKER)
}
240
241fn system_prompt_for_session(session: &Session) -> Option<String> {
244 session
245 .messages
246 .iter()
247 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
248 .map(|message| message.content.clone())
249}
250
251fn initial_user_message_for_session(session: &Session) -> String {
252 session
253 .messages
254 .last()
255 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
256 .map(|message| message.content.clone())
257 .unwrap_or_default()
258}
259
260fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
261 session
262 .metadata
263 .get("selected_skill_ids")
264 .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
265}
266
267fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
268 let value = session
269 .metadata
270 .get("skill_mode")
271 .or_else(|| session.metadata.get("mode"))?;
272 let trimmed = value.trim();
273 if trimmed.is_empty() {
274 None
275 } else {
276 Some(trimmed.to_string())
277 }
278}