1use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use everruns_core::atoms::{ActInput, AtomContext};
6use everruns_core::error::{AgentLoopError, Result};
7use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
8use everruns_core::{
9 Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
10 classify_runtime_error_message, user_facing_error_codes,
11};
12use serde::{Deserialize, Serialize};
13use tracing::{debug, info};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct RuntimeTurnState {
25 pub org_id: i64,
26 pub session_id: SessionId,
27 pub harness_id: HarnessId,
28 pub agent_id: Option<AgentId>,
29 pub input_message_id: MessageId,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub turn_id: Option<TurnId>,
32 #[serde(skip_serializing_if = "Option::is_none", default)]
33 pub previous_response_id: Option<String>,
34 #[serde(default = "default_iteration")]
35 pub iteration: u32,
36 #[serde(skip_serializing_if = "Option::is_none", default)]
37 pub request_id: Option<String>,
38}
39
/// Serde fallback for `RuntimeTurnState::iteration` when the field is missing from the
/// serialized state: a turn always starts at iteration 1.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
43
44#[derive(Debug, Clone)]
48pub struct RuntimeActPlan {
49 pub input: ActInput,
50 pub previous_response_id: Option<String>,
51 pub iteration: u32,
52 pub request_id: Option<String>,
53}
54
55#[derive(Debug, Clone)]
61pub enum RuntimeTurnPlan {
62 ScheduleReason(RuntimeTurnState),
63 ScheduleAct(RuntimeActPlan),
64 Complete { error: Option<String> },
65 WaitForToolResults { resume: RuntimeTurnState },
66}
67
68fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
69 let from_text =
70 classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
71
72 let Some(error) = reason_result.error.as_deref() else {
73 return from_text;
74 };
75
76 let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
77
78 if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
79 return from_text;
80 }
81
82 if from_error.code == from_text.code
83 && from_error.fields.is_empty()
84 && !from_text.fields.is_empty()
85 {
86 return from_text;
87 }
88
89 from_error
90}
91
92pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
109 adapter: &A,
110 completed_activity: &str,
111 state: &RuntimeTurnState,
112 output: &serde_json::Value,
113 pending_user_message_count: usize,
114) -> Result<RuntimeTurnPlan> {
115 match completed_activity {
116 "process_input" => {
117 let turn_id: Option<TurnId> = output
118 .get("turn_id")
119 .and_then(|value| value.as_str())
120 .and_then(|value| value.parse().ok());
121 let next = RuntimeTurnState {
122 turn_id,
123 previous_response_id: None,
124 iteration: 1,
125 ..state.clone()
126 };
127 debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
128 Ok(RuntimeTurnPlan::ScheduleReason(next))
129 }
130 "reason" => {
131 let reason_result: ReasonResult = serde_json::from_value(output.clone())
132 .map_err(|error| AgentLoopError::Internal(error.into()))?;
133 let response_id = reason_result.response_id.clone();
134
135 if reason_result.has_tool_calls && reason_result.success {
136 let session_blueprint_id = adapter
137 .session_store(state.org_id)
138 .get_session(state.session_id)
139 .await?
140 .and_then(|session| session.blueprint_id);
141 let plan = RuntimeActPlan {
142 input: ActInput {
143 org_id: Some(state.org_id),
144 context: AtomContext {
145 session_id: state.session_id,
146 turn_id: state.turn_id.unwrap_or_default(),
147 input_message_id: state.input_message_id,
148 exec_id: ExecId::new(),
149 },
150 harness_id: state.harness_id,
151 agent_id: state.agent_id,
152 tool_calls: reason_result.tool_calls,
153 tool_definitions: reason_result.tool_definitions,
154 locale: reason_result.locale,
155 blueprint_id: session_blueprint_id,
156 network_access: reason_result.network_access,
157 },
158 previous_response_id: response_id,
159 iteration: state.iteration,
160 request_id: state.request_id.clone(),
161 };
162 return Ok(RuntimeTurnPlan::ScheduleAct(plan));
163 }
164
165 if reason_result.success && pending_user_message_count > 0 {
166 if pending_user_message_count > 1 {
167 info!(
168 session_id = %state.session_id,
169 pending_user_message_count,
170 "multiple steering messages arrived during turn"
171 );
172 }
173
174 let next = RuntimeTurnState {
175 previous_response_id: response_id,
176 iteration: state.iteration.saturating_add(1),
177 ..state.clone()
178 };
179 return Ok(RuntimeTurnPlan::ScheduleReason(next));
180 }
181
182 let lifecycle =
183 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
184 let turn_id = state.turn_id.unwrap_or_default();
185
186 if reason_result.success {
187 lifecycle
188 .emit_turn_completed(
189 turn_id,
190 state.input_message_id,
191 state.iteration,
192 reason_result.usage.clone(),
193 None,
194 )
195 .await;
196 lifecycle
197 .emit_session_idled(
198 turn_id,
199 state.input_message_id,
200 Some(state.iteration),
201 reason_result.usage.clone(),
202 )
203 .await;
204 } else {
205 let user_error = classify_reason_failure(&reason_result);
206 lifecycle
207 .turn_failed(
208 turn_id,
209 state.input_message_id,
210 &reason_result.text,
211 Some(&user_error),
212 )
213 .await;
214 }
215
216 Ok(RuntimeTurnPlan::Complete {
217 error: reason_result.error,
218 })
219 }
220 "act" => {
221 if output
222 .get("blocked")
223 .and_then(|value| value.as_bool())
224 .unwrap_or(false)
225 {
226 return Ok(RuntimeTurnPlan::Complete { error: None });
227 }
228
229 let waiting_for_tool_results = output
230 .get("waiting_for_tool_results")
231 .and_then(|value| value.as_bool())
232 .unwrap_or(false);
233 let should_pause_for_tool_results = waiting_for_tool_results
234 && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
235
236 let next = RuntimeTurnState {
237 iteration: state.iteration.saturating_add(1),
238 ..state.clone()
239 };
240
241 if should_pause_for_tool_results {
242 let lifecycle =
243 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
244 lifecycle.waiting_for_tool_results().await;
245 return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
246 }
247
248 if waiting_for_tool_results {
249 info!(
250 session_id = %state.session_id,
251 "setup_connection hint absent, continuing turn instead of pausing"
252 );
253 }
254
255 Ok(RuntimeTurnPlan::ScheduleReason(next))
256 }
257 other => Err(AgentLoopError::config(format!(
258 "Unknown activity type completed: {other}"
259 ))),
260 }
261}
262
263async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
264 adapter: &A,
265 org_id: i64,
266 session_id: SessionId,
267) -> bool {
268 match adapter.session_store(org_id).get_session(session_id).await {
269 Ok(Some(session)) => {
270 let hints = Controls::resolve_hints(session.hints.as_ref(), None);
271 hints
272 .get("setup_connection")
273 .and_then(|value| value.as_bool())
274 .unwrap_or(false)
275 }
276 _ => false,
277 }
278}