bamboo_server/handlers/agent/execute/handler/
mod.rs1use actix_web::{web, HttpResponse};
2use std::collections::BTreeSet;
3use tokio::sync::mpsc;
4
5use super::image_fallback::resolve_image_fallback;
6use super::runtime::{
7 reserve_runner, spawn_agent_execution, spawn_event_forwarder, RunnerReservation,
8 SpawnAgentExecution,
9};
10use super::{ExecuteRequest, ExecuteSyncInfo, ExecuteSyncReason};
11use crate::app_state::AppState;
12use crate::model_config_helper::{
13 get_default_model_for_provider, get_memory_background_model_for_provider,
14 get_reasoning_effort_for_provider,
15};
16use crate::session_app::provider_model::session_effective_model_ref;
17
18use self::response::{
19 already_running_response, bad_request_error_response, completed_response,
20 internal_server_error_response, started_response,
21};
22
23mod response;
24#[cfg(test)]
25mod tests;
26mod validation;
27
28pub async fn handler(
30 state: web::Data<AppState>,
31 path: web::Path<String>,
32 req: web::Json<ExecuteRequest>,
33) -> HttpResponse {
34 let session_id = path.into_inner();
35
36 let config_snapshot = state.config.read().await.clone();
38 let image_fallback = match resolve_image_fallback(&config_snapshot) {
39 Ok(value) => value,
40 Err(error) => return internal_server_error_response(error),
41 };
42
43 let disabled_tools_vec: Vec<String> =
44 config_snapshot.disabled_tool_names().into_iter().collect();
45 let disabled_skill_ids_vec: Vec<String> =
46 config_snapshot.disabled_skill_ids().into_iter().collect();
47 let requested_provider = req
48 .model_ref
49 .as_ref()
50 .map(|model_ref| model_ref.provider.as_str())
51 .or(req.provider.as_deref())
52 .unwrap_or(config_snapshot.provider.as_str());
53
54 let config = crate::session_app::types::ExecutionConfigSnapshot {
55 default_model: get_default_model_for_provider(&config_snapshot, requested_provider).ok(),
56 default_model_ref: config_snapshot.defaults.as_ref().map(|d| d.chat.clone()),
57 default_reasoning_effort: get_reasoning_effort_for_provider(
58 &config_snapshot,
59 requested_provider,
60 ),
61 disabled_tools: disabled_tools_vec.clone(),
62 disabled_skill_ids: disabled_skill_ids_vec.clone(),
63 provider_name: requested_provider.to_string(),
64 fast_model: get_memory_background_model_for_provider(&config_snapshot, requested_provider),
65 fast_model_ref: config_snapshot
66 .defaults
67 .as_ref()
68 .and_then(|d| d.fast.clone()),
69 image_fallback: image_fallback.clone(),
70 provider_model_ref_enabled: config_snapshot.features.provider_model_ref,
71 };
72
73 let input = crate::session_app::types::ExecuteInput {
74 session_id: session_id.clone(),
75 request_model: req.model.clone(),
76 request_model_ref: req.model_ref.clone(),
77 request_provider: req.provider.clone(),
78 request_reasoning_effort: req.reasoning_effort,
79 request_skill_mode: req.skill_mode.clone(),
80 client_sync: req.client_sync.as_ref().map(|cs| {
81 crate::session_app::types::ExecuteClientSync {
82 client_message_count: cs.client_message_count,
83 client_last_message_id: cs.client_last_message_id.clone(),
84 client_has_pending_question: cs.client_has_pending_question,
85 client_pending_question_tool_call_id: cs
86 .client_pending_question_tool_call_id
87 .clone(),
88 }
89 }),
90 };
91
92 let outcome =
93 match crate::session_app::execute::prepare_execute(state.as_ref(), config.clone(), input)
94 .await
95 {
96 Ok(outcome) => outcome,
97 Err(error) => {
98 return match error {
99 crate::session_app::errors::ExecutePreparationError::NotFound(_) => {
100 tracing::warn!("[{session_id}] Execute session not found");
101 HttpResponse::NotFound().json(serde_json::json!({
102 "error": "Session not found",
103 "session_id": session_id
104 }))
105 }
106 crate::session_app::errors::ExecutePreparationError::LoadFailed(load_err) => {
107 let err_msg = load_err.to_string();
108 tracing::error!("[{session_id}] Execute session load error: {err_msg}");
109 HttpResponse::InternalServerError().json(serde_json::json!({
110 "error": format!("Failed to load session: {err_msg}")
111 }))
112 }
113 _ => internal_server_error_response(format!(
114 "Execute preparation failed: {error}"
115 )),
116 };
117 }
118 };
119
120 match outcome {
121 crate::session_app::types::ExecutePreparationOutcome::Ready {
122 session,
123 effective_model,
124 effective_reasoning_effort,
125 model_source,
126 reasoning_source,
127 is_child_session,
128 } => {
129 let session = *session;
130 let session_tx = state.get_session_event_sender(&session_id).await;
132 let cancel_token = match reserve_runner(state.get_ref(), &session_id, &session_tx).await
133 {
134 RunnerReservation::Started(token) => token,
135 RunnerReservation::AlreadyRunning => {
136 let sync_info = build_sync_info_from_session(&session, None);
137 return already_running_response(&session_id, sync_info);
138 }
139 };
140
141 if let Err(error) = state.storage.save_session(&session).await {
143 return internal_server_error_response(format!(
144 "Failed to persist session config before execute: {}",
145 error
146 ));
147 }
148 {
149 let mut sessions = state.sessions.write().await;
150 sessions.insert(session_id.clone(), session.clone());
151 }
152
153 let disabled_tools: BTreeSet<String> = disabled_tools_vec.into_iter().collect();
154 let disabled_skill_ids: BTreeSet<String> = disabled_skill_ids_vec.into_iter().collect();
155 let resolved_provider_name = session_effective_model_ref(&session)
156 .map(|model_ref| model_ref.provider)
157 .unwrap_or_else(|| config.provider_name.clone());
158 let resolved_bg = crate::model_config_helper::resolve_background_model(
159 &config_snapshot,
160 &resolved_provider_name,
161 &state.provider_registry,
162 );
163 let resolved_background_model = resolved_bg.as_ref().map(|m| m.model_name.clone());
164 let resolved_bg_provider = resolved_bg.map(|m| m.provider);
165
166 let sync_info = build_sync_info_from_session(&session, None);
168
169 tracing::info!(
170 "[{}] Starting agent execution with provider={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={}",
171 session_id,
172 resolved_provider_name,
173 effective_model,
174 model_source,
175 effective_reasoning_effort
176 .map(bamboo_domain::reasoning::ReasoningEffort::as_str)
177 .unwrap_or("none"),
178 reasoning_source
179 );
180
181 let (mpsc_tx, mpsc_rx) = mpsc::channel::<bamboo_agent_core::AgentEvent>(100);
183
184 spawn_event_forwarder(
185 state.clone(),
186 session_id.clone(),
187 mpsc_rx,
188 session_tx.clone(),
189 );
190 spawn_agent_execution(SpawnAgentExecution {
191 state: state.clone(),
192 session_id: session_id.clone(),
193 session,
194 is_child_session,
195 provider_name: resolved_provider_name,
196 provider_override: None,
197 model: effective_model,
198 fast_model: resolved_background_model,
199 background_model_provider: resolved_bg_provider,
200 reasoning_effort: effective_reasoning_effort,
201 reasoning_effort_source: reasoning_source.to_string(),
202 disabled_tools,
203 disabled_skill_ids,
204 cancel_token,
205 mpsc_tx,
206 image_fallback,
207 });
208
209 started_response(&session_id, sync_info)
210 }
211
212 crate::session_app::types::ExecutePreparationOutcome::AlreadyRunning {
213 server_snapshot,
214 } => {
215 let sync_info = server_snapshot_to_sync_info(&server_snapshot, None);
216 already_running_response(&session_id, sync_info)
217 }
218
219 crate::session_app::types::ExecutePreparationOutcome::NoPendingMessage {
220 server_snapshot,
221 } => {
222 tracing::debug!(
223 "[{}] No pending user message, returning completed status",
224 session_id
225 );
226 let sync_info = server_snapshot_to_sync_info(&server_snapshot, None);
227 completed_response(&session_id, sync_info)
228 }
229
230 crate::session_app::types::ExecutePreparationOutcome::SyncMismatch {
231 reason,
232 server_snapshot,
233 } => {
234 state
235 .metrics_service
236 .collector()
237 .execute_sync_mismatch(reason.as_str(), chrono::Utc::now());
238 let sync_info = server_snapshot_to_sync_info(&server_snapshot, Some(reason));
239 completed_response(&session_id, sync_info)
240 }
241
242 crate::session_app::types::ExecutePreparationOutcome::ModelRequired => {
243 bad_request_error_response("no model configured for session or provider")
244 }
245
246 crate::session_app::types::ExecutePreparationOutcome::ImageFallbackError(error) => {
247 bad_request_error_response(error)
248 }
249 }
250}
251
252fn crate_sync_reason_to_handler(
254 reason: crate::session_app::types::ExecuteSyncReason,
255) -> ExecuteSyncReason {
256 match reason {
257 crate::session_app::types::ExecuteSyncReason::PendingQuestionMismatch => {
258 ExecuteSyncReason::PendingQuestionMismatch
259 }
260 crate::session_app::types::ExecuteSyncReason::MessageCountMismatch => {
261 ExecuteSyncReason::MessageCountMismatch
262 }
263 crate::session_app::types::ExecuteSyncReason::LastMessageIdMismatch => {
264 ExecuteSyncReason::LastMessageIdMismatch
265 }
266 }
267}
268
269fn server_snapshot_to_sync_info(
270 server_snapshot: &crate::session_app::types::ServerExecuteSnapshot,
271 reason: Option<crate::session_app::types::ExecuteSyncReason>,
272) -> ExecuteSyncInfo {
273 ExecuteSyncInfo {
274 need_sync: reason.is_some(),
275 reason: reason.map(crate_sync_reason_to_handler),
276 server_message_count: server_snapshot.message_count,
277 server_last_message_id: server_snapshot.last_message_id.clone(),
278 has_pending_question: server_snapshot.has_pending_question,
279 pending_question_tool_call_id: server_snapshot.pending_question_tool_call_id.clone(),
280 has_pending_user_message: server_snapshot.has_pending_user_message,
281 }
282}
283
284fn build_sync_info_from_session(
285 session: &bamboo_agent_core::Session,
286 reason: Option<ExecuteSyncReason>,
287) -> ExecuteSyncInfo {
288 ExecuteSyncInfo {
289 need_sync: reason.is_some(),
290 reason,
291 server_message_count: session.messages.len(),
292 server_last_message_id: session.messages.last().map(|m| m.id.clone()),
293 has_pending_question: session.pending_question.is_some(),
294 pending_question_tool_call_id: session
295 .pending_question
296 .as_ref()
297 .map(|p| p.tool_call_id.clone()),
298 has_pending_user_message: false,
299 }
300}