bamboo_engine/runtime/execution/
agent_spawn.rs1use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::ImageFallbackConfig;
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
24const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
25const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
26const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
27const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
29pub struct SessionExecutionArgs {
35 pub agent: Arc<Agent>,
37 pub session_id: String,
38 pub session: Session,
39
40 pub tools_override: Option<Arc<dyn ToolExecutor>>,
42 pub provider_override: Option<Arc<dyn LLMProvider>>,
43 pub provider_name: Option<String>,
44 pub provider_type: Option<String>,
45 pub model: String,
46 pub fast_model: Option<String>,
47 pub fast_model_provider: Option<Arc<dyn LLMProvider>>,
49 pub background_model: Option<String>,
50 pub background_model_provider: Option<Arc<dyn LLMProvider>>,
52 pub summarization_model: Option<String>,
53 pub summarization_model_provider: Option<Arc<dyn LLMProvider>>,
55 pub reasoning_effort: Option<ReasoningEffort>,
56 pub reasoning_effort_source: String,
57 pub auxiliary_model_resolver:
58 Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
59 pub disabled_tools: Option<BTreeSet<String>>,
60 pub disabled_skill_ids: Option<BTreeSet<String>>,
61 pub selected_skill_ids: Option<Vec<String>>,
62 pub selected_skill_mode: Option<String>,
63 pub cancel_token: CancellationToken,
64 pub mpsc_tx: mpsc::Sender<AgentEvent>,
65 pub image_fallback: Option<ImageFallbackConfig>,
66 pub app_data_dir: Option<std::path::PathBuf>,
67
68 pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
70 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
71}
72
73pub fn spawn_session_execution(args: SessionExecutionArgs) {
82 let span_session_id = args.session_id.clone();
83 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
84
85 tokio::spawn(
86 async move {
87 let SessionExecutionArgs {
88 agent,
89 session_id,
90 mut session,
91 tools_override,
92 provider_override,
93 provider_name,
94 provider_type,
95 model,
96 fast_model,
97 fast_model_provider,
98 background_model,
99 background_model_provider,
100 summarization_model,
101 summarization_model_provider,
102 reasoning_effort,
103 reasoning_effort_source,
104 auxiliary_model_resolver,
105 disabled_tools,
106 disabled_skill_ids,
107 selected_skill_ids,
108 selected_skill_mode,
109 cancel_token,
110 mpsc_tx,
111 image_fallback,
112 app_data_dir,
113 runners,
114 sessions_cache,
115 } = args;
116
117 let initial_message = initial_user_message_for_session(&session);
118 let selected_skill_ids =
119 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
120 let selected_skill_mode =
121 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
122
123 tracing::info!(
124 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
125 session_id,
126 model,
127 reasoning_effort
128 .map(ReasoningEffort::as_str)
129 .unwrap_or("none"),
130 reasoning_effort_source
131 );
132
133 session.model = model.clone();
134
135 let system_prompt = system_prompt_for_session(&session);
136 if let Some(prompt) = system_prompt.as_ref() {
137 log_base_system_prompt_snapshot(&session_id, prompt);
138 }
139
140 let result = agent
141 .execute(
142 &mut session,
143 ExecuteRequest {
144 initial_message,
145 event_tx: mpsc_tx.clone(),
146 cancel_token,
147 tools: tools_override,
148 provider_override,
149 model: Some(model),
150 provider_name,
151 provider_type,
152 fast_model,
153 fast_model_provider,
154 background_model,
155 background_model_provider,
156 summarization_model,
157 summarization_model_provider,
158 reasoning_effort,
159 auxiliary_model_resolver,
160 disabled_tools,
161 disabled_skill_ids,
162 selected_skill_ids,
163 selected_skill_mode,
164 image_fallback,
165 app_data_dir,
166 },
167 )
168 .await;
169
170 if let Some(error_event) = terminal_error_event_for_result(&result) {
172 let _ = mpsc_tx.send(error_event).await;
173 }
174
175 finalize_runner(&runners, &session_id, &result).await;
177
178 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
182 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
183 }
184
185 {
187 let mut sessions = sessions_cache.write().await;
188 sessions.insert(session_id.clone(), session);
189 }
190
191 tracing::info!("[{}] Agent execution completed", session_id);
192 }
193 .instrument(session_span),
194 );
195}
196
197pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
199 tracing::info!(
200 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
201 session_id,
202 prompt.len(),
203 prompt.contains(SKILL_CONTEXT_START_MARKER),
204 prompt.contains(TOOL_GUIDE_START_MARKER),
205 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
206 prompt.contains(TASK_LIST_START_MARKER),
207 );
208
209 tracing::debug!(
210 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
211 session_id
212 );
213 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
214 tracing::debug!("[{}] -----------------------------------", session_id);
215 tracing::debug!("[{}] {}", session_id, prompt);
216 tracing::debug!(
217 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
218 session_id
219 );
220}
221
222pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
224where
225 E: std::fmt::Display,
226{
227 match result {
228 Ok(_) => None,
229 Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
230 message: "Agent execution cancelled by user".to_string(),
231 }),
232 Err(error) => Some(AgentEvent::Error {
233 message: error.to_string(),
234 }),
235 }
236}
237
/// Reports whether the error's `Display` text contains the substring
/// `"cancelled"`.
///
/// The check is case-sensitive and matches only this exact spelling.
fn is_cancelled_error<E>(error: &E) -> bool
where
    E: std::fmt::Display,
{
    let rendered = error.to_string();
    rendered.contains("cancelled")
}
244
245fn system_prompt_for_session(session: &Session) -> Option<String> {
248 session
249 .messages
250 .iter()
251 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
252 .map(|message| message.content.clone())
253}
254
255fn initial_user_message_for_session(session: &Session) -> String {
256 session
257 .messages
258 .last()
259 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
260 .map(|message| message.content.clone())
261 .unwrap_or_default()
262}
263
264fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
265 session
266 .metadata
267 .get("selected_skill_ids")
268 .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
269}
270
271fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
272 let value = session
273 .metadata
274 .get("skill_mode")
275 .or_else(|| session.metadata.get("mode"))?;
276 let trimmed = value.trim();
277 if trimmed.is_empty() {
278 None
279 } else {
280 Some(trimmed.to_string())
281 }
282}