1use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21 AuxiliaryModelConfig, BashResumeHook, GoldConfig, GuardianConfig, GuardianSpawner,
22 ImageFallbackConfig,
23};
24use crate::runtime::execution::runner_lifecycle::finalize_runner;
25use crate::runtime::execution::runner_state::AgentRunner;
26use crate::runtime::model_roster::ModelRoster;
27use crate::runtime::Agent;
28use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
29
30pub type SessionCache = std::sync::Arc<
38 dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
39>;
40
41pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
49 cache
50 .get(id)
51 .map(|e| e.value().clone())
52 .map(|a| a.read().clone())
53}
54
// HTML-comment markers searched for in the base system prompt when logging
// which sections it contains.

/// Marker checked to report `has_skill`.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
/// Marker checked to report `has_tool_guide`.
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
/// Marker checked to report `has_external_memory`.
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
/// Marker checked to report `has_task_list`.
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
59
/// Summary of how one session execution ended, as handed to the completion hook.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so an outcome can be logged,
/// copied and compared by hooks and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionExecutionOutcome {
    /// `true` when the execution returned `Ok(())`.
    pub success: bool,
    /// `true` when the execution failed with an error that reports itself as cancelled.
    pub cancelled: bool,
    /// Display text of the error, or `None` on success.
    pub error: Option<String>,
}
74
75impl SessionExecutionOutcome {
76 fn from_result(result: &Result<(), AgentError>) -> Self {
77 match result {
78 Ok(()) => Self {
79 success: true,
80 cancelled: false,
81 error: None,
82 },
83 Err(error) => Self {
84 success: false,
85 cancelled: error.is_cancelled(),
86 error: Some(error.to_string()),
87 },
88 }
89 }
90}
91
92pub type SessionCompletionHook = Box<
102 dyn for<'a> FnOnce(
103 SessionExecutionOutcome,
104 &'a mut Session,
105 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
106 + Send,
107>;
108
109pub struct SessionExecutionArgs {
115 pub agent: Arc<Agent>,
117 pub session_id: String,
118 pub session: Session,
119
120 pub tools_override: Option<Arc<dyn ToolExecutor>>,
122 pub provider_override: Option<Arc<dyn LLMProvider>>,
123 pub model_roster: ModelRoster,
127 pub reasoning_effort: Option<ReasoningEffort>,
128 pub reasoning_effort_source: String,
129 pub auxiliary_model_resolver:
130 Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
131 pub disabled_tools: Option<BTreeSet<String>>,
132 pub disabled_skill_ids: Option<BTreeSet<String>>,
133 pub selected_skill_ids: Option<Vec<String>>,
134 pub selected_skill_mode: Option<String>,
135 pub cancel_token: CancellationToken,
136 pub mpsc_tx: mpsc::Sender<AgentEvent>,
137 pub image_fallback: Option<ImageFallbackConfig>,
138 pub gold_config: Option<GoldConfig>,
139 pub guardian_config: Option<GuardianConfig>,
141 pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
144 pub bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
146 pub app_data_dir: Option<std::path::PathBuf>,
147
148 pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
150 pub sessions_cache: SessionCache,
151
152 pub on_complete: Option<SessionCompletionHook>,
155}
156
157struct ExecuteRequestParams {
167 tools: Option<Arc<dyn ToolExecutor>>,
168 provider_override: Option<Arc<dyn LLMProvider>>,
169 model_roster: ModelRoster,
170 reasoning_effort: Option<ReasoningEffort>,
171 auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
172 disabled_tools: Option<BTreeSet<String>>,
173 disabled_skill_ids: Option<BTreeSet<String>>,
174 selected_skill_ids: Option<Vec<String>>,
175 selected_skill_mode: Option<String>,
176 image_fallback: Option<ImageFallbackConfig>,
177 gold_config: Option<GoldConfig>,
178 guardian_config: Option<GuardianConfig>,
179 guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
180 bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
181 app_data_dir: Option<std::path::PathBuf>,
182}
183
184fn build_execute_request(
191 initial_message: String,
192 event_tx: mpsc::Sender<AgentEvent>,
193 cancel_token: CancellationToken,
194 params: ExecuteRequestParams,
195) -> ExecuteRequest {
196 let ExecuteRequestParams {
197 tools,
198 provider_override,
199 model_roster,
200 reasoning_effort,
201 auxiliary_model_resolver,
202 disabled_tools,
203 disabled_skill_ids,
204 selected_skill_ids,
205 selected_skill_mode,
206 image_fallback,
207 gold_config,
208 guardian_config,
209 guardian_spawner,
210 bash_resume_hook,
211 app_data_dir,
212 } = params;
213
214 let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
215 .model_roster(model_roster)
216 .gold_config(gold_config)
217 .guardian_config(guardian_config)
218 .guardian_spawner(guardian_spawner)
219 .bash_resume_hook(bash_resume_hook);
220
221 if let Some(tools) = tools {
222 builder = builder.tools(tools);
223 }
224 if let Some(provider_override) = provider_override {
225 builder = builder.provider_override(provider_override);
226 }
227 if let Some(reasoning_effort) = reasoning_effort {
228 builder = builder.reasoning_effort(reasoning_effort);
229 }
230 if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
231 builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
232 }
233 if let Some(disabled_tools) = disabled_tools {
234 builder = builder.disabled_tools(disabled_tools);
235 }
236 if let Some(disabled_skill_ids) = disabled_skill_ids {
237 builder = builder.disabled_skill_ids(disabled_skill_ids);
238 }
239 if let Some(selected_skill_ids) = selected_skill_ids {
240 builder = builder.selected_skill_ids(selected_skill_ids);
241 }
242 if let Some(selected_skill_mode) = selected_skill_mode {
243 builder = builder.selected_skill_mode(selected_skill_mode);
244 }
245 if let Some(image_fallback) = image_fallback {
246 builder = builder.image_fallback(image_fallback);
247 }
248 if let Some(app_data_dir) = app_data_dir {
249 builder = builder.app_data_dir(app_data_dir);
250 }
251
252 builder.build()
253}
254
255pub fn spawn_session_execution(args: SessionExecutionArgs) {
264 let span_session_id = args.session_id.clone();
265 let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
266
267 tokio::spawn(
268 async move {
269 let SessionExecutionArgs {
270 agent,
271 session_id,
272 mut session,
273 tools_override,
274 provider_override,
275 model_roster,
276 reasoning_effort,
277 reasoning_effort_source,
278 auxiliary_model_resolver,
279 disabled_tools,
280 disabled_skill_ids,
281 selected_skill_ids,
282 selected_skill_mode,
283 cancel_token,
284 mpsc_tx,
285 image_fallback,
286 gold_config,
287 guardian_config,
288 guardian_spawner,
289 bash_resume_hook,
290 app_data_dir,
291 runners,
292 sessions_cache,
293 on_complete,
294 } = args;
295
296 let model = model_roster.model.clone().unwrap_or_default();
300
301 let initial_message = initial_user_message_for_session(&session);
302 let selected_skill_ids =
303 selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
304 let selected_skill_mode =
305 selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
306
307 tracing::info!(
308 "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
309 session_id,
310 model,
311 reasoning_effort
312 .map(ReasoningEffort::as_str)
313 .unwrap_or("none"),
314 reasoning_effort_source
315 );
316
317 crate::session_app::execution_prep::prepare_session_for_execution(
324 &mut session,
325 None,
326 Some(&model),
327 );
328
329 let system_prompt = system_prompt_for_session(&session);
330 if let Some(prompt) = system_prompt.as_ref() {
331 log_base_system_prompt_snapshot(&session_id, prompt);
332 }
333
334 let execute_request = build_execute_request(
335 initial_message,
336 mpsc_tx.clone(),
337 cancel_token,
338 ExecuteRequestParams {
339 tools: tools_override,
340 provider_override,
341 model_roster,
342 reasoning_effort,
343 auxiliary_model_resolver,
344 disabled_tools,
345 disabled_skill_ids,
346 selected_skill_ids,
347 selected_skill_mode,
348 image_fallback,
349 gold_config,
350 guardian_config,
351 guardian_spawner,
352 bash_resume_hook,
353 app_data_dir,
354 },
355 );
356
357 let result = agent.execute(&mut session, execute_request).await;
358
359 if let Some(error_event) = terminal_error_event_for_result(&result) {
361 let _ = mpsc_tx.send(error_event).await;
362 }
363
364 finalize_runner(&runners, &session_id, &result).await;
366
367 if let Some(on_complete) = on_complete {
372 on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
373 }
374
375 if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
379 tracing::warn!("[{}] Failed to save session: {}", session_id, error);
380 }
381
382 sessions_cache.insert(
384 session_id.clone(),
385 Arc::new(parking_lot::RwLock::new(session)),
386 );
387
388 tracing::info!("[{}] Agent execution completed", session_id);
389 }
390 .instrument(session_span),
391 );
392}
393
394pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
396 tracing::info!(
397 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
398 session_id,
399 prompt.len(),
400 prompt.contains(SKILL_CONTEXT_START_MARKER),
401 prompt.contains(TOOL_GUIDE_START_MARKER),
402 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
403 prompt.contains(TASK_LIST_START_MARKER),
404 );
405
406 tracing::debug!(
407 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
408 session_id
409 );
410 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
411 tracing::debug!("[{}] -----------------------------------", session_id);
412 tracing::debug!("[{}] {}", session_id, prompt);
413 tracing::debug!(
414 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
415 session_id
416 );
417}
418
419pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
421 match result {
422 Ok(_) => None,
423 Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
424 message: "Agent execution cancelled by user".to_string(),
425 }),
426 Err(error) => Some(AgentEvent::Error {
427 message: error.to_string(),
428 }),
429 }
430}
431
432fn system_prompt_for_session(session: &Session) -> Option<String> {
435 session
436 .messages
437 .iter()
438 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
439 .map(|message| message.content.clone())
440}
441
442fn initial_user_message_for_session(session: &Session) -> String {
443 session
444 .messages
445 .last()
446 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
447 .map(|message| message.content.clone())
448 .unwrap_or_default()
449}
450
451fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
452 session
453 .metadata
454 .get("selected_skill_ids")
455 .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
456}
457
458fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
459 let value = session
460 .metadata
461 .get("skill_mode")
462 .or_else(|| session.metadata.get("mode"))?;
463 let trimmed = value.trim();
464 if trimmed.is_empty() {
465 None
466 } else {
467 Some(trimmed.to_string())
468 }
469}