Skip to main content

bamboo_engine/runtime/execution/
agent_spawn.rs

1//! Core agent execution spawning logic.
2//!
3//! Provides [`spawn_session_execution`] which handles the full lifecycle of a
4//! background agent run: spawn task → execute → finalize runner → persist session.
5
6use std::collections::{BTreeSet, HashMap};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10
11use tokio::sync::{mpsc, RwLock};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14
15use bamboo_agent_core::tools::ToolExecutor;
16use bamboo_agent_core::{AgentError, AgentEvent, Session};
17use bamboo_domain::ReasoningEffort;
18use bamboo_llm::LLMProvider;
19
20use crate::runtime::config::{
21    AuxiliaryModelConfig, BashResumeHook, GoldConfig, GuardianConfig, GuardianSpawner,
22    ImageFallbackConfig,
23};
24use crate::runtime::execution::runner_lifecycle::finalize_runner;
25use crate::runtime::execution::runner_state::AgentRunner;
26use crate::runtime::model_roster::ModelRoster;
27use crate::runtime::Agent;
28use crate::runtime::{ExecuteRequest, ExecuteRequestBuilder};
29
30/// Shared, per-session-locked session cache.
31///
32/// A `DashMap` gives each session id its own shard-level lock (so unrelated
33/// sessions never contend), and the inner `parking_lot::RwLock` is a *sync*
34/// lock held only to briefly clone-out or mutate-and-write-back a single
35/// `Session` — never across an `.await`. Using a sync lock makes "no guard
36/// across await" a compile-time guarantee in Send futures.
37pub type SessionCache = std::sync::Arc<
38    dashmap::DashMap<String, std::sync::Arc<parking_lot::RwLock<bamboo_agent_core::Session>>>,
39>;
40
41/// Read a session out of the in-memory cache, cloning it out from under the
42/// brief sync read-lock. Returns `None` on a cache miss.
43///
44/// This is the single canonical cache-read used everywhere a caller holds a
45/// `SessionCache` (HTTP handlers, server tools, the app-state loader). It
46/// replaced ~13 verbatim copies of the
47/// `cache.get(id).map(|e| e.value().clone()).map(|a| a.read().clone())` idiom.
48pub fn read_cached_session(cache: &SessionCache, id: &str) -> Option<bamboo_agent_core::Session> {
49    cache
50        .get(id)
51        .map(|e| e.value().clone())
52        .map(|a| a.read().clone())
53}
54
// HTML-comment markers that may appear inside the base system prompt.
// `log_base_system_prompt_snapshot` only checks whether each one is present
// and reports the result in its summary log line.
const SKILL_CONTEXT_START_MARKER: &str = "<!-- BAMBOO_SKILL_CONTEXT_START -->";
const TOOL_GUIDE_START_MARKER: &str = "<!-- BAMBOO_TOOL_GUIDE_START -->";
const EXTERNAL_MEMORY_START_MARKER: &str = "<!-- BAMBOO_EXTERNAL_MEMORY_START -->";
const TASK_LIST_START_MARKER: &str = "<!-- BAMBOO_TASK_LIST_START -->";
59
/// Outcome of an agent execution, handed to an optional
/// [`SessionCompletionHook`].
///
/// Deliberately decoupled from the runtime's internal error type so the hook
/// API stays stable across crates and callers don't need to match on engine
/// error variants.
///
/// The type derives `Debug`, `Clone`, `PartialEq` and `Eq` so hooks can log,
/// store and compare outcomes without hand-written boilerplate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionExecutionOutcome {
    /// The run finished without error.
    pub success: bool,
    /// The run ended because it was cancelled (a non-success subset).
    pub cancelled: bool,
    /// Stringified error, present when `!success`.
    pub error: Option<String>,
}
74
75impl SessionExecutionOutcome {
76    fn from_result(result: &Result<(), AgentError>) -> Self {
77        match result {
78            Ok(()) => Self {
79                success: true,
80                cancelled: false,
81                error: None,
82            },
83            Err(error) => Self {
84                success: false,
85                cancelled: error.is_cancelled(),
86                error: Some(error.to_string()),
87            },
88        }
89    }
90}
91
/// Optional post-execution hook for [`spawn_session_execution`].
///
/// Invoked after the runner is finalized but **before** the session is
/// persisted, so a caller can record bespoke terminal bookkeeping (e.g. a
/// scheduled-run status) and/or append a closing message that is then saved
/// with the session. Receives the execution outcome plus a mutable handle to
/// the session. This is how callers with extra finalization (the schedule
/// manager) route through the single canonical execution path instead of
/// forking their own spawn + `execute` + persist sequence.
///
/// Shape of the type:
/// - It is a boxed `FnOnce`, so the hook can be called at most once and may
///   move captured state into the future it returns.
/// - The returned future is boxed, pinned and `Send`, and may borrow the
///   session for the lifetime `'a` of the `&mut Session` argument.
/// - The hook's future resolves to `()`; it has no channel for reporting
///   failure back to the spawn path.
pub type SessionCompletionHook = Box<
    dyn for<'a> FnOnce(
            SessionExecutionOutcome,
            &'a mut Session,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
108
109/// Arguments for spawning a background agent execution.
110///
111/// This is the crate-agnostic equivalent of the server's `SpawnAgentExecution`.
112/// It holds everything needed to run the agent loop and persist the result,
113/// without depending on HTTP types or `AppState`.
114pub struct SessionExecutionArgs {
115    // Core execution.
116    pub agent: Arc<Agent>,
117    pub session_id: String,
118    pub session: Session,
119
120    // Execution parameters.
121    pub tools_override: Option<Arc<dyn ToolExecutor>>,
122    pub provider_override: Option<Arc<dyn LLMProvider>>,
123    /// Cohesive primary + auxiliary model/provider selection. The primary
124    /// `model` is required for a spawn (see [`ModelRoster::model`]); the three
125    /// auxiliary roles default to their `Config::get_*` fallbacks when `None`.
126    pub model_roster: ModelRoster,
127    pub reasoning_effort: Option<ReasoningEffort>,
128    pub reasoning_effort_source: String,
129    pub auxiliary_model_resolver:
130        Option<Arc<dyn Fn() -> crate::runtime::config::AuxiliaryModelConfig + Send + Sync>>,
131    pub disabled_tools: Option<BTreeSet<String>>,
132    pub disabled_skill_ids: Option<BTreeSet<String>>,
133    pub selected_skill_ids: Option<Vec<String>>,
134    pub selected_skill_mode: Option<String>,
135    pub cancel_token: CancellationToken,
136    pub mpsc_tx: mpsc::Sender<AgentEvent>,
137    pub image_fallback: Option<ImageFallbackConfig>,
138    pub gold_config: Option<GoldConfig>,
139    /// Optional guardian adversarial-review gate configuration.
140    pub guardian_config: Option<GuardianConfig>,
141    /// Late-bound guardian reviewer spawner (server-provided; the runner cannot
142    /// construct a child directly).
143    pub guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
144    /// Late-bound bash self-resume hook (issue #84 Phase 2b).
145    pub bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
146    pub app_data_dir: Option<std::path::PathBuf>,
147
148    // Post-execution resources.
149    pub runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
150    pub sessions_cache: SessionCache,
151
152    /// Optional bespoke finalization, run after the runner is finalized and
153    /// before the session is persisted. See [`SessionCompletionHook`].
154    pub on_complete: Option<SessionCompletionHook>,
155}
156
/// The per-request parameter subset of [`SessionExecutionArgs`] — everything
/// that maps onto an [`ExecuteRequest`], minus the three required positional
/// fields (`initial_message`, `event_tx`, `cancel_token`) and the post-execution
/// resources (runners, sessions cache, completion hook).
///
/// Grouping these here lets [`build_execute_request`] perform the
/// `SessionExecutionArgs` → [`ExecuteRequest`] mapping through the canonical
/// [`ExecuteRequestBuilder`] in one place, instead of a hand-written struct
/// literal that must be kept field-aligned with [`ExecuteRequest`] by hand.
///
/// Every field is forwarded to the builder setter of the same name.
struct ExecuteRequestParams {
    // Populated from `SessionExecutionArgs::tools_override`.
    tools: Option<Arc<dyn ToolExecutor>>,
    provider_override: Option<Arc<dyn LLMProvider>>,
    // Always forwarded (not optional at this level).
    model_roster: ModelRoster,
    reasoning_effort: Option<ReasoningEffort>,
    auxiliary_model_resolver: Option<Arc<dyn Fn() -> AuxiliaryModelConfig + Send + Sync>>,
    disabled_tools: Option<BTreeSet<String>>,
    disabled_skill_ids: Option<BTreeSet<String>>,
    // Already resolved by the caller, including any session-metadata fallback.
    selected_skill_ids: Option<Vec<String>>,
    selected_skill_mode: Option<String>,
    image_fallback: Option<ImageFallbackConfig>,
    // The next four are handed to the builder as `Option`s, set or not.
    gold_config: Option<GoldConfig>,
    guardian_config: Option<GuardianConfig>,
    guardian_spawner: Option<Arc<dyn GuardianSpawner>>,
    bash_resume_hook: Option<Arc<dyn BashResumeHook>>,
    app_data_dir: Option<std::path::PathBuf>,
}
183
184/// Assemble an [`ExecuteRequest`] from the resolved spawn parameters via the
185/// canonical [`ExecuteRequestBuilder`].
186///
187/// Centralizing this mapping keeps every optional field threaded with exactly
188/// the same value the old struct literal carried (the builder defaults each
189/// unset field to `None`), while removing the field-by-field duplication.
190fn build_execute_request(
191    initial_message: String,
192    event_tx: mpsc::Sender<AgentEvent>,
193    cancel_token: CancellationToken,
194    params: ExecuteRequestParams,
195) -> ExecuteRequest {
196    let ExecuteRequestParams {
197        tools,
198        provider_override,
199        model_roster,
200        reasoning_effort,
201        auxiliary_model_resolver,
202        disabled_tools,
203        disabled_skill_ids,
204        selected_skill_ids,
205        selected_skill_mode,
206        image_fallback,
207        gold_config,
208        guardian_config,
209        guardian_spawner,
210        bash_resume_hook,
211        app_data_dir,
212    } = params;
213
214    let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token)
215        .model_roster(model_roster)
216        .gold_config(gold_config)
217        .guardian_config(guardian_config)
218        .guardian_spawner(guardian_spawner)
219        .bash_resume_hook(bash_resume_hook);
220
221    if let Some(tools) = tools {
222        builder = builder.tools(tools);
223    }
224    if let Some(provider_override) = provider_override {
225        builder = builder.provider_override(provider_override);
226    }
227    if let Some(reasoning_effort) = reasoning_effort {
228        builder = builder.reasoning_effort(reasoning_effort);
229    }
230    if let Some(auxiliary_model_resolver) = auxiliary_model_resolver {
231        builder = builder.auxiliary_model_resolver(auxiliary_model_resolver);
232    }
233    if let Some(disabled_tools) = disabled_tools {
234        builder = builder.disabled_tools(disabled_tools);
235    }
236    if let Some(disabled_skill_ids) = disabled_skill_ids {
237        builder = builder.disabled_skill_ids(disabled_skill_ids);
238    }
239    if let Some(selected_skill_ids) = selected_skill_ids {
240        builder = builder.selected_skill_ids(selected_skill_ids);
241    }
242    if let Some(selected_skill_mode) = selected_skill_mode {
243        builder = builder.selected_skill_mode(selected_skill_mode);
244    }
245    if let Some(image_fallback) = image_fallback {
246        builder = builder.image_fallback(image_fallback);
247    }
248    if let Some(app_data_dir) = app_data_dir {
249        builder = builder.app_data_dir(app_data_dir);
250    }
251
252    builder.build()
253}
254
/// Spawn a background agent execution task.
///
/// This function spawns a tokio task that:
/// 1. Executes the agent loop via `agent.execute()`
/// 2. Sends a terminal error event if the execution fails
/// 3. Finalizes the runner status
/// 4. Runs the optional `on_complete` hook
/// 5. Persists the session via merge-save (preserves concurrent UI title/pin edits)
/// 6. Updates the in-memory session cache
///
/// The function itself returns immediately. The task's `JoinHandle` is
/// discarded, so the task runs detached and callers observe progress only
/// through the event channel, the runner registry and the session cache.
///
/// # Panics
///
/// `tokio::spawn` panics when called from outside a Tokio runtime.
pub fn spawn_session_execution(args: SessionExecutionArgs) {
    // The span needs its own copy of the id because `args` is moved into the
    // task below.
    let span_session_id = args.session_id.clone();
    let session_span = tracing::info_span!("agent_execution", session_id = %span_session_id);

    tokio::spawn(
        async move {
            let SessionExecutionArgs {
                agent,
                session_id,
                mut session,
                tools_override,
                provider_override,
                model_roster,
                reasoning_effort,
                reasoning_effort_source,
                auxiliary_model_resolver,
                disabled_tools,
                disabled_skill_ids,
                selected_skill_ids,
                selected_skill_mode,
                cancel_token,
                mpsc_tx,
                image_fallback,
                gold_config,
                guardian_config,
                guardian_spawner,
                bash_resume_hook,
                app_data_dir,
                runners,
                sessions_cache,
                on_complete,
            } = args;

            // The primary model is required for a spawn; the roster stores it as
            // `Option<String>` for uniformity, so recover the owned String here
            // for session attribution / logging (same value the caller set).
            // If the roster has no model, this silently becomes the empty string.
            let model = model_roster.model.clone().unwrap_or_default();

            // Explicit arguments win; session metadata is only consulted when
            // the caller passed `None`.
            let initial_message = initial_user_message_for_session(&session);
            let selected_skill_ids =
                selected_skill_ids.or_else(|| selected_skill_ids_for_session(&session));
            let selected_skill_mode =
                selected_skill_mode.or_else(|| selected_skill_mode_for_session(&session));

            tracing::info!(
                "[{}] Using resolved session model: {}, reasoning_effort={}, reasoning_source={}",
                session_id,
                model,
                reasoning_effort
                    .map(ReasoningEffort::as_str)
                    .unwrap_or("none"),
                reasoning_effort_source
            );

            // Set the resolved model via the single authoritative pre-execution
            // mutation point. The caller already placed the system prompt on the
            // session, so pass `None` for `system_prompt` (the subsequent
            // `system_prompt_for_session` read below sees the caller's message).
            // This must run before that read / logging so the observable
            // sequence (model set, then prompt snapshot) is identical.
            crate::session_app::execution_prep::prepare_session_for_execution(
                &mut session,
                None,
                Some(&model),
            );

            let system_prompt = system_prompt_for_session(&session);
            if let Some(prompt) = system_prompt.as_ref() {
                log_base_system_prompt_snapshot(&session_id, prompt);
            }

            // The sender is cloned because the original is still needed below
            // to emit the terminal error event.
            let execute_request = build_execute_request(
                initial_message,
                mpsc_tx.clone(),
                cancel_token,
                ExecuteRequestParams {
                    tools: tools_override,
                    provider_override,
                    model_roster,
                    reasoning_effort,
                    auxiliary_model_resolver,
                    disabled_tools,
                    disabled_skill_ids,
                    selected_skill_ids,
                    selected_skill_mode,
                    image_fallback,
                    gold_config,
                    guardian_config,
                    guardian_spawner,
                    bash_resume_hook,
                    app_data_dir,
                },
            );

            let result = agent.execute(&mut session, execute_request).await;

            // Send terminal event for all error cases (including cancellation).
            // The send result is ignored.
            // NOTE(review): presumably because the receiver may already have
            // been dropped — confirm.
            if let Some(error_event) = terminal_error_event_for_result(&result) {
                let _ = mpsc_tx.send(error_event).await;
            }

            // Update runner status.
            finalize_runner(&runners, &session_id, &result).await;

            // Bespoke terminal bookkeeping (e.g. a scheduled-run status) runs
            // here — after the runner is finalized but before persistence — so
            // any closing message the hook appends is saved with the session
            // below.
            if let Some(on_complete) = on_complete {
                on_complete(SessionExecutionOutcome::from_result(&result), &mut session).await;
            }

            // Save session via merge-save so any concurrent UI edits to
            // title / pinned / title_version are preserved (the runtime is not
            // an authoritative title writer).
            // A failed save is only logged; the cache update below still runs.
            if let Err(error) = agent.persistence().save_runtime_session(&mut session).await {
                tracing::warn!("[{}] Failed to save session: {}", session_id, error);
            }

            // Update memory cache. This stores a fresh `Arc`, replacing any
            // entry that already exists for this session id.
            sessions_cache.insert(
                session_id.clone(),
                Arc::new(parking_lot::RwLock::new(session)),
            );

            tracing::info!("[{}] Agent execution completed", session_id);
        }
        .instrument(session_span),
    );
}
393
394/// Log a snapshot of the base system prompt for debugging.
395pub fn log_base_system_prompt_snapshot(session_id: &str, prompt: &str) {
396    tracing::info!(
397        "[{}] Base system prompt snapshot: len={} chars, has_skill={}, has_tool_guide={}, has_external_memory={}, has_task_list={}",
398        session_id,
399        prompt.len(),
400        prompt.contains(SKILL_CONTEXT_START_MARKER),
401        prompt.contains(TOOL_GUIDE_START_MARKER),
402        prompt.contains(EXTERNAL_MEMORY_START_MARKER),
403        prompt.contains(TASK_LIST_START_MARKER),
404    );
405
406    tracing::debug!(
407        "[{}] ========== BASE SYSTEM PROMPT SNAPSHOT ==========",
408        session_id
409    );
410    tracing::debug!("[{}] Snapshot length: {} chars", session_id, prompt.len());
411    tracing::debug!("[{}] -----------------------------------", session_id);
412    tracing::debug!("[{}] {}", session_id, prompt);
413    tracing::debug!(
414        "[{}] ========== END BASE SYSTEM PROMPT SNAPSHOT ==========",
415        session_id
416    );
417}
418
419/// Map an execution result to a terminal error event.
420pub fn terminal_error_event_for_result(result: &Result<(), AgentError>) -> Option<AgentEvent> {
421    match result {
422        Ok(_) => None,
423        Err(error) if error.is_cancelled() => Some(AgentEvent::Error {
424            message: "Agent execution cancelled by user".to_string(),
425        }),
426        Err(error) => Some(AgentEvent::Error {
427            message: error.to_string(),
428        }),
429    }
430}
431
432// Session metadata helpers (pure functions, no server dependency).
433
434fn system_prompt_for_session(session: &Session) -> Option<String> {
435    session
436        .messages
437        .iter()
438        .find(|message| matches!(message.role, bamboo_agent_core::Role::System))
439        .map(|message| message.content.clone())
440}
441
442fn initial_user_message_for_session(session: &Session) -> String {
443    session
444        .messages
445        .last()
446        .filter(|message| matches!(message.role, bamboo_agent_core::Role::User))
447        .map(|message| message.content.clone())
448        .unwrap_or_default()
449}
450
451fn selected_skill_ids_for_session(session: &Session) -> Option<Vec<String>> {
452    session
453        .metadata
454        .get("selected_skill_ids")
455        .and_then(|raw| bamboo_skills::selection::parse_selected_skill_ids_metadata(raw))
456}
457
458fn selected_skill_mode_for_session(session: &Session) -> Option<String> {
459    let value = session
460        .metadata
461        .get("skill_mode")
462        .or_else(|| session.metadata.get("mode"))?;
463    let trimmed = value.trim();
464    if trimmed.is_empty() {
465        None
466    } else {
467        Some(trimmed.to_string())
468    }
469}