Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

1//! Core agent execution spawning logic.
2//!
3//! Provides [`spawn_session_execution`] which handles the full lifecycle of a
4//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21    AuxiliaryModelConfig, GoldConfig, GuardianConfig, GuardianSpawner, ImageFallbackConfig,
22};
23use crate::runtime::execution::runner_lifecycle::finalize_runner;
24use crate::runtime::execution::runner_state::AgentRunner;
25use crate::runtime::model_roster::ModelRoster;
26use crate::runtime::Agent;
27use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
28
29/// Shared, per-session-locked session cache.
30///
31/// A `DashMap` gives each session id its own shard-level lock (so unrelated
32/// sessions never contend), and the inner `parking_lot::RwLock` is a *sync*
33/// lock held only to briefly clone-out or mutate-and-write-back a single
34/// `Session` — never across an `.await`. Using a sync lock makes "no guard
35/// across await" a compile-time guarantee in Send futures.
36pub type SessionCache = std::sync::Arc<
37    dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
38>;
39
40/// Read a session out of the in-memory cache, cloning it out from under the
41/// brief sync read-lock. Returns `None` on a cache miss.
42///
43/// This is the single canonical cache-read used everywhere a caller holds a
44/// `SessionCache` (HTTP handlers, server tools, the app-state loader). It
45/// replaced ~13 verbatim copies of the
46/// `cache.get(id).map(|e| e.value().clone()).map(|a| a.read().clone())` idiom.
47pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
48    cache
49        .get(id)
50        .map(|e| e.value().clone())
51        .map(|a| a.read().clone())
52}
53
54const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
55const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
56const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
57const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
58
59/// Outcome of an agent execution, handed to an optional
60/// [`SessionCompletionHook`].
61///
62/// Deliberately decoupled from the runtime's internal error type so the hook
63/// API stays stable across crates and callers don't need to match on engine
64/// error variants.
65pub struct SessionExecutionOutcome {
66    /// The run finished without error.
67    pub success: bool,
68    /// The run ended because it was cancelled (a non-success subset).
69    pub cancelled: bool,
70    /// Stringified error, present when `!success`.
71    pub error: Option<String>,
72}
73
74impl SessionExecutionOutcome {
75    fn from_result(result: &Result<(), AgentError>) -> Self {
76        match result {
77            Ok(()) => Self {
78                success: true,
79                cancelled: false,
80                error: None,
81            },
82            Err(error) => Self {
83                success: false,
84                cancelled: error.is_cancelled(),
85                error: Some(error.to_string()),
86            },
87        }
88    }
89}
90
91/// Optional post-execution hook for [`spawn_session_execution`].
92///
93/// Invoked after the runner is finalized but **before** the session is
94/// persisted, so a caller can record bespoke terminal bookkeeping (e.g. a
95/// scheduled-run status) and/or append a closing message that is then saved
96/// with the session. Receives the execution outcome plus a mutable handle to
97/// the session. This is how callers with extra finalization (the schedule
98/// manager) route through the single canonical execution path instead of
99/// forking their own spawn + `execute` + persist sequence.
100pub type SessionCompletionHook = Box<
101    dyn for<'a> FnOnce(
102            SessionExecutionOutcome,
103            &'a mut Session,
104        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
105        + Send,
106>;
107
108/// Arguments for spawning a background agent execution.
109///
110/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
111/// It holds everything needed to run the agent loop and persist the result,
112/// without depending on HTTP types or `AppState`.
113pub struct SessionExecutionArgs {
114    // Core execution.
115    pub agent: Arc<Agent>,
116    pub session_id: String,
117    pub session: Session,
118
119    // Execution parameters.
120    pub tools_override: Option<Arc<dyn ToolExecutor>>,
121    pub provider_override: Option<Arc<dyn LLMProvider>>,
122    /// Cohesive primary + auxiliary model/provider selection. The primary
123    /// `model` is required for a spawn (see [`ModelRoster::model`]); the three
124    /// auxiliary roles default to their `Config::get_*` fallbacks when `None`.
125    pub model_roster: ModelRoster,
126    pub reasoning_effort: Option<ReasoningEffort>,
127    pub reasoning_effort_source: String,
128    pub auxiliary_model_resolver:
129        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
130    pub disabled_tools: Option<BTreeSet<String>>,
131    pub disabled_skill_ids: Option<BTreeSet<String>>,
132    pub selected_skill_ids: Option<Vec<String>>,
133    pub selected_skill_mode: Option<String>,
134    pub cancel_token: CancellationToken,
135    pub mpsc_tx: mpsc::Sender<AgentEvent>,
136    pub image_fallback: Option<ImageFallbackConfig>,
137    pub gold_config: Option<GoldConfig>,
138    /// Optional guardian adversarial-review gate configuration.
139    pub guardian_config: Option<GuardianConfig>,
140    /// Late-bound guardian reviewer spawner (server-provided; the runner cannot
141    /// construct a child directly).
142    pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
143    pub app_data_dir: Option<std::path::PathBuf>,
144
145    // Post-execution resources.
146    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
147    pub sessions_cache: SessionCache,
148
149    /// Optional bespoke finalization, run after the runner is finalized and
150    /// before the session is persisted. See [`SessionCompletionHook`].
151    pub on_complete: Option<SessionCompletionHook>,
152}
153
154/// The per-request parameter subset of [`SessionExecutionArgs`] — everything
155/// that maps onto an [`ExecuteRequest`], minus the three required positional
156/// fields (`initial_message`, `event_tx`, `cancel_token`) and the post-execution
157/// resources (runners, sessions cache, completion hook).
158///
159/// Grouping these here lets [`build_execute_request`] perform the
160/// `SessionExecutionArgs` → [`ExecuteRequest`] mapping through the canonical
161/// [`ExecuteRequestBuilder`] in one place, instead of a hand-written struct
162/// literal that must be kept field-aligned with [`ExecuteRequest`] by hand.
163struct ExecuteRequestParams {
164    tools: Option<Arc<dyn ToolExecutor>>,
165    provider_override: Option<Arc<dyn LLMProvider>>,
166    model_roster: ModelRoster,
167    reasoning_effort: Option<ReasoningEffort>,
168    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
169    disabled_tools: Option<BTreeSet<String>>,
170    disabled_skill_ids: Option<BTreeSet<String>>,
171    selected_skill_ids: Option<Vec<String>>,
172    selected_skill_mode: Option<String>,
173    image_fallback: Option<ImageFallbackConfig>,
174    gold_config: Option<GoldConfig>,
175    guardian_config: Option<GuardianConfig>,
176    guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
177    app_data_dir: Option<std::path::PathBuf>,
178}
179
180/// Assemble an [`ExecuteRequest`] from the resolved spawn parameters via the
181/// canonical [`ExecuteRequestBuilder`].
182///
183/// Centralizing this mapping keeps every optional field threaded with exactly
184/// the same value the old struct literal carried (the builder defaults each
185/// unset field to `None`), while removing the field-by-field duplication.
186fn build_execute_request(
187    initial_message: String,
188    event_tx: mpsc::Sender<AgentEvent>,
189    cancel_token: CancellationToken,
190    params: ExecuteRequestParams,
191) -> ExecuteRequest {
192    let ExecuteRequestParams {
193        tools,
194        provider_override,
195        model_roster,
196        reasoning_effort,
197        auxiliary_model_resolver,
198        disabled_tools,
199        disabled_skill_ids,
200        selected_skill_ids,
201        selected_skill_mode,
202        image_fallback,
203        gold_config,
204        guardian_config,
205        guardian_spawner,
206        app_data_dir,
207    } = params;
208
209    let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
210        .model_roster(model_roster)
211        .gold_config(gold_config)
212        .guardian_config(guardian_config)
213        .guardian_spawner(guardian_spawner);
214
215    if let Some(tools) = tools {
216        builder = builder.tools(tools);
217    }
218    if let Some(provider_override) = provider_override {
219        builder = builder.provider_override(provider_override);
220    }
221    if let Some(reasoning_effort) = reasoning_effort {
222        builder = builder.reasoning_effort(reasoning_effort);
223    }
224    if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
225        builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
226    }
227    if let Some(disabled_tools) = disabled_tools {
228        builder = builder.disabled_tools(disabled_tools);
229    }
230    if let Some(disabled_skill_ids) = disabled_skill_ids {
231        builder = builder.disabled_skill_ids(disabled_skill_ids);
232    }
233    if let Some(selected_skill_ids) = selected_skill_ids {
234        builder = builder.selected_skill_ids(selected_skill_ids);
235    }
236    if let Some(selected_skill_mode) = selected_skill_mode {
237        builder = builder.selected_skill_mode(selected_skill_mode);
238    }
239    if let Some(image_fallback) = image_fallback {
240        builder = builder.image_fallback(image_fallback);
241    }
242    if let Some(app_data_dir) = app_data_dir {
243        builder = builder.app_data_dir(app_data_dir);
244    }
245
246    builder.build()
247}
248
249/// Spawn a background agent execution task.
250///
251/// This function spawns a tokio task that:
252/// 1. Executes the agent loop via `agent.execute()`
253/// 2. Sends a terminal error event if the execution fails
254/// 3. Finalizes the runner status
255/// 4. Persists the session via merge-save (preserves concurrent UI title/pin edits)
256/// 5. Updates the in-memory session cache
257pub fn spawn_session_execution(args: SessionExecutionArgs) {
258    let span_session_id = args.session_id.clone();
259    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
260
261    tokio::spawn(
262        async move {
263            let SessionExecutionArgs {
264                agent,
265                session_id,
266                mut session,
267                tools_override,
268                provider_override,
269                model_roster,
270                reasoning_effort,
271                reasoning_effort_source,
272                auxiliary_model_resolver,
273                disabled_tools,
274                disabled_skill_ids,
275                selected_skill_ids,
276                selected_skill_mode,
277                cancel_token,
278                mpsc_tx,
279                image_fallback,
280                gold_config,
281                guardian_config,
282                guardian_spawner,
283                app_data_dir,
284                runners,
285                sessions_cache,
286                on_complete,
287            } = args;
288
289            // The primary model is required for a spawn; the roster stores it as
290            // `Option<String>` for uniformity, so recover the owned String here
291            // for session attribution / logging (same value the caller set).
292            let model = model_roster.model.clone().unwrap_or_default();
293
294            let initial_message = initial_user_message_for_session(&session);
295            let selected_skill_ids =
296                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
297            let selected_skill_mode =
298                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
299
300            tracing::info!(
301                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
302                session_id,
303                model,
304                reasoning_effort
305                    .map(ReasoningEffort::as_str)
306                    .unwrap_or("none"),
307                reasoning_effort_source
308            );
309
310            // Set the resolved model via the single authoritative pre-execution
311            // mutation point. The caller already placed the system prompt on the
312            // session, so pass `None` for `system_prompt` (the subsequent
313            // `system_prompt_for_session` read below sees the caller's message).
314            // This must run before that read / logging so the observable
315            // sequence (model set, then prompt snapshot) is identical.
316            crate::session_app::execution_prep::prepare_session_for_execution(
317                &mut session,
318                None,
319                Some(&model),
320            );
321
322            let system_prompt = system_prompt_for_session(&session);
323            if let Some(prompt) = system_prompt.as_ref() {
324                log_base_system_prompt_snapshot(&session_id, prompt);
325            }
326
327            let execute_request = build_execute_request(
328                initial_message,
329                mpsc_tx.clone(),
330                cancel_token,
331                ExecuteRequestParams {
332                    tools: tools_override,
333                    provider_override,
334                    model_roster,
335                    reasoning_effort,
336                    auxiliary_model_resolver,
337                    disabled_tools,
338                    disabled_skill_ids,
339                    selected_skill_ids,
340                    selected_skill_mode,
341                    image_fallback,
342                    gold_config,
343                    guardian_config,
344                    guardian_spawner,
345                    app_data_dir,
346                },
347            );
348
349            let result = agent.execute(&mut session, execute_request).await;
350
351            // Send terminal event for all error cases (including cancellation).
352            if let Some(error_event) = terminal_error_event_for_result(&result) {
353                let _ = mpsc_tx.send(error_event).await;
354            }
355
356            // Update runner status.
357            finalize_runner(&runners, &session_id, &result).await;
358
359            // Bespoke terminal bookkeeping (e.g. a scheduled-run status) runs
360            // here — after the runner is finalized but before persistence — so
361            // any closing message the hook appends is saved with the session
362            // below.
363            if let Some(on_complete) = on_complete {
364                on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
365            }
366
367            // Save session via merge-save so any concurrent UI edits to
368            // title / pinned / title_version are preserved (the runtime is not
369            // an authoritative title writer).
370            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
371                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
372            }
373
374            // Update memory cache.
375            sessions_cache.insert(
376                session_id.clone(),
377                Arc::new(parking_lot::RwLock::new(session)),
378            );
379
380            tracing::info!("[{}] Agent execution completed", session_id);
381        }
382        .instrument(session_span),
383    );
384}
385
386/// Log a snapshot of the base system prompt for debugging.
387pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
388    tracing::info!(
389        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
390        session_id,
391        prompt.len(),
392        prompt.contains(SKILL_CONTEXT_START_MARKER),
393        prompt.contains(TOOL_GUIDE_START_MARKER),
394        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
395        prompt.contains(TASK_LIST_START_MARKER),
396    );
397
398    tracing::debug!(
399        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
400        session_id
401    );
402    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
403    tracing::debug!("[{}] -----------------------------------", session_id);
404    tracing::debug!("[{}] {}", session_id, prompt);
405    tracing::debug!(
406        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
407        session_id
408    );
409}
410
411/// Map an execution result to a terminal error event.
412pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
413    match result {
414        Ok(_) => None,
415        Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
416            message: "Agent execution cancelled by user".to_string(),
417        }),
418        Err(error) => Some(AgentEvent::Error {
419            message: error.to_string(),
420        }),
421    }
422}
423
424// Session metadata helpers (pure functions, no server dependency).
425
426fn system_prompt_for_session(session: &Session) -> Option<String> {
427    session
428        .messages
429        .iter()
430        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
431        .map(|message| message.content.clone())
432}
433
434fn initial_user_message_for_session(session: &Session) -> String {
435    session
436        .messages
437        .last()
438        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
439        .map(|message| message.content.clone())
440        .unwrap_or_default()
441}
442
443fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
444    session
445        .metadata
446        .get("selected_skill_ids")
447        .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
448}
449
450fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
451    let value = session
452        .metadata
453        .get("skill_mode")
454        .or_else(|| session.metadata.get("mode"))?;
455    let trimmed = value.trim();
456    if trimmed.is_empty() {
457        None
458    } else {
459        Some(trimmed.to_string())
460    }
461}