use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use bamboo_agent_core::tools::ToolExecutor;
use bamboo_agent_core::{AgentEvent, Session};
use bamboo_domain::ReasoningEffort;
use bamboo_infrastructure::LLMProvider;
use crate::runtime::config::ImageFallbackConfig;
use crate::runtime::execution::runner_lifecycle::finalize_runner;
use crate::runtime::execution::runner_state::AgentRunner;
use crate::runtime::Agent;
use crate::runtime::ExecuteRequest;
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
pub struct SessionExecutionArgs {
pub agent: Arc<Agent>,
pub session_id: String,
pub session: Session,
pub tools_override: Option<Arc<dyn ToolExecutor>>,
pub provider_override: Option<Arc<dyn LLMProvider>>,
pub provider_name: Option<String>,
pub model: String,
pub fast_model: Option<String>,
pub background_model_provider: Option<Arc<dyn LLMProvider>>,
pub reasoning_effort: Option<ReasoningEffort>,
pub reasoning_effort_source: String,
pub disabled_tools: Option<BTreeSet<String>>,
pub disabled_skill_ids: Option<BTreeSet<String>>,
pub selected_skill_ids: Option<Vec<String>>,
pub selected_skill_mode: Option<String>,
pub cancel_token: CancellationToken,
pub mpsc_tx: mpsc::Sender<AgentEvent>,
pub image_fallback: Option<ImageFallbackConfig>,
pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
}
pub fn spawn_session_execution(args: SessionExecutionArgs) {
let span_session_id = args.session_id.clone();
let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
tokio::spawn(
async move {
let SessionExecutionArgs {
agent,
session_id,
mut session,
tools_override,
provider_override,
provider_name,
model,
fast_model,
background_model_provider,
reasoning_effort,
reasoning_effort_source,
disabled_tools,
disabled_skill_ids,
selected_skill_ids,
selected_skill_mode,
cancel_token,
mpsc_tx,
image_fallback,
runners,
sessions_cache,
} = args;
let initial_title = session.title.clone();
let initial_pinned = session.pinned;
let initial_message = initial_user_message_for_session(&session);
let selected_skill_ids =
selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
let selected_skill_mode =
selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
tracing::info!(
"[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
session_id,
model,
reasoning_effort
.map(ReasoningEffort::as_str)
.unwrap_or("none"),
reasoning_effort_source
);
session.model = model.clone();
let system_prompt = system_prompt_for_session(&session);
if let Some(prompt) = system_prompt.as_ref() {
log_base_system_prompt_snapshot(&session_id, prompt);
}
let result = agent
.execute(
&mut session,
ExecuteRequest {
initial_message,
event_tx: mpsc_tx.clone(),
cancel_token,
tools: tools_override,
provider_override,
model: Some(model),
provider_name,
background_model: fast_model,
background_model_provider,
reasoning_effort,
disabled_tools,
disabled_skill_ids,
selected_skill_ids,
selected_skill_mode,
image_fallback,
},
)
.await;
if let Some(error_event) = terminal_error_event_for_result(&result) {
let _ = mpsc_tx.send(error_event).await;
}
finalize_runner(&runners, &session_id, &result).await;
match agent.storage().load_session(&session_id).await {
Ok(Some(latest_persisted)) => preserve_concurrent_session_overrides(
&mut session,
&latest_persisted,
&initial_title,
initial_pinned,
),
Ok(None) => {}
Err(error) => {
tracing::warn!(
"[{}] Failed to load latest session before final save: {}",
session_id,
error
);
}
}
if let Err(error) = agent.storage().save_session(&session).await {
tracing::warn!("[{}] Failed to save session: {}", session_id, error);
}
{
let mut sessions = sessions_cache.write().await;
sessions.insert(session_id.clone(), session);
}
tracing::info!("[{}] Agent execution completed", session_id);
}
.instrument(session_span),
);
}
pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
tracing::info!(
"[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
session_id,
prompt.len(),
prompt.contains(SKILL_CONTEXT_START_MARKER),
prompt.contains(TOOL_GUIDE_START_MARKER),
prompt.contains(EXTERNAL_MEMORY_START_MARKER),
prompt.contains(TASK_LIST_START_MARKER),
);
tracing::debug!(
"[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
session_id
);
tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
tracing::debug!("[{}] -----------------------------------", session_id);
tracing::debug!("[{}] {}", session_id, prompt);
tracing::debug!(
"[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
session_id
);
}
pub fn preserve_concurrent_session_overrides(
session: &mut Session,
latest_persisted: &Session,
initial_title: &str,
initial_pinned: bool,
) {
if session.title == initial_title {
session.title = latest_persisted.title.clone();
}
if session.pinned == initial_pinned {
session.pinned = latest_persisted.pinned;
}
}
pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
where
E: std::fmt::Display,
{
match result {
Ok(_) => None,
Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
message: "Agent execution cancelled by user".to_string(),
}),
Err(error) => Some(AgentEvent::Error {
message: error.to_string(),
}),
}
}
fn is_cancelled_error<E>(error: &E) -> bool
where
E: std::fmt::Display,
{
error.to_string().contains("cancelled")
}
fn system_prompt_for_session(session: &Session) -> Option<String> {
session
.messages
.iter()
.find(|message| matches!(message.role, bamboo_agent_core::Role::System))
.map(|message| message.content.clone())
}
fn initial_user_message_for_session(session: &Session) -> String {
session
.messages
.last()
.filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
.map(|message| message.content.clone())
.unwrap_or_default()
}
fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
session
.metadata
.get("selected_skill_ids")
.and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
}
fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
let value = session
.metadata
.get("skill_mode")
.or_else(|| session.metadata.get("mode"))?;
let trimmed = value.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}