Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

1//! Core agent execution spawning logic.
2//!
3//! Provides [`spawn_session_execution`] which handles the full lifecycle of a
4//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::{mpsc, RwLock};
10use tokio_util::sync::CancellationToken;
11use tracing::Instrument;
12
13use bamboo_agent_core::tools::ToolExecutor;
14use bamboo_agent_core::{AgentEvent, Session};
15use bamboo_domain::ReasoningEffort;
16use bamboo_infrastructure::LLMProvider;
17
18use crate::runtime::config::ImageFallbackConfig;
19use crate::runtime::execution::runner_lifecycle::finalize_runner;
20use crate::runtime::execution::runner_state::AgentRunner;
21use crate::runtime::Agent;
22use crate::runtime::ExecuteRequest;
23
24const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
25const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
26const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
27const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
28
29/// Arguments for spawning a background agent execution.
30///
31/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
32/// It holds everything needed to run the agent loop and persist the result,
33/// without depending on HTTP types or `AppState`.
34pub struct SessionExecutionArgs {
35    // Core execution.
36    pub agent: Arc<Agent>,
37    pub session_id: String,
38    pub session: Session,
39
40    // Execution parameters.
41    pub tools_override: Option<Arc<dyn ToolExecutor>>,
42    pub provider_override: Option<Arc<dyn LLMProvider>>,
43    pub provider_name: Option<String>,
44    pub model: String,
45    pub fast_model: Option<String>,
46    /// Optional provider override for background/fast model calls.
47    pub background_model_provider: Option<Arc<dyn LLMProvider>>,
48    pub reasoning_effort: Option<ReasoningEffort>,
49    pub reasoning_effort_source: String,
50    pub disabled_tools: Option<BTreeSet<String>>,
51    pub disabled_skill_ids: Option<BTreeSet<String>>,
52    pub selected_skill_ids: Option<Vec<String>>,
53    pub selected_skill_mode: Option<String>,
54    pub cancel_token: CancellationToken,
55    pub mpsc_tx: mpsc::Sender<AgentEvent>,
56    pub image_fallback: Option<ImageFallbackConfig>,
57
58    // Post-execution resources.
59    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
60    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
61}
62
63/// Spawn a background agent execution task.
64///
65/// This function spawns a tokio task that:
66/// 1. Executes the agent loop via `agent.execute()`
67/// 2. Sends a terminal error event if the execution fails
68/// 3. Finalizes the runner status
69/// 4. Checks for concurrent session overrides (title/pin edits)
70/// 5. Persists the session via storage
71/// 6. Updates the in-memory session cache
72pub fn spawn_session_execution(args: SessionExecutionArgs) {
73    let span_session_id = args.session_id.clone();
74    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
75
76    tokio::spawn(
77        async move {
78            let SessionExecutionArgs {
79                agent,
80                session_id,
81                mut session,
82                tools_override,
83                provider_override,
84                provider_name,
85                model,
86                fast_model,
87                background_model_provider,
88                reasoning_effort,
89                reasoning_effort_source,
90                disabled_tools,
91                disabled_skill_ids,
92                selected_skill_ids,
93                selected_skill_mode,
94                cancel_token,
95                mpsc_tx,
96                image_fallback,
97                runners,
98                sessions_cache,
99            } = args;
100            let initial_title = session.title.clone();
101            let initial_pinned = session.pinned;
102
103            let initial_message = initial_user_message_for_session(&session);
104            let selected_skill_ids =
105                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
106            let selected_skill_mode =
107                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
108
109            tracing::info!(
110                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
111                session_id,
112                model,
113                reasoning_effort
114                    .map(ReasoningEffort::as_str)
115                    .unwrap_or("none"),
116                reasoning_effort_source
117            );
118
119            session.model = model.clone();
120
121            let system_prompt = system_prompt_for_session(&session);
122            if let Some(prompt) = system_prompt.as_ref() {
123                log_base_system_prompt_snapshot(&session_id, prompt);
124            }
125
126            let result = agent
127                .execute(
128                    &mut session,
129                    ExecuteRequest {
130                        initial_message,
131                        event_tx: mpsc_tx.clone(),
132                        cancel_token,
133                        tools: tools_override,
134                        provider_override,
135                        model: Some(model),
136                        provider_name,
137                        background_model: fast_model,
138                        background_model_provider,
139                        reasoning_effort,
140                        disabled_tools,
141                        disabled_skill_ids,
142                        selected_skill_ids,
143                        selected_skill_mode,
144                        image_fallback,
145                    },
146                )
147                .await;
148
149            // Send terminal event for all error cases (including cancellation).
150            if let Some(error_event) = terminal_error_event_for_result(&result) {
151                let _ = mpsc_tx.send(error_event).await;
152            }
153
154            // Update runner status.
155            finalize_runner(&runners, &session_id, &result).await;
156
157            // Avoid clobbering concurrent UI edits (title/pin).
158            match agent.storage().load_session(&session_id).await {
159                Ok(Some(latest_persisted)) => preserve_concurrent_session_overrides(
160                    &mut session,
161                    &latest_persisted,
162                    &initial_title,
163                    initial_pinned,
164                ),
165                Ok(None) => {}
166                Err(error) => {
167                    tracing::warn!(
168                        "[{}] Failed to load latest session before final save: {}",
169                        session_id,
170                        error
171                    );
172                }
173            }
174
175            // Save session.
176            if let Err(error) = agent.storage().save_session(&session).await {
177                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
178            }
179
180            // Update memory cache.
181            {
182                let mut sessions = sessions_cache.write().await;
183                sessions.insert(session_id.clone(), session);
184            }
185
186            tracing::info!("[{}] Agent execution completed", session_id);
187        }
188        .instrument(session_span),
189    );
190}
191
192/// Log a snapshot of the base system prompt for debugging.
193pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
194    tracing::info!(
195        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
196        session_id,
197        prompt.len(),
198        prompt.contains(SKILL_CONTEXT_START_MARKER),
199        prompt.contains(TOOL_GUIDE_START_MARKER),
200        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
201        prompt.contains(TASK_LIST_START_MARKER),
202    );
203
204    tracing::debug!(
205        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
206        session_id
207    );
208    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
209    tracing::debug!("[{}] -----------------------------------", session_id);
210    tracing::debug!("[{}] {}", session_id, prompt);
211    tracing::debug!(
212        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
213        session_id
214    );
215}
216
217/// Preserve concurrent session overrides (title/pin) made via external edits
218/// while the agent loop was running.
219pub fn preserve_concurrent_session_overrides(
220    session: &mut Session,
221    latest_persisted: &Session,
222    initial_title: &str,
223    initial_pinned: bool,
224) {
225    if session.title == initial_title {
226        session.title = latest_persisted.title.clone();
227    }
228    if session.pinned == initial_pinned {
229        session.pinned = latest_persisted.pinned;
230    }
231}
232
233/// Map an execution result to a terminal error event.
234pub fn terminal_error_event_for_result<E>(result: &Result<(), E>) -> Option<AgentEvent>
235where
236    E: std::fmt::Display,
237{
238    match result {
239        Ok(_) => None,
240        Err(error) if is_cancelled_error(error) => Some(AgentEvent::Error {
241            message: "Agent execution cancelled by user".to_string(),
242        }),
243        Err(error) => Some(AgentEvent::Error {
244            message: error.to_string(),
245        }),
246    }
247}
248
249fn is_cancelled_error<E>(error: &E) -> bool
250where
251    E: std::fmt::Display,
252{
253    error.to_string().contains("cancelled")
254}
255
256// Session metadata helpers (pure functions, no server dependency).
257
258fn system_prompt_for_session(session: &Session) -> Option<String> {
259    session
260        .messages
261        .iter()
262        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
263        .map(|message| message.content.clone())
264}
265
266fn initial_user_message_for_session(session: &Session) -> String {
267    session
268        .messages
269        .last()
270        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
271        .map(|message| message.content.clone())
272        .unwrap_or_default()
273}
274
275fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
276    session
277        .metadata
278        .get("selected_skill_ids")
279        .and_then(|raw| crate::skills::selection::parse_selected_skill_ids_metadata(raw))
280}
281
282fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
283    let value = session
284        .metadata
285        .get("skill_mode")
286        .or_else(|| session.metadata.get("mode"))?;
287    let trimmed = value.trim();
288    if trimmed.is_empty() {
289        None
290    } else {
291        Some(trimmed.to_string())
292    }
293}