Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

1//! Core agent execution spawning logic.
2//!
3//! Provides [`spawn_session_execution`] which handles the full lifecycle of a
4//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{AuxiliaryModelConfig, GoldConfig, ImageFallbackConfig};
21use crate::runtime::execution::runner_lifecycle::finalize_runner;
22use crate::runtime::execution::runner_state::AgentRunner;
23use crate::runtime::model_roster::ModelRoster;
24use crate::runtime::Agent;
25use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
26
27/// Shared, per-session-locked session cache.
28///
29/// A `DashMap` gives each session id its own shard-level lock (so unrelated
30/// sessions never contend), and the inner `parking_lot::RwLock` is a *sync*
31/// lock held only to briefly clone-out or mutate-and-write-back a single
32/// `Session` — never across an `.await`. Using a sync lock makes "no guard
33/// across await" a compile-time guarantee in Send futures.
34pub type SessionCache = std::sync::Arc<
35    dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
36>;
37
38/// Read a session out of the in-memory cache, cloning it out from under the
39/// brief sync read-lock. Returns `None` on a cache miss.
40///
41/// This is the single canonical cache-read used everywhere a caller holds a
42/// `SessionCache` (HTTP handlers, server tools, the app-state loader). It
43/// replaced ~13 verbatim copies of the
44/// `cache.get(id).map(|e| e.value().clone()).map(|a| a.read().clone())` idiom.
45pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
46    cache
47        .get(id)
48        .map(|e| e.value().clone())
49        .map(|a| a.read().clone())
50}
51
52const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
53const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
54const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
55const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
56
/// Outcome of an agent execution, handed to an optional
/// [`SessionCompletionHook`].
///
/// Deliberately decoupled from the runtime's internal error type so the hook
/// API stays stable across crates and callers don't need to match on engine
/// error variants.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so hook implementations can
/// log an outcome, keep a copy of it, and compare it in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionExecutionOutcome {
    /// The run finished without error.
    pub success: bool,
    /// The run ended because it was cancelled (a non-success subset).
    pub cancelled: bool,
    /// Stringified error, present when `!success`.
    pub error: Option<String>,
}
71
72impl SessionExecutionOutcome {
73    fn from_result(result: &Result<(), AgentError>) -> Self {
74        match result {
75            Ok(()) => Self {
76                success: true,
77                cancelled: false,
78                error: None,
79            },
80            Err(error) => Self {
81                success: false,
82                cancelled: error.is_cancelled(),
83                error: Some(error.to_string()),
84            },
85        }
86    }
87}
88
89/// Optional post-execution hook for [`spawn_session_execution`].
90///
91/// Invoked after the runner is finalized but **before** the session is
92/// persisted, so a caller can record bespoke terminal bookkeeping (e.g. a
93/// scheduled-run status) and/or append a closing message that is then saved
94/// with the session. Receives the execution outcome plus a mutable handle to
95/// the session. This is how callers with extra finalization (the schedule
96/// manager) route through the single canonical execution path instead of
97/// forking their own spawn + `execute` + persist sequence.
98pub type SessionCompletionHook = Box<
99    dyn for<'a> FnOnce(
100            SessionExecutionOutcome,
101            &'a mut Session,
102        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
103        + Send,
104>;
105
106/// Arguments for spawning a background agent execution.
107///
108/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
109/// It holds everything needed to run the agent loop and persist the result,
110/// without depending on HTTP types or `AppState`.
111pub struct SessionExecutionArgs {
112    // Core execution.
113    pub agent: Arc<Agent>,
114    pub session_id: String,
115    pub session: Session,
116
117    // Execution parameters.
118    pub tools_override: Option<Arc<dyn ToolExecutor>>,
119    pub provider_override: Option<Arc<dyn LLMProvider>>,
120    /// Cohesive primary + auxiliary model/provider selection. The primary
121    /// `model` is required for a spawn (see [`ModelRoster::model`]); the three
122    /// auxiliary roles default to their `Config::get_*` fallbacks when `None`.
123    pub model_roster: ModelRoster,
124    pub reasoning_effort: Option<ReasoningEffort>,
125    pub reasoning_effort_source: String,
126    pub auxiliary_model_resolver:
127        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
128    pub disabled_tools: Option<BTreeSet<String>>,
129    pub disabled_skill_ids: Option<BTreeSet<String>>,
130    pub selected_skill_ids: Option<Vec<String>>,
131    pub selected_skill_mode: Option<String>,
132    pub cancel_token: CancellationToken,
133    pub mpsc_tx: mpsc::Sender<AgentEvent>,
134    pub image_fallback: Option<ImageFallbackConfig>,
135    pub gold_config: Option<GoldConfig>,
136    pub app_data_dir: Option<std::path::PathBuf>,
137
138    // Post-execution resources.
139    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
140    pub sessions_cache: SessionCache,
141
142    /// Optional bespoke finalization, run after the runner is finalized and
143    /// before the session is persisted. See [`SessionCompletionHook`].
144    pub on_complete: Option<SessionCompletionHook>,
145}
146
147/// The per-request parameter subset of [`SessionExecutionArgs`] — everything
148/// that maps onto an [`ExecuteRequest`], minus the three required positional
149/// fields (`initial_message`, `event_tx`, `cancel_token`) and the post-execution
150/// resources (runners, sessions cache, completion hook).
151///
152/// Grouping these here lets [`build_execute_request`] perform the
153/// `SessionExecutionArgs` → [`ExecuteRequest`] mapping through the canonical
154/// [`ExecuteRequestBuilder`] in one place, instead of a hand-written struct
155/// literal that must be kept field-aligned with [`ExecuteRequest`] by hand.
156struct ExecuteRequestParams {
157    tools: Option<Arc<dyn ToolExecutor>>,
158    provider_override: Option<Arc<dyn LLMProvider>>,
159    model_roster: ModelRoster,
160    reasoning_effort: Option<ReasoningEffort>,
161    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
162    disabled_tools: Option<BTreeSet<String>>,
163    disabled_skill_ids: Option<BTreeSet<String>>,
164    selected_skill_ids: Option<Vec<String>>,
165    selected_skill_mode: Option<String>,
166    image_fallback: Option<ImageFallbackConfig>,
167    gold_config: Option<GoldConfig>,
168    app_data_dir: Option<std::path::PathBuf>,
169}
170
171/// Assemble an [`ExecuteRequest`] from the resolved spawn parameters via the
172/// canonical [`ExecuteRequestBuilder`].
173///
174/// Centralizing this mapping keeps every optional field threaded with exactly
175/// the same value the old struct literal carried (the builder defaults each
176/// unset field to `None`), while removing the field-by-field duplication.
177fn build_execute_request(
178    initial_message: String,
179    event_tx: mpsc::Sender<AgentEvent>,
180    cancel_token: CancellationToken,
181    params: ExecuteRequestParams,
182) -> ExecuteRequest {
183    let ExecuteRequestParams {
184        tools,
185        provider_override,
186        model_roster,
187        reasoning_effort,
188        auxiliary_model_resolver,
189        disabled_tools,
190        disabled_skill_ids,
191        selected_skill_ids,
192        selected_skill_mode,
193        image_fallback,
194        gold_config,
195        app_data_dir,
196    } = params;
197
198    let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
199        .model_roster(model_roster)
200        .gold_config(gold_config);
201
202    if let Some(tools) = tools {
203        builder = builder.tools(tools);
204    }
205    if let Some(provider_override) = provider_override {
206        builder = builder.provider_override(provider_override);
207    }
208    if let Some(reasoning_effort) = reasoning_effort {
209        builder = builder.reasoning_effort(reasoning_effort);
210    }
211    if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
212        builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
213    }
214    if let Some(disabled_tools) = disabled_tools {
215        builder = builder.disabled_tools(disabled_tools);
216    }
217    if let Some(disabled_skill_ids) = disabled_skill_ids {
218        builder = builder.disabled_skill_ids(disabled_skill_ids);
219    }
220    if let Some(selected_skill_ids) = selected_skill_ids {
221        builder = builder.selected_skill_ids(selected_skill_ids);
222    }
223    if let Some(selected_skill_mode) = selected_skill_mode {
224        builder = builder.selected_skill_mode(selected_skill_mode);
225    }
226    if let Some(image_fallback) = image_fallback {
227        builder = builder.image_fallback(image_fallback);
228    }
229    if let Some(app_data_dir) = app_data_dir {
230        builder = builder.app_data_dir(app_data_dir);
231    }
232
233    builder.build()
234}
235
236/// Spawn a background agent execution task.
237///
238/// This function spawns a tokio task that:
239/// 1. Executes the agent loop via `agent.execute()`
240/// 2. Sends a terminal error event if the execution fails
241/// 3. Finalizes the runner status
242/// 4. Persists the session via merge-save (preserves concurrent UI title/pin edits)
243/// 5. Updates the in-memory session cache
244pub fn spawn_session_execution(args: SessionExecutionArgs) {
245    let span_session_id = args.session_id.clone();
246    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
247
248    tokio::spawn(
249        async move {
250            let SessionExecutionArgs {
251                agent,
252                session_id,
253                mut session,
254                tools_override,
255                provider_override,
256                model_roster,
257                reasoning_effort,
258                reasoning_effort_source,
259                auxiliary_model_resolver,
260                disabled_tools,
261                disabled_skill_ids,
262                selected_skill_ids,
263                selected_skill_mode,
264                cancel_token,
265                mpsc_tx,
266                image_fallback,
267                gold_config,
268                app_data_dir,
269                runners,
270                sessions_cache,
271                on_complete,
272            } = args;
273
274            // The primary model is required for a spawn; the roster stores it as
275            // `Option<String>` for uniformity, so recover the owned String here
276            // for session attribution / logging (same value the caller set).
277            let model = model_roster.model.clone().unwrap_or_default();
278
279            let initial_message = initial_user_message_for_session(&session);
280            let selected_skill_ids =
281                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
282            let selected_skill_mode =
283                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
284
285            tracing::info!(
286                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
287                session_id,
288                model,
289                reasoning_effort
290                    .map(ReasoningEffort::as_str)
291                    .unwrap_or("none"),
292                reasoning_effort_source
293            );
294
295            // Set the resolved model via the single authoritative pre-execution
296            // mutation point. The caller already placed the system prompt on the
297            // session, so pass `None` for `system_prompt` (the subsequent
298            // `system_prompt_for_session` read below sees the caller's message).
299            // This must run before that read / logging so the observable
300            // sequence (model set, then prompt snapshot) is identical.
301            crate::session_app::execution_prep::prepare_session_for_execution(
302                &mut session,
303                None,
304                Some(&model),
305            );
306
307            let system_prompt = system_prompt_for_session(&session);
308            if let Some(prompt) = system_prompt.as_ref() {
309                log_base_system_prompt_snapshot(&session_id, prompt);
310            }
311
312            let execute_request = build_execute_request(
313                initial_message,
314                mpsc_tx.clone(),
315                cancel_token,
316                ExecuteRequestParams {
317                    tools: tools_override,
318                    provider_override,
319                    model_roster,
320                    reasoning_effort,
321                    auxiliary_model_resolver,
322                    disabled_tools,
323                    disabled_skill_ids,
324                    selected_skill_ids,
325                    selected_skill_mode,
326                    image_fallback,
327                    gold_config,
328                    app_data_dir,
329                },
330            );
331
332            let result = agent.execute(&mut session, execute_request).await;
333
334            // Send terminal event for all error cases (including cancellation).
335            if let Some(error_event) = terminal_error_event_for_result(&result) {
336                let _ = mpsc_tx.send(error_event).await;
337            }
338
339            // Update runner status.
340            finalize_runner(&runners, &session_id, &result).await;
341
342            // Bespoke terminal bookkeeping (e.g. a scheduled-run status) runs
343            // here — after the runner is finalized but before persistence — so
344            // any closing message the hook appends is saved with the session
345            // below.
346            if let Some(on_complete) = on_complete {
347                on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
348            }
349
350            // Save session via merge-save so any concurrent UI edits to
351            // title / pinned / title_version are preserved (the runtime is not
352            // an authoritative title writer).
353            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
354                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
355            }
356
357            // Update memory cache.
358            sessions_cache.insert(
359                session_id.clone(),
360                Arc::new(parking_lot::RwLock::new(session)),
361            );
362
363            tracing::info!("[{}] Agent execution completed", session_id);
364        }
365        .instrument(session_span),
366    );
367}
368
369/// Log a snapshot of the base system prompt for debugging.
370pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
371    tracing::info!(
372        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
373        session_id,
374        prompt.len(),
375        prompt.contains(SKILL_CONTEXT_START_MARKER),
376        prompt.contains(TOOL_GUIDE_START_MARKER),
377        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
378        prompt.contains(TASK_LIST_START_MARKER),
379    );
380
381    tracing::debug!(
382        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
383        session_id
384    );
385    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
386    tracing::debug!("[{}] -----------------------------------", session_id);
387    tracing::debug!("[{}] {}", session_id, prompt);
388    tracing::debug!(
389        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
390        session_id
391    );
392}
393
394/// Map an execution result to a terminal error event.
395pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
396    match result {
397        Ok(_) => None,
398        Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
399            message: "Agent execution cancelled by user".to_string(),
400        }),
401        Err(error) => Some(AgentEvent::Error {
402            message: error.to_string(),
403        }),
404    }
405}
406
407// Session metadata helpers (pure functions, no server dependency).
408
409fn system_prompt_for_session(session: &Session) -> Option<String> {
410    session
411        .messages
412        .iter()
413        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
414        .map(|message| message.content.clone())
415}
416
417fn initial_user_message_for_session(session: &Session) -> String {
418    session
419        .messages
420        .last()
421        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
422        .map(|message| message.content.clone())
423        .unwrap_or_default()
424}
425
426fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
427    session
428        .metadata
429        .get("selected_skill_ids")
430        .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
431}
432
433fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
434    let value = session
435        .metadata
436        .get("skill_mode")
437        .or_else(|| session.metadata.get("mode"))?;
438    let trimmed = value.trim();
439    if trimmed.is_empty() {
440        None
441    } else {
442        Some(trimmed.to_string())
443    }
444}