bamboo_engine/runtime/execution/
agent_spawn.rs1use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::{GoldConfig, ImageFallbackConfig};
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
// Marker strings searched for in the base system prompt by
// `log_base_system_prompt_snapshot` to report which sections are present.

/// Marker whose presence is reported as `has_skill` in the snapshot log.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
/// Marker whose presence is reported as `has_tool_guide` in the snapshot log.
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
/// Marker whose presence is reported as `has_external_memory` in the snapshot log.
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
/// Marker whose presence is reported as `has_task_list` in the snapshot log.
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
/// Everything `spawn_session_execution` needs to run one agent execution in a
/// background task.
///
/// Most fields are forwarded unchanged into the `ExecuteRequest` handed to
/// `Agent::execute`; the remainder are used for logging, event delivery, and
/// post-run bookkeeping.
pub struct SessionExecutionArgs {
    /// Agent whose `execute` drives the run and whose `persistence()` is used to
    /// save the session afterwards.
    pub agent: Arc<Agent>,
    /// Session identifier: used as the tracing span field, the log prefix, the id
    /// passed to `finalize_runner`, and the key in `sessions_cache`.
    pub session_id: String,
    /// Session state that is executed against, then persisted and cached.
    pub session: Session,

    /// Forwarded as `ExecuteRequest::tools`.
    /// NOTE(review): presumably replaces the agent's default tool executor when
    /// set — confirm in `Agent::execute`.
    pub tools_override: Option<Arc<dyn ToolExecutor>>,
    /// Forwarded as `ExecuteRequest::provider_override`.
    pub provider_override: Option<Arc<dyn LLMProvider>>,
    /// Forwarded as `ExecuteRequest::provider_name`.
    pub provider_name: Option<String>,
    /// Forwarded as `ExecuteRequest::provider_type`.
    pub provider_type: Option<String>,
    /// Resolved model name: logged, written to `session.model`, and forwarded as
    /// `ExecuteRequest::model`.
    pub model: String,
    /// Forwarded as `ExecuteRequest::fast_model`.
    pub fast_model: Option<String>,
    /// Forwarded as `ExecuteRequest::fast_model_provider`.
    pub fast_model_provider: Option<Arc<dyn LLMProvider>>,
    /// Forwarded as `ExecuteRequest::background_model`.
    pub background_model: Option<String>,
    /// Forwarded as `ExecuteRequest::background_model_provider`.
    pub background_model_provider: Option<Arc<dyn LLMProvider>>,
    /// Forwarded as `ExecuteRequest::summarization_model`.
    pub summarization_model: Option<String>,
    /// Forwarded as `ExecuteRequest::summarization_model_provider`.
    pub summarization_model_provider: Option<Arc<dyn LLMProvider>>,
    /// Logged (as `"none"` when absent) and forwarded as
    /// `ExecuteRequest::reasoning_effort`.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// Only logged by `spawn_session_execution` (as `reasoning_source`); it is not
    /// forwarded to the agent.
    pub reasoning_effort_source: String,
    /// Forwarded as `ExecuteRequest::auxiliary_model_resolver`.
    pub auxiliary_model_resolver:
        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
    /// Forwarded as `ExecuteRequest::disabled_tools`.
    pub disabled_tools: Option<BTreeSet<String>>,
    /// Forwarded as `ExecuteRequest::disabled_skill_ids`.
    pub disabled_skill_ids: Option<BTreeSet<String>>,
    /// Explicit skill selection; when `None`, the session's `selected_skill_ids`
    /// metadata entry is parsed and used instead.
    pub selected_skill_ids: Option<Vec<String>>,
    /// Explicit skill mode; when `None`, the session's `skill_mode` (or, failing
    /// that, `mode`) metadata entry is used instead.
    pub selected_skill_mode: Option<String>,
    /// Forwarded as `ExecuteRequest::cancel_token`.
    pub cancel_token: CancellationToken,
    /// Event channel: a clone is forwarded as `ExecuteRequest::event_tx`, and the
    /// original is used to send the terminal error event when execution fails.
    pub mpsc_tx: mpsc::Sender<AgentEvent>,
    /// Forwarded as `ExecuteRequest::image_fallback`.
    pub image_fallback: Option<ImageFallbackConfig>,
    /// Forwarded as `ExecuteRequest::gold_config`.
    pub gold_config: Option<GoldConfig>,
    /// Forwarded as `ExecuteRequest::app_data_dir`.
    pub app_data_dir: Option<std::path::PathBuf>,

    /// Shared runner registry handed to `finalize_runner` once execution ends.
    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Shared session cache; the final session is inserted under `session_id`
    /// after the save attempt.
    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
}
73
74pub fn spawn_session_execution(args: SessionExecutionArgs) {
83 let span_session_id = args.session_id.clone();
84 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
85
86 tokio::spawn(
87 async move {
88 let SessionExecutionArgs {
89 agent,
90 session_id,
91 mut session,
92 tools_override,
93 provider_override,
94 provider_name,
95 provider_type,
96 model,
97 fast_model,
98 fast_model_provider,
99 background_model,
100 background_model_provider,
101 summarization_model,
102 summarization_model_provider,
103 reasoning_effort,
104 reasoning_effort_source,
105 auxiliary_model_resolver,
106 disabled_tools,
107 disabled_skill_ids,
108 selected_skill_ids,
109 selected_skill_mode,
110 cancel_token,
111 mpsc_tx,
112 image_fallback,
113 gold_config,
114 app_data_dir,
115 runners,
116 sessions_cache,
117 } = args;
118
119 let initial_message = initial_user_message_for_session(&session);
120 let selected_skill_ids =
121 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
122 let selected_skill_mode =
123 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
124
125 tracing::info!(
126 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
127 session_id,
128 model,
129 reasoning_effort
130 .map(ReasoningEffort::as_str)
131 .unwrap_or("none"),
132 reasoning_effort_source
133 );
134
135 session.model = model.clone();
136
137 let system_prompt = system_prompt_for_session(&session);
138 if let Some(prompt) = system_prompt.as_ref() {
139 log_base_system_prompt_snapshot(&session_id, prompt);
140 }
141
142 let result = agent
143 .execute(
144 &mut session,
145 ExecuteRequest {
146 initial_message,
147 event_tx: mpsc_tx.clone(),
148 cancel_token,
149 tools: tools_override,
150 provider_override,
151 model: Some(model),
152 provider_name,
153 provider_type,
154 fast_model,
155 fast_model_provider,
156 background_model,
157 background_model_provider,
158 summarization_model,
159 summarization_model_provider,
160 reasoning_effort,
161 auxiliary_model_resolver,
162 disabled_tools,
163 disabled_skill_ids,
164 selected_skill_ids,
165 selected_skill_mode,
166 image_fallback,
167 gold_config,
168 app_data_dir,
169 },
170 )
171 .await;
172
173 if let Some(error_event) = terminal_error_event_for_result(&result) {
175 let _ = mpsc_tx.send(error_event).await;
176 }
177
178 finalize_runner(&runners, &session_id, &result).await;
180
181 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
185 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
186 }
187
188 {
190 let mut sessions = sessions_cache.write().await;
191 sessions.insert(session_id.clone(), session);
192 }
193
194 tracing::info!("[{}] Agent execution completed", session_id);
195 }
196 .instrument(session_span),
197 );
198}
199
200pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
202 tracing::info!(
203 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
204 session_id,
205 prompt.len(),
206 prompt.contains(SKILL_CONTEXT_START_MARKER),
207 prompt.contains(TOOL_GUIDE_START_MARKER),
208 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
209 prompt.contains(TASK_LIST_START_MARKER),
210 );
211
212 tracing::debug!(
213 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
214 session_id
215 );
216 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
217 tracing::debug!("[{}] -----------------------------------", session_id);
218 tracing::debug!("[{}] {}", session_id, prompt);
219 tracing::debug!(
220 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
221 session_id
222 );
223}
224
225pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
227where
228 E: std::fmt::Display,
229{
230 match result {
231 Ok(_) => None,
232 Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
233 message: "Agent execution cancelled by user".to_string(),
234 }),
235 Err(error) => Some(AgentEvent::Error {
236 message: error.to_string(),
237 }),
238 }
239}
240
/// Reports whether an error's `Display` text contains the substring
/// `"cancelled"`.
///
/// The match is case-sensitive and uses this exact spelling, so `"Cancelled"`
/// and `"canceled"` do not count.
fn is_cancelled_error<E>(error: &E) -> bool
where
    E: std::fmt::Display,
{
    let rendered = error.to_string();
    rendered.contains("cancelled")
}
247
248fn system_prompt_for_session(session: &Session) -> Option<String> {
251 session
252 .messages
253 .iter()
254 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
255 .map(|message| message.content.clone())
256}
257
258fn initial_user_message_for_session(session: &Session) -> String {
259 session
260 .messages
261 .last()
262 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
263 .map(|message| message.content.clone())
264 .unwrap_or_default()
265}
266
267fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
268 session
269 .metadata
270 .get("selected_skill_ids")
271 .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
272}
273
274fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
275 let value = session
276 .metadata
277 .get("skill_mode")
278 .or_else(|| session.metadata.get("mode"))?;
279 let trimmed = value.trim();
280 if trimmed.is_empty() {
281 None
282 } else {
283 Some(trimmed.to_string())
284 }
285}