bamboo_engine/runtime/execution/
agent_spawn.rs1use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::ImageFallbackConfig;
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
// Marker strings searched for in the base system prompt by
// `log_base_system_prompt_snapshot`, which reports whether each one is present.
// NOTE(review): presumably written by the prompt-assembly code elsewhere; the
// producer is not visible in this file, so confirm the exact spelling there.

/// Marker whose presence is logged as `has_skill`.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
/// Marker whose presence is logged as `has_tool_guide`.
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
/// Marker whose presence is logged as `has_external_memory`.
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
/// Marker whose presence is logged as `has_task_list`.
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
/// Everything `spawn_session_execution` needs to run one agent execution in a
/// background task.
///
/// The struct is moved into the spawned task and destructured there, so every
/// field is owned (or reference-counted).
pub struct SessionExecutionArgs {
    /// Agent whose `execute` method runs the session and whose persistence layer
    /// saves it afterwards.
    pub agent: Arc<Agent>,
    /// Session identifier: used as the tracing span field, the log prefix, the
    /// key handed to `finalize_runner`, and the key in `sessions_cache`.
    pub session_id: String,
    /// Session to execute. It is passed to the agent by mutable reference, has
    /// its `model` overwritten with `model`, and is saved and cached afterwards.
    pub session: Session,

    /// Forwarded to `ExecuteRequest::tools`.
    pub tools_override: Option<Arc<dyn ToolExecutor>>,
    /// Forwarded to `ExecuteRequest::provider_override`.
    pub provider_override: Option<Arc<dyn LLMProvider>>,
    /// Forwarded to `ExecuteRequest::provider_name`.
    pub provider_name: Option<String>,
    /// Resolved model name: written to `session.model` and forwarded as
    /// `ExecuteRequest::model`.
    pub model: String,
    /// Forwarded to `ExecuteRequest::background_model`.
    pub fast_model: Option<String>,
    /// Forwarded to `ExecuteRequest::background_model_provider`.
    pub background_model_provider: Option<Arc<dyn LLMProvider>>,
    /// Forwarded to `ExecuteRequest::reasoning_effort`; logged as `"none"` when
    /// absent.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// Only logged alongside the resolved model (as `reasoning_source`).
    pub reasoning_effort_source: String,
    /// Forwarded to `ExecuteRequest::disabled_tools`.
    pub disabled_tools: Option<BTreeSet<String>>,
    /// Forwarded to `ExecuteRequest::disabled_skill_ids`.
    pub disabled_skill_ids: Option<BTreeSet<String>>,
    /// Explicit skill selection. When `None`, the value parsed from the
    /// session's `selected_skill_ids` metadata entry is used instead.
    pub selected_skill_ids: Option<Vec<String>>,
    /// Explicit skill mode. When `None`, the session's `skill_mode` (or, failing
    /// that, `mode`) metadata entry is used instead.
    pub selected_skill_mode: Option<String>,
    /// Forwarded to `ExecuteRequest::cancel_token`.
    pub cancel_token: CancellationToken,
    /// Event channel: a clone is forwarded as `ExecuteRequest::event_tx`, and the
    /// original is used to send the terminal error event when execution fails.
    pub mpsc_tx: mpsc::Sender<AgentEvent>,
    /// Forwarded to `ExecuteRequest::image_fallback`.
    pub image_fallback: Option<ImageFallbackConfig>,

    /// Runner map handed to `finalize_runner` together with the session id and
    /// the execution result.
    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Session cache; the executed session is inserted under `session_id` once
    /// execution and persistence have been attempted.
    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
}
62
63pub fn spawn_session_execution(args: SessionExecutionArgs) {
72 let span_session_id = args.session_id.clone();
73 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
74
75 tokio::spawn(
76 async move {
77 let SessionExecutionArgs {
78 agent,
79 session_id,
80 mut session,
81 tools_override,
82 provider_override,
83 provider_name,
84 model,
85 fast_model,
86 background_model_provider,
87 reasoning_effort,
88 reasoning_effort_source,
89 disabled_tools,
90 disabled_skill_ids,
91 selected_skill_ids,
92 selected_skill_mode,
93 cancel_token,
94 mpsc_tx,
95 image_fallback,
96 runners,
97 sessions_cache,
98 } = args;
99
100 let initial_message = initial_user_message_for_session(&session);
101 let selected_skill_ids =
102 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
103 let selected_skill_mode =
104 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
105
106 tracing::info!(
107 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
108 session_id,
109 model,
110 reasoning_effort
111 .map(ReasoningEffort::as_str)
112 .unwrap_or("none"),
113 reasoning_effort_source
114 );
115
116 session.model = model.clone();
117
118 let system_prompt = system_prompt_for_session(&session);
119 if let Some(prompt) = system_prompt.as_ref() {
120 log_base_system_prompt_snapshot(&session_id, prompt);
121 }
122
123 let result = agent
124 .execute(
125 &mut session,
126 ExecuteRequest {
127 initial_message,
128 event_tx: mpsc_tx.clone(),
129 cancel_token,
130 tools: tools_override,
131 provider_override,
132 model: Some(model),
133 provider_name,
134 background_model: fast_model,
135 background_model_provider,
136 reasoning_effort,
137 disabled_tools,
138 disabled_skill_ids,
139 selected_skill_ids,
140 selected_skill_mode,
141 image_fallback,
142 },
143 )
144 .await;
145
146 if let Some(error_event) = terminal_error_event_for_result(&result) {
148 let _ = mpsc_tx.send(error_event).await;
149 }
150
151 finalize_runner(&runners, &session_id, &result).await;
153
154 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
158 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
159 }
160
161 {
163 let mut sessions = sessions_cache.write().await;
164 sessions.insert(session_id.clone(), session);
165 }
166
167 tracing::info!("[{}] Agent execution completed", session_id);
168 }
169 .instrument(session_span),
170 );
171}
172
173pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
175 tracing::info!(
176 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
177 session_id,
178 prompt.len(),
179 prompt.contains(SKILL_CONTEXT_START_MARKER),
180 prompt.contains(TOOL_GUIDE_START_MARKER),
181 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
182 prompt.contains(TASK_LIST_START_MARKER),
183 );
184
185 tracing::debug!(
186 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
187 session_id
188 );
189 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
190 tracing::debug!("[{}] -----------------------------------", session_id);
191 tracing::debug!("[{}] {}", session_id, prompt);
192 tracing::debug!(
193 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
194 session_id
195 );
196}
197
198pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
200where
201 E: std::fmt::Display,
202{
203 match result {
204 Ok(_) => None,
205 Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
206 message: "Agent execution cancelled by user".to_string(),
207 }),
208 Err(error) => Some(AgentEvent::Error {
209 message: error.to_string(),
210 }),
211 }
212}
213
/// Reports whether an error's `Display` text contains the substring `cancelled`.
///
/// The match is case-sensitive and only recognises this exact spelling.
fn is_cancelled_error<E>(error: &E) -> bool
where
    E: std::fmt::Display,
{
    let rendered = format!("{error}");
    rendered.contains("cancelled")
}
220
221fn system_prompt_for_session(session: &Session) -> Option<String> {
224 session
225 .messages
226 .iter()
227 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
228 .map(|message| message.content.clone())
229}
230
231fn initial_user_message_for_session(session: &Session) -> String {
232 session
233 .messages
234 .last()
235 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
236 .map(|message| message.content.clone())
237 .unwrap_or_default()
238}
239
240fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
241 session
242 .metadata
243 .get("selected_skill_ids")
244 .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
245}
246
247fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
248 let value = session
249 .metadata
250 .get("skill_mode")
251 .or_else(|| session.metadata.get("mode"))?;
252 let trimmed = value.trim();
253 if trimmed.is_empty() {
254 None
255 } else {
256 Some(trimmed.to_string())
257 }
258}