1use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21 AuxiliaryModelConfig, GoldConfig, GuardianConfig, GuardianSpawner, ImageFallbackConfig,
22};
23use crate::runtime::execution::runner_lifecycle::finalize_runner;
24use crate::runtime::execution::runner_state::AgentRunner;
25use crate::runtime::model_roster::ModelRoster;
26use crate::runtime::Agent;
27use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
28
/// Shared, concurrently accessible cache of sessions keyed by session id.
///
/// The outer `Arc` lets the map itself be shared between owners; each value is
/// an `Arc<parking_lot::RwLock<Session>>`, so a caller can clone the handle out
/// of the map and lock the session independently of the map entry.
pub type SessionCache = std::sync::Arc<
    dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
>;
39
40pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
48 cache
49 .get(id)
50 .map(|e| e.value().clone())
51 .map(|a| a.read().clone())
52}
53
// HTML-comment style marker strings. `log_base_system_prompt_snapshot` reports
// whether each one occurs in the base system prompt (has_skill, has_tool_guide,
// has_external_memory, has_task_list respectively).
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
58
/// Summary of how one agent execution ended, as handed to a
/// [`SessionCompletionHook`].
pub struct SessionExecutionOutcome {
    /// `true` when the execution returned `Ok(())`.
    pub success: bool,
    /// `true` when the execution failed with an error for which
    /// `AgentError::is_cancelled` returns `true`; always `false` on success.
    pub cancelled: bool,
    /// The error rendered with `to_string()`; `None` on success.
    pub error: Option<String>,
}
73
74impl SessionExecutionOutcome {
75 fn from_result(result: &Result<(), AgentError>) -> Self {
76 match result {
77 Ok(()) => Self {
78 success: true,
79 cancelled: false,
80 error: None,
81 },
82 Err(error) => Self {
83 success: false,
84 cancelled: error.is_cancelled(),
85 error: Some(error.to_string()),
86 },
87 }
88 }
89}
90
/// One-shot callback run when a spawned session execution has finished.
///
/// It receives the [`SessionExecutionOutcome`] and a mutable borrow of the
/// session, and returns a boxed `Send` future that may keep borrowing the
/// session until it completes. Both the closure and the future must be `Send`.
///
/// `spawn_session_execution` awaits the hook after the runner has been
/// finalized and before the session is saved and cached.
pub type SessionCompletionHook = Box<
    dyn for<'a> FnOnce(
            SessionExecutionOutcome,
            &'a mut Session,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
107
/// Everything `spawn_session_execution` needs to run one agent execution in a
/// background task.
pub struct SessionExecutionArgs {
    /// Agent that executes the session and provides the persistence used to save it.
    pub agent: Arc<Agent>,
    /// Session identifier; used in log lines, when finalizing the runner and as the cache key.
    pub session_id: String,
    /// Session to execute. It is moved into the task, mutated by the execution
    /// and finally stored in `sessions_cache`.
    pub session: Session,

    /// Tool executor forwarded to the request builder when set.
    pub tools_override: Option<Arc<dyn ToolExecutor>>,
    /// LLM provider forwarded to the request builder when set.
    pub provider_override: Option<Arc<dyn LLMProvider>>,
    /// Model roster for the request; its `model` entry (empty string when
    /// absent) is also logged and passed to session preparation.
    pub model_roster: ModelRoster,
    /// Reasoning effort forwarded to the request builder when set.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// Only written to the start-up log line alongside `reasoning_effort`.
    pub reasoning_effort_source: String,
    /// Callback producing an auxiliary model configuration; forwarded when set.
    pub auxiliary_model_resolver:
        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
    /// Forwarded to the request builder when set.
    pub disabled_tools: Option<BTreeSet<String>>,
    /// Forwarded to the request builder when set.
    pub disabled_skill_ids: Option<BTreeSet<String>>,
    /// Explicit skill selection; when `None`, the session's
    /// `selected_skill_ids` metadata entry is consulted instead.
    pub selected_skill_ids: Option<Vec<String>>,
    /// Explicit skill mode; when `None`, the session's `skill_mode` (or
    /// `mode`) metadata entry is consulted instead.
    pub selected_skill_mode: Option<String>,
    /// Cancellation token handed to the execute request.
    pub cancel_token: CancellationToken,
    /// Event channel: a clone goes into the execute request, and the terminal
    /// error event (if any) is sent on it afterwards.
    pub mpsc_tx: mpsc::Sender<AgentEvent>,
    /// Forwarded to the request builder when set.
    pub image_fallback: Option<ImageFallbackConfig>,
    /// Always passed to the request builder, including when `None`.
    pub gold_config: Option<GoldConfig>,
    /// Always passed to the request builder, including when `None`.
    pub guardian_config: Option<GuardianConfig>,
    /// Always passed to the request builder, including when `None`.
    pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
    /// Forwarded to the request builder when set.
    pub app_data_dir: Option<std::path::PathBuf>,

    /// Runner table passed to `finalize_runner` once the execution has returned.
    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Cache that receives the final session under `session_id`.
    pub sessions_cache: SessionCache,

    /// Optional hook awaited after the runner is finalized and before the
    /// session is saved.
    pub on_complete: Option<SessionCompletionHook>,
}
153
/// Settings consumed by `build_execute_request`, bundled so that function does
/// not need one parameter per setting.
///
/// `model_roster`, `gold_config`, `guardian_config` and `guardian_spawner` are
/// always handed to the builder; every other field is forwarded only when it
/// is `Some`.
struct ExecuteRequestParams {
    /// Tool executor for the request.
    tools: Option<Arc<dyn ToolExecutor>>,
    /// LLM provider override for the request.
    provider_override: Option<Arc<dyn LLMProvider>>,
    /// Always forwarded.
    model_roster: ModelRoster,
    reasoning_effort: Option<ReasoningEffort>,
    /// Callback producing an auxiliary model configuration.
    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
    disabled_tools: Option<BTreeSet<String>>,
    disabled_skill_ids: Option<BTreeSet<String>>,
    selected_skill_ids: Option<Vec<String>>,
    selected_skill_mode: Option<String>,
    image_fallback: Option<ImageFallbackConfig>,
    /// Always forwarded, including when `None`.
    gold_config: Option<GoldConfig>,
    /// Always forwarded, including when `None`.
    guardian_config: Option<GuardianConfig>,
    /// Always forwarded, including when `None`.
    guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
    app_data_dir: Option<std::path::PathBuf>,
}
179
180fn build_execute_request(
187 initial_message: String,
188 event_tx: mpsc::Sender<AgentEvent>,
189 cancel_token: CancellationToken,
190 params: ExecuteRequestParams,
191) -> ExecuteRequest {
192 let ExecuteRequestParams {
193 tools,
194 provider_override,
195 model_roster,
196 reasoning_effort,
197 auxiliary_model_resolver,
198 disabled_tools,
199 disabled_skill_ids,
200 selected_skill_ids,
201 selected_skill_mode,
202 image_fallback,
203 gold_config,
204 guardian_config,
205 guardian_spawner,
206 app_data_dir,
207 } = params;
208
209 let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
210 .model_roster(model_roster)
211 .gold_config(gold_config)
212 .guardian_config(guardian_config)
213 .guardian_spawner(guardian_spawner);
214
215 if let Some(tools) = tools {
216 builder = builder.tools(tools);
217 }
218 if let Some(provider_override) = provider_override {
219 builder = builder.provider_override(provider_override);
220 }
221 if let Some(reasoning_effort) = reasoning_effort {
222 builder = builder.reasoning_effort(reasoning_effort);
223 }
224 if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
225 builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
226 }
227 if let Some(disabled_tools) = disabled_tools {
228 builder = builder.disabled_tools(disabled_tools);
229 }
230 if let Some(disabled_skill_ids) = disabled_skill_ids {
231 builder = builder.disabled_skill_ids(disabled_skill_ids);
232 }
233 if let Some(selected_skill_ids) = selected_skill_ids {
234 builder = builder.selected_skill_ids(selected_skill_ids);
235 }
236 if let Some(selected_skill_mode) = selected_skill_mode {
237 builder = builder.selected_skill_mode(selected_skill_mode);
238 }
239 if let Some(image_fallback) = image_fallback {
240 builder = builder.image_fallback(image_fallback);
241 }
242 if let Some(app_data_dir) = app_data_dir {
243 builder = builder.app_data_dir(app_data_dir);
244 }
245
246 builder.build()
247}
248
/// Spawns a background Tokio task that runs one agent execution for a session.
///
/// The function returns immediately; the task's join handle is discarded. All
/// work happens inside an `agent_execution` tracing span that carries the
/// session id.
///
/// Inside the task, in order:
/// 1. resolve the model name, initial message and skill selection, and log them;
/// 2. prepare the session and log a snapshot of its system prompt, if any;
/// 3. build the execute request and await `agent.execute`;
/// 4. on failure, send a terminal error event on `mpsc_tx`;
/// 5. finalize the runner entry for this session;
/// 6. await the optional `on_complete` hook;
/// 7. save the session (a failure is only logged) and store it in the cache.
pub fn spawn_session_execution(args: SessionExecutionArgs) {
    // Cloned up front because `args` is moved into the async block below.
    let span_session_id = args.session_id.clone();
    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);

    tokio::spawn(
        async move {
            let SessionExecutionArgs {
                agent,
                session_id,
                mut session,
                tools_override,
                provider_override,
                model_roster,
                reasoning_effort,
                reasoning_effort_source,
                auxiliary_model_resolver,
                disabled_tools,
                disabled_skill_ids,
                selected_skill_ids,
                selected_skill_mode,
                cancel_token,
                mpsc_tx,
                image_fallback,
                gold_config,
                guardian_config,
                guardian_spawner,
                app_data_dir,
                runners,
                sessions_cache,
                on_complete,
            } = args;

            // Falls back to an empty string when the roster names no model.
            let model = model_roster.model.clone().unwrap_or_default();

            // Explicit arguments win; otherwise fall back to session metadata.
            let initial_message = initial_user_message_for_session(&session);
            let selected_skill_ids =
                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
            let selected_skill_mode =
                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));

            tracing::info!(
                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
                session_id,
                model,
                reasoning_effort
                    .map(ReasoningEffort::as_str)
                    .unwrap_or("none"),
                reasoning_effort_source
            );

            // Project helper that may mutate the session; it is given no second
            // argument and the resolved model name.
            crate::session_app::execution_prep::prepare_session_for_execution(
                &mut session,
                None,
                Some(&model),
            );

            // Read after preparation, so the snapshot reflects whatever the
            // preparation step did to the system message.
            let system_prompt = system_prompt_for_session(&session);
            if let Some(prompt) = system_prompt.as_ref() {
                log_base_system_prompt_snapshot(&session_id, prompt);
            }

            let execute_request = build_execute_request(
                initial_message,
                mpsc_tx.clone(),
                cancel_token,
                ExecuteRequestParams {
                    tools: tools_override,
                    provider_override,
                    model_roster,
                    reasoning_effort,
                    auxiliary_model_resolver,
                    disabled_tools,
                    disabled_skill_ids,
                    selected_skill_ids,
                    selected_skill_mode,
                    image_fallback,
                    gold_config,
                    guardian_config,
                    guardian_spawner,
                    app_data_dir,
                },
            );

            let result = agent.execute(&mut session, execute_request).await;

            if let Some(error_event) = terminal_error_event_for_result(&result) {
                // Best effort: a failed send is deliberately ignored
                // (presumably the receiver may already be gone).
                let _ = mpsc_tx.send(error_event).await;
            }

            finalize_runner(&runners, &session_id, &result).await;

            // The hook runs before the save below, so any changes it makes to
            // the session are part of what gets saved and cached.
            if let Some(on_complete) = on_complete {
                on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
            }

            // A save failure is only logged; the cache is still updated.
            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
            }

            // Store the final session under its id, wrapped in a fresh lock.
            sessions_cache.insert(
                session_id.clone(),
                Arc::new(parking_lot::RwLock::new(session)),
            );

            tracing::info!("[{}] Agent execution completed", session_id);
        }
        .instrument(session_span),
    );
}
385
386pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
388 tracing::info!(
389 "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
390 session_id,
391 prompt.len(),
392 prompt.contains(SKILL_CONTEXT_START_MARKER),
393 prompt.contains(TOOL_GUIDE_START_MARKER),
394 prompt.contains(EXTERNAL_MEMORY_START_MARKER),
395 prompt.contains(TASK_LIST_START_MARKER),
396 );
397
398 tracing::debug!(
399 "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
400 session_id
401 );
402 tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
403 tracing::debug!("[{}] -----------------------------------", session_id);
404 tracing::debug!("[{}] {}", session_id, prompt);
405 tracing::debug!(
406 "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
407 session_id
408 );
409}
410
411pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
413 match result {
414 Ok(_) => None,
415 Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
416 message: "Agent execution cancelled by user".to_string(),
417 }),
418 Err(error) => Some(AgentEvent::Error {
419 message: error.to_string(),
420 }),
421 }
422}
423
424fn system_prompt_for_session(session: &Session) -> Option<String> {
427 session
428 .messages
429 .iter()
430 .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
431 .map(|message| message.content.clone())
432}
433
434fn initial_user_message_for_session(session: &Session) -> String {
435 session
436 .messages
437 .last()
438 .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
439 .map(|message| message.content.clone())
440 .unwrap_or_default()
441}
442
443fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
444 session
445 .metadata
446 .get("selected_skill_ids")
447 .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
448}
449
450fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
451 let value = session
452 .metadata
453 .get("skill_mode")
454 .or_else(|| session.metadata.get("mode"))?;
455 let trimmed = value.trim();
456 if trimmed.is_empty() {
457 None
458 } else {
459 Some(trimmed.to_string())
460 }
461}