Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

1//! Core agent execution spawning logic.
2//!
3//! Provides [`spawn_session_execution`] which handles the full lifecycle of a
4//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::{GoldConfig, ImageFallbackConfig};
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
24const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
25const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
26const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
27const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
29/// Arguments for spawning a background agent execution.
30///
31/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
32/// It holds everything needed to run the agent loop and persist the result,
33/// without depending on HTTP types or `AppState`.
34pub struct SessionExecutionArgs {
35    // Core execution.
36    pub agent: Arc<Agent>,
37    pub session_id: String,
38    pub session: Session,
39
40    // Execution parameters.
41    pub tools_override: Option<Arc<dyn ToolExecutor>>,
42    pub provider_override: Option<Arc<dyn LLMProvider>>,
43    pub provider_name: Option<String>,
44    pub provider_type: Option<String>,
45    pub model: String,
46    pub fast_model: Option<String>,
47    /// Optional provider override for lightweight fast-model calls.
48    pub fast_model_provider: Option<Arc<dyn LLMProvider>>,
49    pub background_model: Option<String>,
50    /// Optional provider override for memory/background model calls.
51    pub background_model_provider: Option<Arc<dyn LLMProvider>>,
52    pub summarization_model: Option<String>,
53    /// Optional provider override for summarization / context compression calls.
54    pub summarization_model_provider: Option<Arc<dyn LLMProvider>>,
55    pub reasoning_effort: Option<ReasoningEffort>,
56    pub reasoning_effort_source: String,
57    pub auxiliary_model_resolver:
58        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
59    pub disabled_tools: Option<BTreeSet<String>>,
60    pub disabled_skill_ids: Option<BTreeSet<String>>,
61    pub selected_skill_ids: Option<Vec<String>>,
62    pub selected_skill_mode: Option<String>,
63    pub cancel_token: CancellationToken,
64    pub mpsc_tx: mpsc::Sender<AgentEvent>,
65    pub image_fallback: Option<ImageFallbackConfig>,
66    pub gold_config: Option<GoldConfig>,
67    pub app_data_dir: Option<std::path::PathBuf>,
68
69    // Post-execution resources.
70    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
71    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
72}
73
74/// Spawn a background agent execution task.
75///
76/// This function spawns a tokio task that:
77/// 1. Executes the agent loop via `agent.execute()`
78/// 2. Sends a terminal error event if the execution fails
79/// 3. Finalizes the runner status
80/// 4. Persists the session via merge-save (preserves concurrent UI title/pin edits)
81/// 5. Updates the in-memory session cache
82pub fn spawn_session_execution(args: SessionExecutionArgs) {
83    let span_session_id = args.session_id.clone();
84    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
85
86    tokio::spawn(
87        async move {
88            let SessionExecutionArgs {
89                agent,
90                session_id,
91                mut session,
92                tools_override,
93                provider_override,
94                provider_name,
95                provider_type,
96                model,
97                fast_model,
98                fast_model_provider,
99                background_model,
100                background_model_provider,
101                summarization_model,
102                summarization_model_provider,
103                reasoning_effort,
104                reasoning_effort_source,
105                auxiliary_model_resolver,
106                disabled_tools,
107                disabled_skill_ids,
108                selected_skill_ids,
109                selected_skill_mode,
110                cancel_token,
111                mpsc_tx,
112                image_fallback,
113                gold_config,
114                app_data_dir,
115                runners,
116                sessions_cache,
117            } = args;
118
119            let initial_message = initial_user_message_for_session(&session);
120            let selected_skill_ids =
121                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
122            let selected_skill_mode =
123                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
124
125            tracing::info!(
126                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
127                session_id,
128                model,
129                reasoning_effort
130                    .map(ReasoningEffort::as_str)
131                    .unwrap_or("none"),
132                reasoning_effort_source
133            );
134
135            session.model = model.clone();
136
137            let system_prompt = system_prompt_for_session(&session);
138            if let Some(prompt) = system_prompt.as_ref() {
139                log_base_system_prompt_snapshot(&session_id, prompt);
140            }
141
142            let result = agent
143                .execute(
144                    &mut session,
145                    ExecuteRequest {
146                        initial_message,
147                        event_tx: mpsc_tx.clone(),
148                        cancel_token,
149                        tools: tools_override,
150                        provider_override,
151                        model: Some(model),
152                        provider_name,
153                        provider_type,
154                        fast_model,
155                        fast_model_provider,
156                        background_model,
157                        background_model_provider,
158                        summarization_model,
159                        summarization_model_provider,
160                        reasoning_effort,
161                        auxiliary_model_resolver,
162                        disabled_tools,
163                        disabled_skill_ids,
164                        selected_skill_ids,
165                        selected_skill_mode,
166                        image_fallback,
167                        gold_config,
168                        app_data_dir,
169                    },
170                )
171                .await;
172
173            // Send terminal event for all error cases (including cancellation).
174            if let Some(error_event) = terminal_error_event_for_result(&result) {
175                let _ = mpsc_tx.send(error_event).await;
176            }
177
178            // Update runner status.
179            finalize_runner(&runners, &session_id, &result).await;
180
181            // Save session via merge-save so any concurrent UI edits to
182            // title / pinned / title_version are preserved (the runtime is not
183            // an authoritative title writer).
184            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
185                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
186            }
187
188            // Update memory cache.
189            {
190                let mut sessions = sessions_cache.write().await;
191                sessions.insert(session_id.clone(), session);
192            }
193
194            tracing::info!("[{}] Agent execution completed", session_id);
195        }
196        .instrument(session_span),
197    );
198}
199
200/// Log a snapshot of the base system prompt for debugging.
201pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
202    tracing::info!(
203        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
204        session_id,
205        prompt.len(),
206        prompt.contains(SKILL_CONTEXT_START_MARKER),
207        prompt.contains(TOOL_GUIDE_START_MARKER),
208        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
209        prompt.contains(TASK_LIST_START_MARKER),
210    );
211
212    tracing::debug!(
213        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
214        session_id
215    );
216    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
217    tracing::debug!("[{}] -----------------------------------", session_id);
218    tracing::debug!("[{}] {}", session_id, prompt);
219    tracing::debug!(
220        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
221        session_id
222    );
223}
224
225/// Map an execution result to a terminal error event.
226pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
227where
228    E: std::fmt::Display,
229{
230    match result {
231        Ok(_) => None,
232        Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
233            message: "Agent execution cancelled by user".to_string(),
234        }),
235        Err(error) => Some(AgentEvent::Error {
236            message: error.to_string(),
237        }),
238    }
239}
240
241fn is_cancelled_error<E>(error: &E) -> bool
242where
243    E: std::fmt::Display,
244{
245    error.to_string().contains("cancelled")
246}
247
248// Session metadata helpers (pure functions, no server dependency).
249
250fn system_prompt_for_session(session: &Session) -> Option<String> {
251    session
252        .messages
253        .iter()
254        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
255        .map(|message| message.content.clone())
256}
257
258fn initial_user_message_for_session(session: &Session) -> String {
259    session
260        .messages
261        .last()
262        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
263        .map(|message| message.content.clone())
264        .unwrap_or_default()
265}
266
267fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
268    session
269        .metadata
270        .get("selected_skill_ids")
271        .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
272}
273
274fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
275    let value = session
276        .metadata
277        .get("skill_mode")
278        .or_else(|| session.metadata.get("mode"))?;
279    let trimmed = value.trim();
280    if trimmed.is_empty() {
281        None
282    } else {
283        Some(trimmed.to_string())
284    }
285}