Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

//! Core agent execution spawning logic.
//!
//! Provides [`spawn_session_execution`], which handles the full lifecycle of a
//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21    AuxiliaryModelConfig, BashResumeHook, GoldConfig, GuardianConfig, GuardianSpawner,
22    ImageFallbackConfig,
23};
24use crate::runtime::execution::runner_lifecycle::finalize_runner;
25use crate::runtime::execution::runner_state::AgentRunner;
26use crate::runtime::model_roster::ModelRoster;
27use crate::runtime::Agent;
28use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
29
30/// Shared, per-session-locked session cache.
31///
32/// A `DashMap` gives each session id its own shard-level lock (so unrelated
33/// sessions never contend), and the inner `parking_lot::RwLock` is a *sync*
34/// lock held only to briefly clone-out or mutate-and-write-back a single
35/// `Session` — never across an `.await`. Using a sync lock makes "no guard
36/// across await" a compile-time guarantee in Send futures.
37pub type SessionCache = std::sync::Arc<
38    dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
39>;
40
41/// Read a session out of the in-memory cache, cloning it out from under the
42/// brief sync read-lock. Returns `None` on a cache miss.
43///
44/// This is the single canonical cache-read used everywhere a caller holds a
45/// `SessionCache` (HTTP handlers, server tools, the app-state loader). It
46/// replaced ~13 verbatim copies of the
47/// `cache.get(id).map(|e| e.value().clone()).map(|a| a.read().clone())` idiom.
48pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
49    cache
50        .get(id)
51        .map(|e| e.value().clone())
52        .map(|a| a.read().clone())
53}
54
// Sentinel HTML comments looked for in the base system prompt. Their presence
// is reported (as booleans) by `log_base_system_prompt_snapshot`; nothing in
// this file inserts them.

/// Marks the start of the skill-context section of the prompt.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
/// Marks the start of the tool-guide section of the prompt.
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
/// Marks the start of the external-memory section of the prompt.
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
/// Marks the start of the task-list section of the prompt.
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
59
/// Outcome of an agent execution, handed to an optional
/// [`SessionCompletionHook`].
///
/// Deliberately decoupled from the runtime's internal error type so the hook
/// API stays stable across crates and callers don't need to match on engine
/// error variants.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so hook implementations can
/// log the outcome, keep a copy of it, and compare it in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionExecutionOutcome {
    /// The run finished without error.
    pub success: bool,
    /// The run ended because it was cancelled (a non-success subset).
    pub cancelled: bool,
    /// Stringified error, present when `!success`.
    pub error: Option<String>,
}
74
75impl SessionExecutionOutcome {
76    fn from_result(result: &Result<(), AgentError>) -> Self {
77        match result {
78            Ok(()) => Self {
79                success: true,
80                cancelled: false,
81                error: None,
82            },
83            Err(error) => Self {
84                success: false,
85                cancelled: error.is_cancelled(),
86                error: Some(error.to_string()),
87            },
88        }
89    }
90}
91
92/// Optional post-execution hook for [`spawn_session_execution`].
93///
94/// Invoked after the runner is finalized but **before** the session is
95/// persisted, so a caller can record bespoke terminal bookkeeping (e.g. a
96/// scheduled-run status) and/or append a closing message that is then saved
97/// with the session. Receives the execution outcome plus a mutable handle to
98/// the session. This is how callers with extra finalization (the schedule
99/// manager) route through the single canonical execution path instead of
100/// forking their own spawn + `execute` + persist sequence.
101pub type SessionCompletionHook = Box<
102    dyn for<'a> FnOnce(
103            SessionExecutionOutcome,
104            &'a mut Session,
105        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
106        + Send,
107>;
108
109/// Arguments for spawning a background agent execution.
110///
111/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
112/// It holds everything needed to run the agent loop and persist the result,
113/// without depending on HTTP types or `AppState`.
114pub struct SessionExecutionArgs {
115    // Core execution.
116    pub agent: Arc<Agent>,
117    pub session_id: String,
118    pub session: Session,
119
120    // Execution parameters.
121    pub tools_override: Option<Arc<dyn ToolExecutor>>,
122    pub provider_override: Option<Arc<dyn LLMProvider>>,
123    /// Cohesive primary + auxiliary model/provider selection. The primary
124    /// `model` is required for a spawn (see [`ModelRoster::model`]); the three
125    /// auxiliary roles default to their `Config::get_*` fallbacks when `None`.
126    pub model_roster: ModelRoster,
127    pub reasoning_effort: Option<ReasoningEffort>,
128    pub reasoning_effort_source: String,
129    pub auxiliary_model_resolver:
130        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
131    /// Optional per-round live resolver for the disabled tool/skill sets (#136).
132    /// When `None` the per-run snapshot is used (sub-agent spawns pass `None`, so
133    /// short-lived children keep the spawn-time snapshot — by design).
134    pub disabled_filter_resolver:
135        Option<Arc<dyn Fn() -> (BTreeSet<String>, BTreeSet<String>) + Send + Sync>>,
136    pub disabled_tools: Option<BTreeSet<String>>,
137    pub disabled_skill_ids: Option<BTreeSet<String>>,
138    pub selected_skill_ids: Option<Vec<String>>,
139    pub selected_skill_mode: Option<String>,
140    pub cancel_token: CancellationToken,
141    pub mpsc_tx: mpsc::Sender<AgentEvent>,
142    pub image_fallback: Option<ImageFallbackConfig>,
143    pub gold_config: Option<GoldConfig>,
144    /// Optional guardian adversarial-review gate configuration.
145    pub guardian_config: Option<GuardianConfig>,
146    /// Late-bound guardian reviewer spawner (server-provided; the runner cannot
147    /// construct a child directly).
148    pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
149    /// Late-bound bash self-resume hook (issue #84 Phase 2b).
150    pub bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
151    pub app_data_dir: Option<std::path::PathBuf>,
152
153    // Post-execution resources.
154    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
155    pub sessions_cache: SessionCache,
156
157    /// Optional bespoke finalization, run after the runner is finalized and
158    /// before the session is persisted. See [`SessionCompletionHook`].
159    pub on_complete: Option<SessionCompletionHook>,
160}
161
162/// The per-request parameter subset of [`SessionExecutionArgs`] — everything
163/// that maps onto an [`ExecuteRequest`], minus the three required positional
164/// fields (`initial_message`, `event_tx`, `cancel_token`) and the post-execution
165/// resources (runners, sessions cache, completion hook).
166///
167/// Grouping these here lets [`build_execute_request`] perform the
168/// `SessionExecutionArgs` → [`ExecuteRequest`] mapping through the canonical
169/// [`ExecuteRequestBuilder`] in one place, instead of a hand-written struct
170/// literal that must be kept field-aligned with [`ExecuteRequest`] by hand.
171struct ExecuteRequestParams {
172    tools: Option<Arc<dyn ToolExecutor>>,
173    provider_override: Option<Arc<dyn LLMProvider>>,
174    model_roster: ModelRoster,
175    reasoning_effort: Option<ReasoningEffort>,
176    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
177    disabled_filter_resolver:
178        Option<Arc<dyn Fn() -> (BTreeSet<String>, BTreeSet<String>) + Send + Sync>>,
179    disabled_tools: Option<BTreeSet<String>>,
180    disabled_skill_ids: Option<BTreeSet<String>>,
181    selected_skill_ids: Option<Vec<String>>,
182    selected_skill_mode: Option<String>,
183    image_fallback: Option<ImageFallbackConfig>,
184    gold_config: Option<GoldConfig>,
185    guardian_config: Option<GuardianConfig>,
186    guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
187    bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
188    app_data_dir: Option<std::path::PathBuf>,
189}
190
191/// Assemble an [`ExecuteRequest`] from the resolved spawn parameters via the
192/// canonical [`ExecuteRequestBuilder`].
193///
194/// Centralizing this mapping keeps every optional field threaded with exactly
195/// the same value the old struct literal carried (the builder defaults each
196/// unset field to `None`), while removing the field-by-field duplication.
197fn build_execute_request(
198    initial_message: String,
199    event_tx: mpsc::Sender<AgentEvent>,
200    cancel_token: CancellationToken,
201    params: ExecuteRequestParams,
202) -> ExecuteRequest {
203    let ExecuteRequestParams {
204        tools,
205        provider_override,
206        model_roster,
207        reasoning_effort,
208        auxiliary_model_resolver,
209        disabled_filter_resolver,
210        disabled_tools,
211        disabled_skill_ids,
212        selected_skill_ids,
213        selected_skill_mode,
214        image_fallback,
215        gold_config,
216        guardian_config,
217        guardian_spawner,
218        bash_resume_hook,
219        app_data_dir,
220    } = params;
221
222    let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
223        .model_roster(model_roster)
224        .gold_config(gold_config)
225        .guardian_config(guardian_config)
226        .guardian_spawner(guardian_spawner)
227        .bash_resume_hook(bash_resume_hook);
228
229    if let Some(tools) = tools {
230        builder = builder.tools(tools);
231    }
232    if let Some(provider_override) = provider_override {
233        builder = builder.provider_override(provider_override);
234    }
235    if let Some(reasoning_effort) = reasoning_effort {
236        builder = builder.reasoning_effort(reasoning_effort);
237    }
238    if let Some(disabled_filter_resolver) = disabled_filter_resolver {
239        builder = builder.disabled_filter_resolver(disabled_filter_resolver);
240    }
241    if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
242        builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
243    }
244    if let Some(disabled_tools) = disabled_tools {
245        builder = builder.disabled_tools(disabled_tools);
246    }
247    if let Some(disabled_skill_ids) = disabled_skill_ids {
248        builder = builder.disabled_skill_ids(disabled_skill_ids);
249    }
250    if let Some(selected_skill_ids) = selected_skill_ids {
251        builder = builder.selected_skill_ids(selected_skill_ids);
252    }
253    if let Some(selected_skill_mode) = selected_skill_mode {
254        builder = builder.selected_skill_mode(selected_skill_mode);
255    }
256    if let Some(image_fallback) = image_fallback {
257        builder = builder.image_fallback(image_fallback);
258    }
259    if let Some(app_data_dir) = app_data_dir {
260        builder = builder.app_data_dir(app_data_dir);
261    }
262
263    builder.build()
264}
265
266/// Spawn a background agent execution task.
267///
268/// This function spawns a tokio task that:
269/// 1. Executes the agent loop via `agent.execute()`
270/// 2. Sends a terminal error event if the execution fails
271/// 3. Finalizes the runner status
272/// 4. Persists the session via merge-save (preserves concurrent UI title/pin edits)
273/// 5. Updates the in-memory session cache
274pub fn spawn_session_execution(args: SessionExecutionArgs) {
275    let span_session_id = args.session_id.clone();
276    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);
277
278    tokio::spawn(
279        async move {
280            let SessionExecutionArgs {
281                agent,
282                session_id,
283                mut session,
284                tools_override,
285                provider_override,
286                model_roster,
287                reasoning_effort,
288                reasoning_effort_source,
289                auxiliary_model_resolver,
290                disabled_filter_resolver,
291                disabled_tools,
292                disabled_skill_ids,
293                selected_skill_ids,
294                selected_skill_mode,
295                cancel_token,
296                mpsc_tx,
297                image_fallback,
298                gold_config,
299                guardian_config,
300                guardian_spawner,
301                bash_resume_hook,
302                app_data_dir,
303                runners,
304                sessions_cache,
305                on_complete,
306            } = args;
307
308            // The primary model is required for a spawn; the roster stores it as
309            // `Option<String>` for uniformity, so recover the owned String here
310            // for session attribution / logging (same value the caller set).
311            let model = model_roster.model.clone().unwrap_or_default();
312
313            let initial_message = initial_user_message_for_session(&session);
314            let selected_skill_ids =
315                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
316            let selected_skill_mode =
317                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));
318
319            tracing::info!(
320                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
321                session_id,
322                model,
323                reasoning_effort
324                    .map(ReasoningEffort::as_str)
325                    .unwrap_or("none"),
326                reasoning_effort_source
327            );
328
329            // Set the resolved model via the single authoritative pre-execution
330            // mutation point. The caller already placed the system prompt on the
331            // session, so pass `None` for `system_prompt` (the subsequent
332            // `system_prompt_for_session` read below sees the caller's message).
333            // This must run before that read / logging so the observable
334            // sequence (model set, then prompt snapshot) is identical.
335            crate::session_app::execution_prep::prepare_session_for_execution(
336                &mut session,
337                None,
338                Some(&model),
339            );
340
341            let system_prompt = system_prompt_for_session(&session);
342            if let Some(prompt) = system_prompt.as_ref() {
343                log_base_system_prompt_snapshot(&session_id, prompt);
344            }
345
346            let execute_request = build_execute_request(
347                initial_message,
348                mpsc_tx.clone(),
349                cancel_token,
350                ExecuteRequestParams {
351                    tools: tools_override,
352                    provider_override,
353                    model_roster,
354                    reasoning_effort,
355                    auxiliary_model_resolver,
356                    disabled_filter_resolver,
357                    disabled_tools,
358                    disabled_skill_ids,
359                    selected_skill_ids,
360                    selected_skill_mode,
361                    image_fallback,
362                    gold_config,
363                    guardian_config,
364                    guardian_spawner,
365                    bash_resume_hook,
366                    app_data_dir,
367                },
368            );
369
370            let result = agent.execute(&mut session, execute_request).await;
371
372            // Send terminal event for all error cases (including cancellation).
373            if let Some(error_event) = terminal_error_event_for_result(&result) {
374                let _ = mpsc_tx.send(error_event).await;
375            }
376
377            // Update runner status.
378            finalize_runner(&runners, &session_id, &result).await;
379
380            // Bespoke terminal bookkeeping (e.g. a scheduled-run status) runs
381            // here — after the runner is finalized but before persistence — so
382            // any closing message the hook appends is saved with the session
383            // below.
384            if let Some(on_complete) = on_complete {
385                on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
386            }
387
388            // Save session via merge-save so any concurrent UI edits to
389            // title / pinned / title_version are preserved (the runtime is not
390            // an authoritative title writer).
391            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
392                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
393            }
394
395            // Update memory cache.
396            sessions_cache.insert(
397                session_id.clone(),
398                Arc::new(parking_lot::RwLock::new(session)),
399            );
400
401            tracing::info!("[{}] Agent execution completed", session_id);
402        }
403        .instrument(session_span),
404    );
405}
406
407/// Log a snapshot of the base system prompt for debugging.
408pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
409    tracing::info!(
410        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
411        session_id,
412        prompt.len(),
413        prompt.contains(SKILL_CONTEXT_START_MARKER),
414        prompt.contains(TOOL_GUIDE_START_MARKER),
415        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
416        prompt.contains(TASK_LIST_START_MARKER),
417    );
418
419    tracing::debug!(
420        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
421        session_id
422    );
423    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
424    tracing::debug!("[{}] -----------------------------------", session_id);
425    tracing::debug!("[{}] {}", session_id, prompt);
426    tracing::debug!(
427        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
428        session_id
429    );
430}
431
432/// Map an execution result to a terminal error event.
433pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
434    match result {
435        Ok(_) => None,
436        Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
437            message: "Agent execution cancelled by user".to_string(),
438        }),
439        Err(error) => Some(AgentEvent::Error {
440            message: error.to_string(),
441        }),
442    }
443}
444
// Session inspection helpers (pure functions over `Session`, no server dependency).
446
447fn system_prompt_for_session(session: &Session) -> Option<String> {
448    session
449        .messages
450        .iter()
451        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
452        .map(|message| message.content.clone())
453}
454
455fn initial_user_message_for_session(session: &Session) -> String {
456    session
457        .messages
458        .last()
459        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
460        .map(|message| message.content.clone())
461        .unwrap_or_default()
462}
463
464fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
465    session
466        .metadata
467        .get("selected_skill_ids")
468        .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
469}
470
471fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
472    let value = session
473        .metadata
474        .get("skill_mode")
475        .or_else(|| session.metadata.get("mode"))?;
476    let trimmed = value.trim();
477    if trimmed.is_empty() {
478        None
479    } else {
480        Some(trimmed.to_string())
481    }
482}