1use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{AuxiliaryModelConfig, GoldConfig, ImageFallbackConfig};
21use crate::runtime::execution::runner_lifecycle::finalize_runner;
22use crate::runtime::execution::runner_state::AgentRunner;
23use crate::runtime::model_roster::ModelRoster;
24use crate::runtime::Agent;
25use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
26
27pub type SessionCache = std::sync::Arc<
35 dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
36>;
37
38pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
46 cache
47 .get(id)
48 .map(|e| e.value().clone())
49 .map(|a| a.read().clone())
50}
51
52const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
53const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
54const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
55const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
56
57pub struct SessionExecutionOutcome {
64 pub success: bool,
66 pub cancelled: bool,
68 pub error: Option<String>,
70}
71
72impl SessionExecutionOutcome {
73 fn from_result(result: &Result<(), AgentError>) -> Self {
74 match result {
75 Ok(()) => Self {
76 success: true,
77 cancelled: false,
78 error: None,
79 },
80 Err(error) => Self {
81 success: false,
82 cancelled: error.is_cancelled(),
83 error: Some(error.to_string()),
84 },
85 }
86 }
87}
88
89pub type SessionCompletionHook = Box<
99 dyn for<'a> FnOnce(
100 SessionExecutionOutcome,
101 &'a mut Session,
102 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
103 + Send,
104>;
105
106pub struct SessionExecutionArgs {
112 pub agent: Arc<Agent>,
114 pub session_id: String,
115 pub session: Session,
116
117 pub tools_override: Option<Arc<dyn ToolExecutor>>,
119 pub provider_override: Option<Arc<dyn LLMProvider>>,
120 pub model_roster: ModelRoster,
124 pub reasoning_effort: Option<ReasoningEffort>,
125 pub reasoning_effort_source: String,
126 pub auxiliary_model_resolver:
127 Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
128 pub disabled_tools: Option<BTreeSet<String>>,
129 pub disabled_skill_ids: Option<BTreeSet<String>>,
130 pub selected_skill_ids: Option<Vec<String>>,
131 pub selected_skill_mode: Option<String>,
132 pub cancel_token: CancellationToken,
133 pub mpsc_tx: mpsc::Sender<AgentEvent>,
134 pub image_fallback: Option<ImageFallbackConfig>,
135 pub gold_config: Option<GoldConfig>,
136 pub app_data_dir: Option<std::path::PathBuf>,
137
138 pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
140 pub sessions_cache: SessionCache,
141
142 pub on_complete: Option<SessionCompletionHook>,
145}
146
147struct ExecuteRequestParams {
157 tools: Option<Arc<dyn ToolExecutor>>,
158 provider_override: Option<Arc<dyn LLMProvider>>,
159 model_roster: ModelRoster,
160 reasoning_effort: Option<ReasoningEffort>,
161 auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
162 disabled_tools: Option<BTreeSet<String>>,
163 disabled_skill_ids: Option<BTreeSet<String>>,
164 selected_skill_ids: Option<Vec<String>>,
165 selected_skill_mode: Option<String>,
166 image_fallback: Option<ImageFallbackConfig>,
167 gold_config: Option<GoldConfig>,
168 app_data_dir: Option<std::path::PathBuf>,
169}
170
171fn build_execute_request(
178 initial_message: String,
179 event_tx: mpsc::Sender<AgentEvent>,
180 cancel_token: CancellationToken,
181 params: ExecuteRequestParams,
182) -> ExecuteRequest {
183 let ExecuteRequestParams {
184 tools,
185 provider_override,
186 model_roster,
187 reasoning_effort,
188 auxiliary_model_resolver,
189 disabled_tools,
190 disabled_skill_ids,
191 selected_skill_ids,
192 selected_skill_mode,
193 image_fallback,
194 gold_config,
195 app_data_dir,
196 } = params;
197
198 let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
199 .model_roster(model_roster)
200 .gold_config(gold_config);
201
202 if let Some(tools) = tools {
203 builder = builder.tools(tools);
204 }
205 if let Some(provider_override) = provider_override {
206 builder = builder.provider_override(provider_override);
207 }
208 if let Some(reasoning_effort) = reasoning_effort {
209 builder = builder.reasoning_effort(reasoning_effort);
210 }
211 if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
212 builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
213 }
214 if let Some(disabled_tools) = disabled_tools {
215 builder = builder.disabled_tools(disabled_tools);
216 }
217 if let Some(disabled_skill_ids) = disabled_skill_ids {
218 builder = builder.disabled_skill_ids(disabled_skill_ids);
219 }
220 if let Some(selected_skill_ids) = selected_skill_ids {
221 builder = builder.selected_skill_ids(selected_skill_ids);
222 }
223 if let Some(selected_skill_mode) = selected_skill_mode {
224 builder = builder.selected_skill_mode(selected_skill_mode);
225 }
226 if let Some(image_fallback) = image_fallback {
227 builder = builder.image_fallback(image_fallback);
228 }
229 if let Some(app_data_dir) = app_data_dir {
230 builder = builder.app_data_dir(app_data_dir);
231 }
232
233 builder.build()
234}
235
236pub fn spawn_session_execution(args: SessionExecutionArgs) {
245 let span_session_id = args.session_id.clone();
246 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
247
248 tokio::spawn(
249 async move {
250 let SessionExecutionArgs {
251 agent,
252 session_id,
253 mut session,
254 tools_override,
255 provider_override,
256 model_roster,
257 reasoning_effort,
258 reasoning_effort_source,
259 auxiliary_model_resolver,
260 disabled_tools,
261 disabled_skill_ids,
262 selected_skill_ids,
263 selected_skill_mode,
264 cancel_token,
265 mpsc_tx,
266 image_fallback,
267 gold_config,
268 app_data_dir,
269 runners,
270 sessions_cache,
271 on_complete,
272 } = args;
273
274 let model = model_roster.model.clone().unwrap_or_default();
278
279 let initial_message = initial_user_message_for_session(&session);
280 let selected_skill_ids =
281 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
282 let selected_skill_mode =
283 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
284
285 tracing::info!(
286 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
287 session_id,
288 model,
289 reasoning_effort
290 .map(ReasoningEffort::as_str)
291 .unwrap_or("none"),
292 reasoning_effort_source
293 );
294
295 crate::session_app::execution_prep::prepare_session_for_execution(
302 &mut session,
303 None,
304 Some(&model),
305 );
306
307 let system_prompt = system_prompt_for_session(&session);
308 if let Some(prompt) = system_prompt.as_ref() {
309 log_base_system_prompt_snapshot(&session_id, prompt);
310 }
311
312 let execute_request = build_execute_request(
313 initial_message,
314 mpsc_tx.clone(),
315 cancel_token,
316 ExecuteRequestParams {
317 tools: tools_override,
318 provider_override,
319 model_roster,
320 reasoning_effort,
321 auxiliary_model_resolver,
322 disabled_tools,
323 disabled_skill_ids,
324 selected_skill_ids,
325 selected_skill_mode,
326 image_fallback,
327 gold_config,
328 app_data_dir,
329 },
330 );
331
332 let result = agent.execute(&mut session, execute_request).await;
333
334 if let Some(error_event) = terminal_error_event_for_result(&result) {
336 let _ = mpsc_tx.send(error_event).await;
337 }
338
339 finalize_runner(&runners, &session_id, &result).await;
341
342 if let Some(on_complete) = on_complete {
347 on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
348 }
349
350 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
354 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
355 }
356
357 sessions_cache.insert(
359 session_id.clone(),
360 Arc::new(parking_lot::RwLock::new(session)),
361 );
362
363 tracing::info!("[{}] Agent execution completed", session_id);
364 }
365 .instrument(session_span),
366 );
367}
368
369pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
371 tracing::info!(
372 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
373 session_id,
374 prompt.len(),
375 prompt.contains(SKILL_CONTEXT_START_MARKER),
376 prompt.contains(TOOL_GUIDE_START_MARKER),
377 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
378 prompt.contains(TASK_LIST_START_MARKER),
379 );
380
381 tracing::debug!(
382 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
383 session_id
384 );
385 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
386 tracing::debug!("[{}] -----------------------------------", session_id);
387 tracing::debug!("[{}] {}", session_id, prompt);
388 tracing::debug!(
389 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
390 session_id
391 );
392}
393
394pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
396 match result {
397 Ok(_) => None,
398 Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
399 message: "Agent execution cancelled by user".to_string(),
400 }),
401 Err(error) => Some(AgentEvent::Error {
402 message: error.to_string(),
403 }),
404 }
405}
406
407fn system_prompt_for_session(session: &Session) -> Option<String> {
410 session
411 .messages
412 .iter()
413 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
414 .map(|message| message.content.clone())
415}
416
417fn initial_user_message_for_session(session: &Session) -> String {
418 session
419 .messages
420 .last()
421 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
422 .map(|message| message.content.clone())
423 .unwrap_or_default()
424}
425
426fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
427 session
428 .metadata
429 .get("selected_skill_ids")
430 .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
431}
432
433fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
434 let value = session
435 .metadata
436 .get("skill_mode")
437 .or_else(|| session.metadata.get("mode"))?;
438 let trimmed = value.trim();
439 if trimmed.is_empty() {
440 None
441 } else {
442 Some(trimmed.to_string())
443 }
444}