Skip to main content

// bamboo_server/handlers/agent/execute/handler/mod.rs

1use actix_web::{web, HttpResponse};
2use std::collections::BTreeSet;
3use tokio::sync::mpsc;
4
5use super::image_fallback::resolve_image_fallback;
6use super::runtime::{
7    reserve_runner, spawn_agent_execution, spawn_event_forwarder, RunnerReservation,
8    SpawnAgentExecution,
9};
10use super::{ExecuteRequest, ExecuteSyncInfo, ExecuteSyncReason};
11use crate::app_state::AppState;
12use crate::model_config_helper::{
13    get_default_model_for_provider, get_memory_background_model_for_provider,
14    get_reasoning_effort_for_provider,
15};
16use crate::session_app::provider_model::session_effective_model_ref;
17
18use self::response::{
19    already_running_response, bad_request_error_response, completed_response,
20    internal_server_error_response, started_response,
21};
22
23mod response;
24#[cfg(test)]
25mod tests;
26mod validation;
27
28/// Execute the AI agent on a chat session.
29pub async fn handler(
30    state: web::Data<AppState>,
31    path: web::Path<String>,
32    req: web::Json<ExecuteRequest>,
33) -> HttpResponse {
34    let session_id = path.into_inner();
35
36    // ---- Build execution config snapshot from server config ----
37    let config_snapshot = state.config.read().await.clone();
38    let image_fallback = match resolve_image_fallback(&config_snapshot) {
39        Ok(value) => value,
40        Err(error) => return internal_server_error_response(error),
41    };
42
43    let disabled_tools_vec: Vec<String> =
44        config_snapshot.disabled_tool_names().into_iter().collect();
45    let disabled_skill_ids_vec: Vec<String> =
46        config_snapshot.disabled_skill_ids().into_iter().collect();
47    let requested_provider = req
48        .model_ref
49        .as_ref()
50        .map(|model_ref| model_ref.provider.as_str())
51        .or(req.provider.as_deref())
52        .unwrap_or(config_snapshot.provider.as_str());
53
54    let config = crate::session_app::types::ExecutionConfigSnapshot {
55        default_model: get_default_model_for_provider(&config_snapshot, requested_provider).ok(),
56        default_model_ref: config_snapshot.defaults.as_ref().map(|d| d.chat.clone()),
57        default_reasoning_effort: get_reasoning_effort_for_provider(
58            &config_snapshot,
59            requested_provider,
60        ),
61        disabled_tools: disabled_tools_vec.clone(),
62        disabled_skill_ids: disabled_skill_ids_vec.clone(),
63        provider_name: requested_provider.to_string(),
64        fast_model: get_memory_background_model_for_provider(&config_snapshot, requested_provider),
65        fast_model_ref: config_snapshot
66            .defaults
67            .as_ref()
68            .and_then(|d| d.fast.clone()),
69        image_fallback: image_fallback.clone(),
70        provider_model_ref_enabled: config_snapshot.features.provider_model_ref,
71    };
72
73    let input = crate::session_app::types::ExecuteInput {
74        session_id: session_id.clone(),
75        request_model: req.model.clone(),
76        request_model_ref: req.model_ref.clone(),
77        request_provider: req.provider.clone(),
78        request_reasoning_effort: req.reasoning_effort,
79        request_skill_mode: req.skill_mode.clone(),
80        client_sync: req.client_sync.as_ref().map(|cs| {
81            crate::session_app::types::ExecuteClientSync {
82                client_message_count: cs.client_message_count,
83                client_last_message_id: cs.client_last_message_id.clone(),
84                client_has_pending_question: cs.client_has_pending_question,
85                client_pending_question_tool_call_id: cs
86                    .client_pending_question_tool_call_id
87                    .clone(),
88            }
89        }),
90    };
91
92    let outcome =
93        match crate::session_app::execute::prepare_execute(state.as_ref(), config.clone(), input)
94            .await
95        {
96            Ok(outcome) => outcome,
97            Err(error) => {
98                return match error {
99                    crate::session_app::errors::ExecutePreparationError::NotFound(_) => {
100                        tracing::warn!("[{session_id}] Execute session not found");
101                        HttpResponse::NotFound().json(serde_json::json!({
102                            "error": "Session not found",
103                            "session_id": session_id
104                        }))
105                    }
106                    crate::session_app::errors::ExecutePreparationError::LoadFailed(load_err) => {
107                        let err_msg = load_err.to_string();
108                        tracing::error!("[{session_id}] Execute session load error: {err_msg}");
109                        HttpResponse::InternalServerError().json(serde_json::json!({
110                            "error": format!("Failed to load session: {err_msg}")
111                        }))
112                    }
113                    _ => internal_server_error_response(format!(
114                        "Execute preparation failed: {error}"
115                    )),
116                };
117            }
118        };
119
120    match outcome {
121        crate::session_app::types::ExecutePreparationOutcome::Ready {
122            session,
123            effective_model,
124            effective_reasoning_effort,
125            model_source,
126            reasoning_source,
127            is_child_session,
128        } => {
129            let session = *session;
130            // ---- Reserve runner ----
131            let session_tx = state.get_session_event_sender(&session_id).await;
132            let cancel_token = match reserve_runner(state.get_ref(), &session_id, &session_tx).await
133            {
134                RunnerReservation::Started(token) => token,
135                RunnerReservation::AlreadyRunning => {
136                    let sync_info = build_sync_info_from_session(&session, None);
137                    return already_running_response(&session_id, sync_info);
138                }
139            };
140
141            // ---- Save session before spawn ----
142            if let Err(error) = state.storage.save_session(&session).await {
143                return internal_server_error_response(format!(
144                    "Failed to persist session config before execute: {}",
145                    error
146                ));
147            }
148            {
149                let mut sessions = state.sessions.write().await;
150                sessions.insert(session_id.clone(), session.clone());
151            }
152
153            let disabled_tools: BTreeSet<String> = disabled_tools_vec.into_iter().collect();
154            let disabled_skill_ids: BTreeSet<String> = disabled_skill_ids_vec.into_iter().collect();
155            let resolved_provider_name = session_effective_model_ref(&session)
156                .map(|model_ref| model_ref.provider)
157                .unwrap_or_else(|| config.provider_name.clone());
158            let resolved_bg = crate::model_config_helper::resolve_background_model(
159                &config_snapshot,
160                &resolved_provider_name,
161                &state.provider_registry,
162            );
163            let resolved_background_model = resolved_bg.as_ref().map(|m| m.model_name.clone());
164            let resolved_bg_provider = resolved_bg.map(|m| m.provider);
165
166            // Build sync info before moving session into SpawnAgentExecution.
167            let sync_info = build_sync_info_from_session(&session, None);
168
169            tracing::info!(
170                "[{}] Starting agent execution with provider={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={}",
171                session_id,
172                resolved_provider_name,
173                effective_model,
174                model_source,
175                effective_reasoning_effort
176                    .map(bamboo_domain::reasoning::ReasoningEffort::as_str)
177                    .unwrap_or("none"),
178                reasoning_source
179            );
180
181            // Create mpsc channel for agent loop.
182            let (mpsc_tx, mpsc_rx) = mpsc::channel::<bamboo_agent_core::AgentEvent>(100);
183
184            spawn_event_forwarder(
185                state.clone(),
186                session_id.clone(),
187                mpsc_rx,
188                session_tx.clone(),
189            );
190            spawn_agent_execution(SpawnAgentExecution {
191                state: state.clone(),
192                session_id: session_id.clone(),
193                session,
194                is_child_session,
195                provider_name: resolved_provider_name,
196                provider_override: None,
197                model: effective_model,
198                fast_model: resolved_background_model,
199                background_model_provider: resolved_bg_provider,
200                reasoning_effort: effective_reasoning_effort,
201                reasoning_effort_source: reasoning_source.to_string(),
202                disabled_tools,
203                disabled_skill_ids,
204                cancel_token,
205                mpsc_tx,
206                image_fallback,
207            });
208
209            started_response(&session_id, sync_info)
210        }
211
212        crate::session_app::types::ExecutePreparationOutcome::AlreadyRunning {
213            server_snapshot,
214        } => {
215            let sync_info = server_snapshot_to_sync_info(&server_snapshot, None);
216            already_running_response(&session_id, sync_info)
217        }
218
219        crate::session_app::types::ExecutePreparationOutcome::NoPendingMessage {
220            server_snapshot,
221        } => {
222            tracing::debug!(
223                "[{}] No pending user message, returning completed status",
224                session_id
225            );
226            let sync_info = server_snapshot_to_sync_info(&server_snapshot, None);
227            completed_response(&session_id, sync_info)
228        }
229
230        crate::session_app::types::ExecutePreparationOutcome::SyncMismatch {
231            reason,
232            server_snapshot,
233        } => {
234            state
235                .metrics_service
236                .collector()
237                .execute_sync_mismatch(reason.as_str(), chrono::Utc::now());
238            let sync_info = server_snapshot_to_sync_info(&server_snapshot, Some(reason));
239            completed_response(&session_id, sync_info)
240        }
241
242        crate::session_app::types::ExecutePreparationOutcome::ModelRequired => {
243            bad_request_error_response("no model configured for session or provider")
244        }
245
246        crate::session_app::types::ExecutePreparationOutcome::ImageFallbackError(error) => {
247            bad_request_error_response(error)
248        }
249    }
250}
251
252/// Convert a crate's `ExecuteSyncReason` to the handler's `ExecuteSyncReason`.
253fn crate_sync_reason_to_handler(
254    reason: crate::session_app::types::ExecuteSyncReason,
255) -> ExecuteSyncReason {
256    match reason {
257        crate::session_app::types::ExecuteSyncReason::PendingQuestionMismatch => {
258            ExecuteSyncReason::PendingQuestionMismatch
259        }
260        crate::session_app::types::ExecuteSyncReason::MessageCountMismatch => {
261            ExecuteSyncReason::MessageCountMismatch
262        }
263        crate::session_app::types::ExecuteSyncReason::LastMessageIdMismatch => {
264            ExecuteSyncReason::LastMessageIdMismatch
265        }
266    }
267}
268
269fn server_snapshot_to_sync_info(
270    server_snapshot: &crate::session_app::types::ServerExecuteSnapshot,
271    reason: Option<crate::session_app::types::ExecuteSyncReason>,
272) -> ExecuteSyncInfo {
273    ExecuteSyncInfo {
274        need_sync: reason.is_some(),
275        reason: reason.map(crate_sync_reason_to_handler),
276        server_message_count: server_snapshot.message_count,
277        server_last_message_id: server_snapshot.last_message_id.clone(),
278        has_pending_question: server_snapshot.has_pending_question,
279        pending_question_tool_call_id: server_snapshot.pending_question_tool_call_id.clone(),
280        has_pending_user_message: server_snapshot.has_pending_user_message,
281    }
282}
283
284fn build_sync_info_from_session(
285    session: &bamboo_agent_core::Session,
286    reason: Option<ExecuteSyncReason>,
287) -> ExecuteSyncInfo {
288    ExecuteSyncInfo {
289        need_sync: reason.is_some(),
290        reason,
291        server_message_count: session.messages.len(),
292        server_last_message_id: session.messages.last().map(|m| m.id.clone()),
293        has_pending_question: session.pending_question.is_some(),
294        pending_question_tool_call_id: session
295            .pending_question
296            .as_ref()
297            .map(|p| p.tool_call_id.clone()),
298        has_pending_user_message: false,
299    }
300}