1use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21 AuxiliaryModelConfig, BashResumeHook, GoldConfig, GuardianConfig, GuardianSpawner,
22 ImageFallbackConfig,
23};
24use crate::runtime::execution::runner_lifecycle::finalize_runner;
25use crate::runtime::execution::runner_state::AgentRunner;
26use crate::runtime::model_roster::ModelRoster;
27use crate::runtime::Agent;
28use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
29
30pub type SessionCache = std::sync::Arc<
38 dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
39>;
40
41pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
49 cache
50 .get(id)
51 .map(|e| e.value().clone())
52 .map(|a| a.read().clone())
53}
54
// Marker strings that `log_base_system_prompt_snapshot` searches for in the
// base system prompt to report which sections it contains.
// NOTE(review): presumably each one opens the named section of the prompt;
// the code that writes them is not in this file — confirm.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
59
/// Summary of how one session execution ended, handed to the completion hook.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so hooks can log the outcome
/// and tests can compare it; all fields are plain data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionExecutionOutcome {
    /// `true` when the execution returned `Ok(())`.
    pub success: bool,
    /// `true` when the execution failed with an error that reports itself as
    /// a cancellation.
    pub cancelled: bool,
    /// Display text of the error, or `None` on success.
    pub error: Option<String>,
}
74
75impl SessionExecutionOutcome {
76 fn from_result(result: &Result<(), AgentError>) -> Self {
77 match result {
78 Ok(()) => Self {
79 success: true,
80 cancelled: false,
81 error: None,
82 },
83 Err(error) => Self {
84 success: false,
85 cancelled: error.is_cancelled(),
86 error: Some(error.to_string()),
87 },
88 }
89 }
90}
91
/// One-shot callback invoked when a spawned session execution has finished.
///
/// It receives the [`SessionExecutionOutcome`] by value and a mutable borrow
/// of the session, and returns a boxed `Send` future that may hold that
/// borrow for its whole lifetime (`'a`).
///
/// `spawn_session_execution` awaits the hook after the runner has been
/// finalized and before the session is saved, so changes the hook makes to
/// the session are included in that save.
pub type SessionCompletionHook = Box<
    dyn for<'a> FnOnce(
            SessionExecutionOutcome,
            &'a mut Session,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
108
/// Everything `spawn_session_execution` needs to run one agent execution in a
/// background task and publish the result.
pub struct SessionExecutionArgs {
    /// Agent that executes the session and provides the persistence layer
    /// used to save it afterwards.
    pub agent: Arc<Agent>,
    /// Session identifier; used for the tracing span, log prefixes, runner
    /// finalization and as the session-cache key.
    pub session_id: String,
    /// Session state to execute. It is mutated by the execution, then saved
    /// and stored in `sessions_cache`.
    pub session: Session,

    /// Tool executor forwarded to the execute request when `Some`.
    pub tools_override: Option<Arc<dyn ToolExecutor>>,
    /// LLM provider forwarded to the execute request when `Some`.
    pub provider_override: Option<Arc<dyn LLMProvider>>,
    /// Model roster forwarded to the execute request; its `model` entry (or
    /// the default value when unset) is logged and passed to session
    /// preparation.
    pub model_roster: ModelRoster,
    /// Reasoning effort forwarded to the execute request when `Some`.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// Label describing where `reasoning_effort` came from; only logged here.
    pub reasoning_effort_source: String,
    /// Callback producing the auxiliary model configuration; forwarded to the
    /// execute request when `Some`.
    pub auxiliary_model_resolver:
        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
    /// Callback producing a pair of string sets; forwarded to the execute
    /// request when `Some`.
    // NOTE(review): presumably (disabled tool names, disabled skill ids) — confirm.
    pub disabled_filter_resolver:
        Option<Arc<dyn Fn() -> (BTreeSet<String>, BTreeSet<String>) + Send + Sync>>,
    /// Disabled tools forwarded to the execute request when `Some`.
    pub disabled_tools: Option<BTreeSet<String>>,
    /// Disabled skill ids forwarded to the execute request when `Some`.
    pub disabled_skill_ids: Option<BTreeSet<String>>,
    /// Explicit skill selection. When `None`, the session's
    /// `selected_skill_ids` metadata entry is parsed instead.
    pub selected_skill_ids: Option<Vec<String>>,
    /// Explicit skill mode. When `None`, the session's `skill_mode` (or,
    /// failing that, `mode`) metadata entry is used instead.
    pub selected_skill_mode: Option<String>,
    /// Cancellation token handed to the execute request.
    pub cancel_token: CancellationToken,
    /// Event channel: a clone goes into the execute request, and the terminal
    /// error event (if any) is sent on it after execution.
    pub mpsc_tx: mpsc::Sender<AgentEvent>,
    /// Image fallback configuration forwarded to the execute request when `Some`.
    pub image_fallback: Option<ImageFallbackConfig>,
    /// Forwarded to the execute request as-is (including `None`).
    pub gold_config: Option<GoldConfig>,
    /// Forwarded to the execute request as-is (including `None`).
    pub guardian_config: Option<GuardianConfig>,
    /// Forwarded to the execute request as-is (including `None`).
    pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
    /// Forwarded to the execute request as-is (including `None`).
    pub bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
    /// Application data directory forwarded to the execute request when `Some`.
    pub app_data_dir: Option<std::path::PathBuf>,

    /// Runner registry passed to `finalize_runner` once execution ends.
    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Cache that receives the final session state under `session_id`.
    pub sessions_cache: SessionCache,

    /// Optional hook awaited after runner finalization and before the session
    /// is saved.
    pub on_complete: Option<SessionCompletionHook>,
}
161
/// Bundle of configuration inputs consumed by `build_execute_request`, kept in
/// a struct so that function does not need a long positional argument list.
///
/// `model_roster`, `gold_config`, `guardian_config`, `guardian_spawner` and
/// `bash_resume_hook` are always passed to the request builder; every other
/// field is applied only when it is `Some`.
struct ExecuteRequestParams {
    // Applied via the builder's `tools` setter when present.
    tools: Option<Arc<dyn ToolExecutor>>,
    provider_override: Option<Arc<dyn LLMProvider>>,
    model_roster: ModelRoster,
    reasoning_effort: Option<ReasoningEffort>,
    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
    disabled_filter_resolver:
        Option<Arc<dyn Fn() -> (BTreeSet<String>, BTreeSet<String>) + Send + Sync>>,
    disabled_tools: Option<BTreeSet<String>>,
    disabled_skill_ids: Option<BTreeSet<String>>,
    selected_skill_ids: Option<Vec<String>>,
    selected_skill_mode: Option<String>,
    image_fallback: Option<ImageFallbackConfig>,
    gold_config: Option<GoldConfig>,
    guardian_config: Option<GuardianConfig>,
    guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
    bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
    app_data_dir: Option<std::path::PathBuf>,
}
190
191fn build_execute_request(
198 initial_message: String,
199 event_tx: mpsc::Sender<AgentEvent>,
200 cancel_token: CancellationToken,
201 params: ExecuteRequestParams,
202) -> ExecuteRequest {
203 let ExecuteRequestParams {
204 tools,
205 provider_override,
206 model_roster,
207 reasoning_effort,
208 auxiliary_model_resolver,
209 disabled_filter_resolver,
210 disabled_tools,
211 disabled_skill_ids,
212 selected_skill_ids,
213 selected_skill_mode,
214 image_fallback,
215 gold_config,
216 guardian_config,
217 guardian_spawner,
218 bash_resume_hook,
219 app_data_dir,
220 } = params;
221
222 let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
223 .model_roster(model_roster)
224 .gold_config(gold_config)
225 .guardian_config(guardian_config)
226 .guardian_spawner(guardian_spawner)
227 .bash_resume_hook(bash_resume_hook);
228
229 if let Some(tools) = tools {
230 builder = builder.tools(tools);
231 }
232 if let Some(provider_override) = provider_override {
233 builder = builder.provider_override(provider_override);
234 }
235 if let Some(reasoning_effort) = reasoning_effort {
236 builder = builder.reasoning_effort(reasoning_effort);
237 }
238 if let Some(disabled_filter_resolver) = disabled_filter_resolver {
239 builder = builder.disabled_filter_resolver(disabled_filter_resolver);
240 }
241 if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
242 builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
243 }
244 if let Some(disabled_tools) = disabled_tools {
245 builder = builder.disabled_tools(disabled_tools);
246 }
247 if let Some(disabled_skill_ids) = disabled_skill_ids {
248 builder = builder.disabled_skill_ids(disabled_skill_ids);
249 }
250 if let Some(selected_skill_ids) = selected_skill_ids {
251 builder = builder.selected_skill_ids(selected_skill_ids);
252 }
253 if let Some(selected_skill_mode) = selected_skill_mode {
254 builder = builder.selected_skill_mode(selected_skill_mode);
255 }
256 if let Some(image_fallback) = image_fallback {
257 builder = builder.image_fallback(image_fallback);
258 }
259 if let Some(app_data_dir) = app_data_dir {
260 builder = builder.app_data_dir(app_data_dir);
261 }
262
263 builder.build()
264}
265
266pub fn spawn_session_execution(args: SessionExecutionArgs) {
275 let span_session_id = args.session_id.clone();
276 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
277
278 tokio::spawn(
279 async move {
280 let SessionExecutionArgs {
281 agent,
282 session_id,
283 mut session,
284 tools_override,
285 provider_override,
286 model_roster,
287 reasoning_effort,
288 reasoning_effort_source,
289 auxiliary_model_resolver,
290 disabled_filter_resolver,
291 disabled_tools,
292 disabled_skill_ids,
293 selected_skill_ids,
294 selected_skill_mode,
295 cancel_token,
296 mpsc_tx,
297 image_fallback,
298 gold_config,
299 guardian_config,
300 guardian_spawner,
301 bash_resume_hook,
302 app_data_dir,
303 runners,
304 sessions_cache,
305 on_complete,
306 } = args;
307
308 let model = model_roster.model.clone().unwrap_or_default();
312
313 let initial_message = initial_user_message_for_session(&session);
314 let selected_skill_ids =
315 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
316 let selected_skill_mode =
317 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
318
319 tracing::info!(
320 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
321 session_id,
322 model,
323 reasoning_effort
324 .map(ReasoningEffort::as_str)
325 .unwrap_or("none"),
326 reasoning_effort_source
327 );
328
329 crate::session_app::execution_prep::prepare_session_for_execution(
336 &mut session,
337 None,
338 Some(&model),
339 );
340
341 let system_prompt = system_prompt_for_session(&session);
342 if let Some(prompt) = system_prompt.as_ref() {
343 log_base_system_prompt_snapshot(&session_id, prompt);
344 }
345
346 let execute_request = build_execute_request(
347 initial_message,
348 mpsc_tx.clone(),
349 cancel_token,
350 ExecuteRequestParams {
351 tools: tools_override,
352 provider_override,
353 model_roster,
354 reasoning_effort,
355 auxiliary_model_resolver,
356 disabled_filter_resolver,
357 disabled_tools,
358 disabled_skill_ids,
359 selected_skill_ids,
360 selected_skill_mode,
361 image_fallback,
362 gold_config,
363 guardian_config,
364 guardian_spawner,
365 bash_resume_hook,
366 app_data_dir,
367 },
368 );
369
370 let result = agent.execute(&mut session, execute_request).await;
371
372 if let Some(error_event) = terminal_error_event_for_result(&result) {
374 let _ = mpsc_tx.send(error_event).await;
375 }
376
377 finalize_runner(&runners, &session_id, &result).await;
379
380 if let Some(on_complete) = on_complete {
385 on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
386 }
387
388 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
392 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
393 }
394
395 sessions_cache.insert(
397 session_id.clone(),
398 Arc::new(parking_lot::RwLock::new(session)),
399 );
400
401 tracing::info!("[{}] Agent execution completed", session_id);
402 }
403 .instrument(session_span),
404 );
405}
406
407pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
409 tracing::info!(
410 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
411 session_id,
412 prompt.len(),
413 prompt.contains(SKILL_CONTEXT_START_MARKER),
414 prompt.contains(TOOL_GUIDE_START_MARKER),
415 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
416 prompt.contains(TASK_LIST_START_MARKER),
417 );
418
419 tracing::debug!(
420 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
421 session_id
422 );
423 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
424 tracing::debug!("[{}] -----------------------------------", session_id);
425 tracing::debug!("[{}] {}", session_id, prompt);
426 tracing::debug!(
427 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
428 session_id
429 );
430}
431
432pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
434 match result {
435 Ok(_) => None,
436 Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
437 message: "Agent execution cancelled by user".to_string(),
438 }),
439 Err(error) => Some(AgentEvent::Error {
440 message: error.to_string(),
441 }),
442 }
443}
444
445fn system_prompt_for_session(session: &Session) -> Option<String> {
448 session
449 .messages
450 .iter()
451 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
452 .map(|message| message.content.clone())
453}
454
455fn initial_user_message_for_session(session: &Session) -> String {
456 session
457 .messages
458 .last()
459 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
460 .map(|message| message.content.clone())
461 .unwrap_or_default()
462}
463
464fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
465 session
466 .metadata
467 .get("selected_skill_ids")
468 .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
469}
470
471fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
472 let value = session
473 .metadata
474 .get("skill_mode")
475 .or_else(|| session.metadata.get("mode"))?;
476 let trimmed = value.trim();
477 if trimmed.is_empty() {
478 None
479 } else {
480 Some(trimmed.to_string())
481 }
482}