use actix_web::{web, HttpResponse};
use tokio::sync::mpsc;
use super::image_fallback::{resolve_image_fallback, validate_image_fallback_for_session};
use super::runtime::{
consume_pending_conclusion_with_options_resume, has_pending_user_message, reserve_runner,
spawn_agent_execution, spawn_event_forwarder, RunnerReservation, SpawnAgentExecution,
};
use super::session::load_session;
use super::{ExecuteClientSync, ExecuteRequest, ExecuteSyncInfo, ExecuteSyncReason};
use crate::agent::core::SessionKind;
use crate::server::app_state::AppState;
use self::response::{
already_running_response, bad_request_error_response, completed_response,
internal_server_error_response, started_response,
};
use self::validation::{require_resolved_model, validate_and_normalize_model};
mod response;
#[cfg(test)]
mod tests;
mod validation;
/// Server-side view of a session's conversation state, taken when an execute
/// request arrives and reported back to the client as sync information.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ServerExecuteSnapshot {
    /// Number of messages stored in the session.
    message_count: usize,
    /// Id of the last message in the session, if it has any messages.
    last_message_id: Option<String>,
    /// Whether the session holds a pending question.
    has_pending_question: bool,
    /// Tool-call id of the pending question, when one exists.
    pending_question_tool_call_id: Option<String>,
    /// Value returned by `has_pending_user_message` for the session.
    has_pending_user_message: bool,
}

impl ServerExecuteSnapshot {
    /// Captures the sync-relevant state of `session`.
    fn from_session(session: &crate::agent::core::Session) -> Self {
        // Borrow each optional piece once and derive the related fields from it.
        let newest_message = session.messages.last();
        let open_question = session.pending_question.as_ref();

        Self {
            message_count: session.messages.len(),
            last_message_id: newest_message.map(|entry| entry.id.clone()),
            has_pending_question: open_question.is_some(),
            pending_question_tool_call_id: open_question
                .map(|question| question.tool_call_id.clone()),
            has_pending_user_message: has_pending_user_message(session),
        }
    }

    /// Builds the sync payload for a response.
    ///
    /// `need_sync` is set exactly when `reason` is `Some`; every other field is
    /// copied from the snapshot.
    fn to_sync_info(&self, reason: Option<ExecuteSyncReason>) -> ExecuteSyncInfo {
        // Exhaustive destructuring: adding a snapshot field forces this mapping
        // to be revisited.
        let Self {
            message_count,
            last_message_id,
            has_pending_question: question_pending,
            pending_question_tool_call_id: question_tool_call_id,
            has_pending_user_message: user_message_pending,
        } = self;

        ExecuteSyncInfo {
            need_sync: reason.is_some(),
            reason,
            server_message_count: *message_count,
            server_last_message_id: last_message_id.clone(),
            has_pending_question: *question_pending,
            pending_question_tool_call_id: question_tool_call_id.clone(),
            has_pending_user_message: *user_message_pending,
        }
    }
}
/// Compares the client's reported session state with the server snapshot.
///
/// Returns `None` when the client sent no sync data or when everything agrees.
/// Otherwise returns the first mismatch found, checked in this order:
/// pending-question state, message count, last message id. Ids are trimmed
/// before comparison and blank ids are treated as absent.
fn evaluate_client_sync(
    client_sync: Option<&ExecuteClientSync>,
    server_snapshot: &ServerExecuteSnapshot,
) -> Option<ExecuteSyncReason> {
    /// Trims an optional id and maps a blank result to `None`.
    fn non_blank(raw: Option<&str>) -> Option<&str> {
        raw.map(str::trim).filter(|id| !id.is_empty())
    }

    let client = client_sync?;

    // Both sides must first agree on whether a question is pending at all.
    if client.client_has_pending_question != server_snapshot.has_pending_question {
        return Some(ExecuteSyncReason::PendingQuestionMismatch);
    }

    // The tool-call id is only compared when the client actually supplied one;
    // a client that reports a pending question without an id is accepted.
    if client.client_has_pending_question {
        let client_question_id = non_blank(client.client_pending_question_tool_call_id.as_deref());
        let server_question_id = non_blank(server_snapshot.pending_question_tool_call_id.as_deref());
        if client_question_id.is_some() && client_question_id != server_question_id {
            return Some(ExecuteSyncReason::PendingQuestionMismatch);
        }
    }

    if client.client_message_count != server_snapshot.message_count {
        return Some(ExecuteSyncReason::MessageCountMismatch);
    }

    let client_tail_id = non_blank(client.client_last_message_id.as_deref());
    let server_tail_id = non_blank(server_snapshot.last_message_id.as_deref());
    if client_tail_id == server_tail_id {
        None
    } else {
        Some(ExecuteSyncReason::LastMessageIdMismatch)
    }
}
pub async fn handler(
state: web::Data<AppState>,
path: web::Path<String>,
req: web::Json<ExecuteRequest>,
) -> HttpResponse {
let session_id = path.into_inner();
let request_model = match validate_and_normalize_model(req.model.as_deref()) {
Ok(model) => model,
Err(response) => return response,
};
let request_reasoning_effort = req.reasoning_effort;
let request_skill_mode = req.skill_mode.clone();
let request_client_sync = req.client_sync.clone();
let mut session = match load_session(&state, &session_id).await {
Ok(session) => session,
Err(response) => return response,
};
let is_child_session = session.kind == SessionKind::Child;
let server_snapshot = ServerExecuteSnapshot::from_session(&session);
if let Some(reason) = evaluate_client_sync(request_client_sync.as_ref(), &server_snapshot) {
tracing::info!(
"[{}] Execute sync mismatch detected before start: {}",
session_id,
reason.as_str()
);
state
.metrics_service
.collector()
.execute_sync_mismatch(reason.as_str(), chrono::Utc::now());
return completed_response(&session_id, server_snapshot.to_sync_info(Some(reason)));
}
let config_snapshot = state.config.read().await.clone();
let default_model = config_snapshot.get_model();
let default_reasoning_effort = config_snapshot.get_reasoning_effort();
let session_model = match validate_and_normalize_model(Some(session.model.as_str())) {
Ok(model) => model,
Err(response) => return response,
};
let effective_model = match require_resolved_model(
session_model
.clone()
.or_else(|| default_model.clone())
.or(request_model.clone()),
) {
Ok(model) => model,
Err(response) => return response,
};
let effective_reasoning_effort = session
.reasoning_effort
.or(default_reasoning_effort)
.or(request_reasoning_effort);
let disabled_tools = config_snapshot.disabled_tool_names();
let disabled_skill_ids = config_snapshot.disabled_skill_ids();
let model_source = if session_model.is_some() {
"session"
} else if default_model.is_some() {
"provider_default"
} else if request_model.is_some() {
"request"
} else {
"none"
};
let reasoning_effort_source = if session.reasoning_effort.is_some() {
"session"
} else if default_reasoning_effort.is_some() {
"provider_default"
} else if request_reasoning_effort.is_some() {
"request"
} else {
"none"
};
let image_fallback = match resolve_image_fallback(&config_snapshot) {
Ok(value) => value,
Err(error) => return internal_server_error_response(error),
};
if let Err(error) = validate_image_fallback_for_session(&session, image_fallback.as_ref()) {
return bad_request_error_response(error);
}
if !server_snapshot.has_pending_user_message {
tracing::debug!(
"[{}] No pending user message, returning completed status",
session_id
);
return completed_response(&session_id, server_snapshot.to_sync_info(None));
}
session.model = effective_model.clone();
session.reasoning_effort = effective_reasoning_effort;
session
.metadata
.insert("model_source".to_string(), model_source.to_string());
if let Some(reasoning_effort) = effective_reasoning_effort {
session.metadata.insert(
"reasoning_effort_source".to_string(),
reasoning_effort_source.to_string(),
);
session.metadata.insert(
"reasoning_effort_compat".to_string(),
reasoning_effort.as_str().to_string(),
);
} else {
session.metadata.remove("reasoning_effort_source");
session.metadata.remove("reasoning_effort_compat");
}
if let Some(skill_mode) = request_skill_mode {
let trimmed = skill_mode.trim();
if trimmed.is_empty() {
session.metadata.remove("skill_mode");
} else {
session
.metadata
.insert("skill_mode".to_string(), trimmed.to_string());
}
}
let session_tx = state.get_session_event_sender(&session_id).await;
let cancel_token = match reserve_runner(state.get_ref(), &session_id, &session_tx).await {
RunnerReservation::Started(token) => token,
RunnerReservation::AlreadyRunning => {
return already_running_response(&session_id, server_snapshot.to_sync_info(None));
}
};
consume_pending_conclusion_with_options_resume(&mut session);
if let Err(error) = state.storage.save_session(&session).await {
return internal_server_error_response(format!(
"Failed to persist session config before execute: {}",
error
));
}
{
let mut sessions = state.sessions.write().await;
sessions.insert(session_id.clone(), session.clone());
}
tracing::info!(
"[{}] Starting agent execution with model={}, model_source={}, reasoning_effort={}, reasoning_source={}",
session_id,
effective_model,
model_source,
effective_reasoning_effort
.map(crate::core::ReasoningEffort::as_str)
.unwrap_or("none"),
reasoning_effort_source
);
let (mpsc_tx, mpsc_rx) = mpsc::channel::<crate::agent::core::AgentEvent>(100);
spawn_event_forwarder(
state.clone(),
session_id.clone(),
mpsc_rx,
session_tx.clone(),
);
spawn_agent_execution(SpawnAgentExecution {
state: state.clone(),
session_id: session_id.clone(),
session,
is_child_session,
provider_name: config_snapshot.provider.clone(),
model: effective_model,
fast_model: config_snapshot.get_memory_background_model(),
reasoning_effort: effective_reasoning_effort,
reasoning_effort_source: reasoning_effort_source.to_string(),
disabled_tools,
disabled_skill_ids,
cancel_token,
mpsc_tx,
image_fallback,
});
started_response(&session_id, server_snapshot.to_sync_info(None))
}